[GitHub] [flink] pegasas commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


pegasas commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1284013387


##
flink-examples/flink-examples-batch/pom.xml:
##
@@ -48,25 +48,6 @@ under the License.



-   
-   
-   org.apache.maven.plugins
-   maven-compiler-plugin
-   
-   
-   compile
-   process-sources
-   
-   compile
-   
-   
-   
-Xlint:deprecation
-   
true
-   
-   
-   
-   
-   

Review Comment:
   Thanks @xintongsong , @WencongLiu ,
   I made a PR for tracking: https://github.com/apache/flink/pull/23135
   I will request a review after going through it myself.






[jira] [Updated] (FLINK-32741) Remove DataSet related descriptions in doc

2023-08-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32741:
---
Labels: 2.0-related pull-request-available  (was: 2.0-related)

> Remove DataSet related descriptions in doc
> --
>
> Key: FLINK-32741
> URL: https://issues.apache.org/jira/browse/FLINK-32741
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Documentation
>Affects Versions: 2.0.0
>Reporter: Wencong Liu
>Assignee: Junyao Huang
>Priority: Major
>  Labels: 2.0-related, pull-request-available
> Fix For: 1.18.0
>
>
> Since all DataSet APIs will be deprecated in [FLINK-32558] and we don't 
> recommend that developers use the DataSet API, the DataSet-related 
> descriptions should be removed from the docs after [FLINK-32558].
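For context, a minimal sketch of the recommended migration target: the same kind 
of bounded job written with the DataStream API in BATCH runtime mode instead of 
the deprecated DataSet API. The word-count job itself is illustrative and not 
taken from the PR.

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH mode on a bounded source replaces ExecutionEnvironment/DataSet.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("to be", "or not", "to be")
                .flatMap(
                        (String line, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : line.split(" ")) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                // lambdas lose generic types to erasure, so declare them explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();

        env.execute("DataStream batch word count");
    }
}
{code}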





[GitHub] [flink] pegasas opened a new pull request, #23135: [FLINK-32741]Remove DataSet related descriptions in doc

2023-08-03 Thread via GitHub


pegasas opened a new pull request, #23135:
URL: https://github.com/apache/flink/pull/23135

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[jira] [Resolved] (FLINK-29527) Make unknownFieldsIndices work for single ParquetReader

2023-08-03 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen resolved FLINK-29527.
---
Resolution: Fixed

master via 50622df1f01cd9cade78bfe2add5a7faff678e3e

Shall we pick this to other versions? It seems to be somewhere between a feature catch-up and a fix.

> Make unknownFieldsIndices work for single ParquetReader
> ---
>
> Key: FLINK-29527
> URL: https://issues.apache.org/jira/browse/FLINK-29527
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: Sun Shun
>Assignee: Sun Shun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, since the improvement in FLINK-23715, Flink uses a collection named 
> `unknownFieldsIndices` to track nonexistent fields. It is kept inside 
> `ParquetVectorizedInputFormat` and applied to all parquet files under the 
> given path.
> However, some fields may be nonexistent only in some of the historical 
> parquet files, while they exist in the latest ones. Based on 
> `unknownFieldsIndices`, Flink will always skip these fields, even though 
> they exist in the later parquet files.
> As a result, the value of these fields will become empty whenever they are 
> nonexistent in some historical parquet files.
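Illustration of the bug pattern with hypothetical names (not the actual Flink 
code): the unknown-field indices are accumulated once per format instance, so a 
field missing from an older file stays skipped for every later file. 
Recomputing the set from each file's own schema avoids that.

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PerFileFieldTracker {
    /** Fields requested by the query, in projection order. */
    private final List<String> requestedFields;

    PerFileFieldTracker(List<String> requestedFields) {
        this.requestedFields = requestedFields;
    }

    /**
     * Derive the unknown-field indices for one parquet file from that file's
     * own schema, instead of reusing a set accumulated across all files.
     */
    Set<Integer> unknownFieldsFor(Set<String> fieldsInFileSchema) {
        Set<Integer> unknown = new HashSet<>();
        for (int i = 0; i < requestedFields.size(); i++) {
            if (!fieldsInFileSchema.contains(requestedFields.get(i))) {
                unknown.add(i); // this field is absent only in this file
            }
        }
        return unknown;
    }
}
{code}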





[jira] [Updated] (FLINK-29527) Make unknownFieldsIndices work for single ParquetReader

2023-08-03 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-29527:
--
Fix Version/s: 1.19.0

> Make unknownFieldsIndices work for single ParquetReader
> ---
>
> Key: FLINK-29527
> URL: https://issues.apache.org/jira/browse/FLINK-29527
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: Sun Shun
>Assignee: Sun Shun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> Currently, since the improvement in FLINK-23715, Flink uses a collection named 
> `unknownFieldsIndices` to track nonexistent fields. It is kept inside 
> `ParquetVectorizedInputFormat` and applied to all parquet files under the 
> given path.
> However, some fields may be nonexistent only in some of the historical 
> parquet files, while they exist in the latest ones. Based on 
> `unknownFieldsIndices`, Flink will always skip these fields, even though 
> they exist in the later parquet files.
> As a result, the value of these fields will become empty whenever they are 
> nonexistent in some historical parquet files.





[GitHub] [flink] reswqa commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle

2023-08-03 Thread via GitHub


reswqa commented on code in PR #23100:
URL: https://github.com/apache/flink/pull/23100#discussion_r1284006406


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {

Review Comment:
   To be honest, I don't like this approach, but I can't think of a better one 
either. In theory, `FileSystem` itself should handle (or wrap) the exceptions 
that can be retried, but it doesn't do that for us.









[jira] [Commented] (FLINK-28915) Make application mode could support remote DFS schema(e.g. S3, OSS, HDFS, etc.)

2023-08-03 Thread Junyao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750960#comment-17750960
 ] 

Junyao Huang commented on FLINK-28915:
--

Hi, [~hjw], [~wangyang0918],
What is the status of this thread?
Is there any plan to support ABFS 
https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html? 

> Make application mode could support remote DFS schema(e.g. S3, OSS, HDFS, 
> etc.)
> ---
>
> Key: FLINK-28915
> URL: https://issues.apache.org/jira/browse/FLINK-28915
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, flink-contrib
>Reporter: hjw
>Assignee: hjw
>Priority: Major
>  Labels: pull-request-available
>
> As the Flink documentation shows, local is the only supported scheme in Native k8s 
> deployment.
> Is there a plan to support the s3 filesystem? thx.





[GitHub] [flink] reswqa commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle

2023-08-03 Thread via GitHub


reswqa commented on code in PR #23100:
URL: https://github.com/apache/flink/pull/23100#discussion_r1284002092


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {
+currentRetryTime++;
+throwException(t, "Failed to check the status of segment file.");

Review Comment:
   Sounds good.






[GitHub] [flink] reswqa commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle

2023-08-03 Thread via GitHub


reswqa commented on code in PR #23100:
URL: https://github.com/apache/flink/pull/23100#discussion_r1284000935


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -109,7 +118,11 @@ private FileSystem createFileSystem() {
 
 /** Start the executor. */
 public void start() {
-scannerExecutor.schedule(this, lastInterval, TimeUnit.MILLISECONDS);
+synchronized (scannerExecutor) {

Review Comment:
   Ah, my mistake, fair enough.  









[jira] [Closed] (FLINK-32735) Flink SQL Gateway for Native Kubernetes Application Mode

2023-08-03 Thread Junyao Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junyao Huang closed FLINK-32735.


Tracked by FLIP-316.

> Flink SQL Gateway for Native Kubernetes Application Mode 
> -
>
> Key: FLINK-32735
> URL: https://issues.apache.org/jira/browse/FLINK-32735
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Table SQL / Gateway
>Affects Versions: 1.19.0
>Reporter: Junyao Huang
>Priority: Major
>
> Hi, Flink Community,
> I have visited our docs on these pages:
>  # Flink Native Kubernetes: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/]
>  # Flink SQL Gateway: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/]
>  # Flink Kubernetes Operator Roadmap: 
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/roadmap/]
> Will we support Flink SQL on Native Kubernetes in our next version?
> Personally, I think it is an important feature, especially in this 
> cloud-native era. 
> Since we support per-job mode on YARN,
> I do not know whether there are non-technical reasons, such as commercial 
> ones, blocking this.
> Have we supported Flink SQL Gateway with native k8s now?





[jira] [Resolved] (FLINK-32735) Flink SQL Gateway for Native Kubernetes Application Mode

2023-08-03 Thread Junyao Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junyao Huang resolved FLINK-32735.
--
Release Note: tracked by FLIP-316.
  Resolution: Fixed

> Flink SQL Gateway for Native Kubernetes Application Mode 
> -
>
> Key: FLINK-32735
> URL: https://issues.apache.org/jira/browse/FLINK-32735
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Table SQL / Gateway
>Affects Versions: 1.19.0
>Reporter: Junyao Huang
>Priority: Major
>
> Hi, Flink Community,
> I have visited our docs on these pages:
>  # Flink Native Kubernetes: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/]
>  # Flink SQL Gateway: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/]
>  # Flink Kubernetes Operator Roadmap: 
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/roadmap/]
> Will we support Flink SQL on Native Kubernetes in our next version?
> Personally, I think it is an important feature, especially in this 
> cloud-native era. 
> Since we support per-job mode on YARN,
> I do not know whether there are non-technical reasons, such as commercial 
> ones, blocking this.
> Have we supported Flink SQL Gateway with native k8s now?





[jira] [Commented] (FLINK-32735) Flink SQL Gateway for Native Kubernetes Application Mode

2023-08-03 Thread Junyao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750956#comment-17750956
 ] 

Junyao Huang commented on FLINK-32735:
--

Thanks [~lsy]! 
Your answer exactly addresses my question!
I will follow FLIP-316 for further steps and close this issue.

> Flink SQL Gateway for Native Kubernetes Application Mode 
> -
>
> Key: FLINK-32735
> URL: https://issues.apache.org/jira/browse/FLINK-32735
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Table SQL / Gateway
>Affects Versions: 1.19.0
>Reporter: Junyao Huang
>Priority: Major
>
> Hi, Flink Community,
> I have visited our docs on these pages:
>  # Flink Native Kubernetes: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/]
>  # Flink SQL Gateway: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/]
>  # Flink Kubernetes Operator Roadmap: 
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/roadmap/]
> Will we support Flink SQL on Native Kubernetes in our next version?
> Personally, I think it is an important feature, especially in this 
> cloud-native era. 
> Since we support per-job mode on YARN,
> I do not know whether there are non-technical reasons, such as commercial 
> ones, blocking this.
> Have we supported Flink SQL Gateway with native k8s now?





[jira] [Closed] (FLINK-32739) restarting not work

2023-08-03 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob closed FLINK-32739.
-
Resolution: Not A Problem

> restarting not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
> Attachments: jobmanager.log
>
>
> In my flink standalone cluster, I configured 2 taskmanagers. Then I tested with the 
> following steps:
>  # submit a streaming job configured with a fixed restart strategy to the flink 
> session environment
>  # the job was running on taskmanager1. Then I killed taskmanager1.
>  # the job turned to failed after the restart attempts.
> The job was not moved to taskmanager2, which had enough slots, as I 
> expected.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
> backoffTimeMS=1)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
>     at 
> 
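For reference, a minimal sketch of the fixed-delay restart strategy seen in the 
stack trace above, using the Flink 1.14-era API; the attempt count and delay 
values are illustrative. Once the configured attempts are exhausted, the job 
fails permanently even if another TaskManager still has free slots.

{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 3 attempts with a 10s delay; after that, recovery is suppressed and
        // the job transitions to FAILED, as in the report above.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        env.fromElements(1, 2, 3).print();
        env.execute("fixed delay restart example");
    }
}
{code}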

[jira] [Created] (FLINK-32749) Sql gateway supports default catalog loaded by CatalogStore

2023-08-03 Thread Fang Yong (Jira)
Fang Yong created FLINK-32749:
-

 Summary: Sql gateway supports default catalog loaded by 
CatalogStore
 Key: FLINK-32749
 URL: https://issues.apache.org/jira/browse/FLINK-32749
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Gateway
Affects Versions: 1.19.0
Reporter: Fang Yong


Currently the sql gateway creates an in-memory catalog as the default catalog; it 
should also support a default catalog loaded by the catalog store.





[GitHub] [flink] WencongLiu commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23100:
URL: https://github.com/apache/flink/pull/23100#discussion_r1283977914


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {
+currentRetryTime++;
+throwException(t, "Failed to check the status of segment file.");

Review Comment:
   `throwException` can be renamed to `tryThrowException`, WDYT?






[GitHub] [flink] WencongLiu commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23100:
URL: https://github.com/apache/flink/pull/23100#discussion_r1283977912


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {
+currentRetryTime++;
+throwException(t, "Failed to check the status of segment file.");
 }
 return isExist;
 }
 
+private void throwException(Throwable t, String logMessage) {
+LOG.error(logMessage);

Review Comment:
   `LOG.error` has been modified to `LOG.warn`



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {
+currentRetryTime++;
+throwException(t, "Failed to check the status of segment file.");

Review Comment:
   `throwException` can be renamed to `tryThrowException`



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {

Review Comment:
   The type of exception varies when `remoteFileSystem.exists` is executed, 
because `remoteFileSystem` can be implemented by OSS, S3, etc. I think using 
`Throwable` is also acceptable because there is a `MAX_RETRY_TIME`. 
WDYT? 🤔
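A sketch of the bounded-retry pattern under discussion; the names mirror the 
diff (`currentRetryTime`, `MAX_RETRY_TIME`) but this is illustrative code, not 
the actual Flink implementation. It also shows why `tryThrowException` is the 
more accurate name: the failure is only rethrown once the retry budget is 
exhausted.

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;

class BoundedRetryChecker {
    private static final int MAX_RETRY_TIME = 5; // illustrative budget
    private int currentRetryTime = 0;

    boolean checkSegmentExist(Path segmentPath) {
        boolean isExist = false;
        try {
            // stands in for remoteFileSystem.exists(segmentPath)
            isExist = Files.exists(segmentPath);
            currentRetryTime = 0; // success resets the retry budget
        } catch (Throwable t) {
            currentRetryTime++;
            tryThrowException(t, "Failed to check the status of segment file.");
        }
        return isExist;
    }

    private void tryThrowException(Throwable t, String message) {
        // Earlier failures are swallowed (logged at WARN, per the review) and
        // retried on the next scan; only the last one escalates.
        if (currentRetryTime >= MAX_RETRY_TIME) {
            throw new RuntimeException(message, t);
        }
    }
}
{code}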






##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -207,20 +222,23 @@ private void scanMaxSegmentId(
 Path segmentFinishDir =
 getSegmentFinishDirPath(
 baseRemoteStoragePath, partitionId, 
subpartitionId.getSubpartitionId());
-FileStatus[] fileStatuses;
+FileStatus[] fileStatuses = new FileStatus[0];
 try {
 if (!remoteFileSystem.exists(segmentFinishDir)) {
 return;
 }
 fileStatuses = remoteFileSystem.listStatus(segmentFinishDir);
-} catch (IOException e) {
-throw new RuntimeException(
-"Failed to list the segment finish file. " + 
segmentFinishDir, e);
+currentRetryTime = 0;
+} catch (Throwable t) {
+if (t instanceof java.io.FileNotFoundException) {
+return;
+}
+currentRetryTime++;
+throwException(t, "Failed to list the segment finish file.");
 }
-if 

[jira] [Commented] (FLINK-32735) Flink SQL Gateway for Native Kubernetes Application Mode

2023-08-03 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750945#comment-17750945
 ] 

dalongliu commented on FLINK-32735:
---

[~pegasas] 

As stated in the documentation you listed, Native K8S here means a session 
cluster, similar to YARN session clusters, and the Flink SQL Gateway supports 
native k8s. You can specify the context for the K8S clusters when you start 
the Gateway, and the Gateway will then be able to submit jobs to the 
corresponding clusters.

Based on your description, I guess you may be asking whether the Flink SQL 
Gateway supports application mode on native k8s. That is currently not 
supported, but the community is already discussing this issue in FLIP-316, and 
we may be able to provide support for it in version 1.19. See FLIP-316 [1] for 
more detail.

Finally, this is a question about Flink SQL Gateway and K8S integration 
support, not about a specific implementation detail of Flink, so opening a 
Jira issue is not appropriate. You can start a discussion directly on Flink's 
user mailing list, where community experts will answer your questions.

 

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver

> Flink SQL Gateway for Native Kubernetes Application Mode 
> -
>
> Key: FLINK-32735
> URL: https://issues.apache.org/jira/browse/FLINK-32735
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes, Table SQL / Gateway
>Affects Versions: 1.19.0
>Reporter: Junyao Huang
>Priority: Major
>
> Hi, Flink Community,
> I have visited our docs on these pages:
>  # Flink Native Kubernetes: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/]
>  # Flink SQL Gateway: 
> [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/]
>  # Flink Kubernetes Operator Roadmap: 
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/roadmap/]
> Will we support Flink SQL on Native Kubernetes in our next version?
> Personally, I think it is an important feature, especially in this 
> cloud-native era. 
> Since we support per-job mode on YARN,
> I do not know whether there are non-technical reasons, such as commercial 
> ones, blocking this.
> Have we supported Flink SQL Gateway with native k8s now?





[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283910488


##
flink-examples/flink-examples-batch/pom.xml:
##
@@ -48,25 +48,6 @@ under the License.



-   
-   
-   org.apache.maven.plugins
-   maven-compiler-plugin
-   
-   
-   compile
-   process-sources
-   
-   compile
-   
-   
-   
-Xlint:deprecation
-   
true
-   
-   
-   
-   
-   

Review Comment:
   1. The classes in the `flink-examples-batch` module are still utilized for 
testing purposes. I have added a warning log to all batch example classes to 
remind developers that the DataSet API is deprecated. I have also added a 
note to `bin.xml` in the `flink-dist` module to explain that the batch examples 
in the example package are only for tests. I have created an issue 
[FLINK-32742](https://issues.apache.org/jira/browse/FLINK-32742) to track the 
removal of the `flink-examples-batch` module.
   
   2. I have created an issue 
[FLINK-32741](https://issues.apache.org/jira/browse/FLINK-32741) to track the 
removal of the DataSet related documentation. Currently the issue is assigned to 
@pegasas .
   






[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283959998


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -544,6 +544,13 @@ public ExecutionConfig setExecutionRetryDelay(long 
executionRetryDelay) {
  * The default execution mode is {@link ExecutionMode#PIPELINED}.
  *
  * @param executionMode The execution mode to use.
+ * @deprecated The {@link ExecutionMode} is deprecated because it's only 
used in DataSet APIs.
+ * All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
+ * Flink major version. You can still build your application in 
DataSet, but you should move
+ * to either the DataStream and/or Table API.
+ * @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API</a>
  */
 public void setExecutionMode(ExecutionMode executionMode) {

Review Comment:
   The annotation is added.



##
flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java:
##
@@ -77,12 +80,23 @@
  *   Broadcast variables in bulk iterations
  *   Custom Java objects (POJOs)
  * 
+ *
+ * Note: All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
+ * Flink major version. You can still build your application in DataSet, but 
you should move to
+ * either the DataStream and/or Table API. This class is retained for testing 
purposes.
  */
 @SuppressWarnings("serial")
 public class KMeans {
 
+private static final Logger LOGGER = LoggerFactory.getLogger(KMeans.class);
+
 public static void main(String[] args) throws Exception {
 
+LOGGER.warn(
+"All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a future"
++ " Flink major version. You can still build your 
application in DataSet, but you should move to"
++ " either the DataStream and/or Table API. This class 
is retained for testing purposes.");

Review Comment:
   I have added a static String constant `DATASET_DEPRECATION_INFO` to contain the 
message.






[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283910488


##
flink-examples/flink-examples-batch/pom.xml:
##
@@ -48,25 +48,6 @@ under the License.



-   
-   
-   org.apache.maven.plugins
-   maven-compiler-plugin
-   
-   
-   compile
-   process-sources
-   
-   compile
-   
-   
-   
-Xlint:deprecation
-   
true
-   
-   
-   
-   
-   

Review Comment:
   1. The classes in the `flink-examples-batch` module are still utilized for 
testing purposes. I have added a warning log to all batch example classes to 
remind developers that the DataSet API is deprecated. I have also added a note 
to `bin.xml` in the `flink-dist` module to explain that the batch examples in 
the example package are only for tests.
   
   2. I have created an issue 
[FLINK-32741](https://issues.apache.org/jira/browse/FLINK-32741) to track the 
removal of the DataSet related documentation. Currently the issue is assigned to 
@pegasas .
   



##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -556,7 +556,14 @@ public void setExecutionMode(ExecutionMode executionMode) {
  * The default execution mode is {@link ExecutionMode#PIPELINED}.
  *
  * @return The execution mode for the program.
+ * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a
+ * future Flink major version. You can still build your application in 
DataSet, but you
+ * should move to either the DataStream and/or Table API.
+ * @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API</a>
  */
+@Deprecated

Review Comment:
   1. `ExecutionMode` and `setExecutionMode` are both deprecated.
   2. The class JavaDoc of `ExecutionMode` is updated.



##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -913,6 +920,18 @@ public LinkedHashSet> getRegisteredPojoTypes() {
 return registeredPojoTypes;
 }
 
+/**
+ * Get if the auto type registration is disabled.
+ *
+ * @return if the auto type registration is disabled.
+ * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a
+ * future Flink major version. You can still build your application in 
DataSet, but you
+ * should move to either the DataStream and/or Table API.
+ * @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API</a>
+ */
+@Deprecated

Review Comment:
   Both `disableAutoTypeRegistration` and `AUTO_TYPE_REGISTRATION` are 
deprecated.






[GitHub] [flink] xuzifu666 closed pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile need close write automaticly

2023-08-03 Thread via GitHub


xuzifu666 closed pull request #23134: [FLINK-32748] 
WriteSinkFunction::cleanFile need close write automaticly
URL: https://github.com/apache/flink/pull/23134





[GitHub] [flink] xuzifu666 commented on pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile need close write automaticly

2023-08-03 Thread via GitHub


xuzifu666 commented on PR #23134:
URL: https://github.com/apache/flink/pull/23134#issuecomment-1664944270

   Will open a new PR to fix these.





[jira] [Closed] (FLINK-32748) WriteSinkFunction::cleanFile need close write automaticly

2023-08-03 Thread xy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xy closed FLINK-32748.
--
Resolution: Incomplete

> WriteSinkFunction::cleanFile need close write automaticly
> -
>
> Key: FLINK-32748
> URL: https://issues.apache.org/jira/browse/FLINK-32748
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: xy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.4
>
>
> WriteSinkFunction::cleanFile need close write automaticly
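For illustration, "close the writer automatically" here means the 
try-with-resources idiom, which guarantees close() even when the cleanup logic 
throws. This is a hypothetical example, not the actual WriteSinkFunction code:

{code:java}
import java.io.FileNotFoundException;
import java.io.PrintWriter;

class CleanFileExample {
    /** Truncate the file; the writer is closed even if something fails. */
    static void cleanFile(String path) {
        try (PrintWriter writer = new PrintWriter(path)) {
            writer.print(""); // opening in overwrite mode already truncates
        } catch (FileNotFoundException e) {
            throw new RuntimeException("An error occurred while cleaning the file.", e);
        }
    }
}
{code}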





[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283910551


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -913,6 +920,18 @@ public LinkedHashSet> getRegisteredPojoTypes() {
 return registeredPojoTypes;
 }
 
+/**
+ * Get if the auto type registration is disabled.
+ *
+ * @return if the auto type registration is disabled.
+ * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a
+ * future Flink major version. You can still build your application in 
DataSet, but you
+ * should move to either the DataStream and/or Table API.
+ * @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API</a>
+ */
+@Deprecated

Review Comment:
   Fixed.






[GitHub] [flink] reswqa commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle

2023-08-03 Thread via GitHub


reswqa commented on code in PR #23100:
URL: https://github.com/apache/flink/pull/23100#discussion_r1283940833


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {
+currentRetryTime++;
+throwException(t, "Failed to check the status of segment file.");

Review Comment:
   `throwException` implies that this will always throw an exception, but it does 
not.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {
+currentRetryTime++;
+throwException(t, "Failed to check the status of segment file.");
 }
 return isExist;
 }
 
+private void throwException(Throwable t, String logMessage) {
+LOG.error(logMessage);

Review Comment:
   This can easily confuse people. Some users monitor the system by scanning 
the logs for ERROR (and might send alerts). IMO, it should be WARNING at most.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -109,7 +118,11 @@ private FileSystem createFileSystem() {
 
 /** Start the executor. */
 public void start() {
-scannerExecutor.schedule(this, lastInterval, TimeUnit.MILLISECONDS);
+synchronized (scannerExecutor) {

Review Comment:
   Do we really need a synchronized lock? IIUC, a volatile variable would be enough?
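A sketch of the race in question, with illustrative names: the scanner must 
check "not shut down" and then schedule atomically, otherwise close() can run 
between the check and schedule() and the task is submitted to a terminated 
executor. A volatile flag alone cannot make that check-then-act sequence atomic.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class GuardedScanner implements Runnable {
    private final ScheduledExecutorService scannerExecutor =
            Executors.newSingleThreadScheduledExecutor();

    public void start() {
        synchronized (scannerExecutor) {
            // check-then-act must be atomic with respect to close()
            if (!scannerExecutor.isShutdown()) {
                scannerExecutor.schedule(this, 100, TimeUnit.MILLISECONDS);
            }
        }
    }

    public void close() {
        synchronized (scannerExecutor) {
            scannerExecutor.shutdownNow();
        }
    }

    @Override
    public void run() {
        // ... scan remote storage, then reschedule via start()
    }
}
{code}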



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
 partitionId,
 subpartitionId.getSubpartitionId(),
 segmentId);
-boolean isExist;
+boolean isExist = false;
 try {
 isExist = remoteFileSystem.exists(segmentPath);
-} catch (IOException e) {
-throw new RuntimeException("Failed to check segment file. " + 
segmentPath, e);
+currentRetryTime = 0;
+} catch (Throwable t) {

Review Comment:
   I'm not sure whether catching any throwable, including unrecoverable errors, 
is safe enough.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -144,7 +157,9 @@ public void watchSegment(
 
 /** Close the executor. */
 public void close() {
-scannerExecutor.shutdownNow();
+synchronized (scannerExecutor) {
+scannerExecutor.shutdownNow();

Review Comment:
   I'd suggest waiting for all pending tasks to finish when closing this 
scanner.
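A sketch of the suggested graceful close, assuming a plain ExecutorService; the 
timeout is illustrative:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ScannerShutdown {
    static void closeGracefully(ExecutorService scannerExecutor) throws InterruptedException {
        scannerExecutor.shutdown(); // accept no new tasks, let pending ones run
        if (!scannerExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
            // fall back to interruption if pending tasks do not finish in time
            scannerExecutor.shutdownNow();
        }
    }
}
{code}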



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:
##
@@ -207,20 +222,23 @@ private void scanMaxSegmentId(
 Path segmentFinishDir =
 getSegmentFinishDirPath(
 baseRemoteStoragePath, partitionId, 
subpartitionId.getSubpartitionId());
-FileStatus[] fileStatuses;
+FileStatus[] fileStatuses = new FileStatus[0];
 try {
 if (!remoteFileSystem.exists(segmentFinishDir)) {
 return;
 }
 fileStatuses = remoteFileSystem.listStatus(segmentFinishDir);
-} catch (IOException e) {
-throw new RuntimeException(
-"Failed to list the segment finish file. " + 
segmentFinishDir, e);
+currentRetryTime = 0;
+} catch (Throwable t) {
+if (t instanceof java.io.FileNotFoundException) {
+return;
+}
+currentRetryTime++;
+throwException(t, "Failed to list the segment finish file.");
 }
-if 

[jira] [Updated] (FLINK-32746) Enable ZGC in JDK17 to solve long time class unloading STW

2023-08-03 Thread xiangyu feng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiangyu feng updated FLINK-32746:
-
Summary: Enable ZGC in JDK17 to solve long time class unloading STW  (was: 
Enable ZGC in JDK17 to solve long time class unloading STW during fullgc)

> Enable ZGC in JDK17 to solve long time class unloading STW
> --
>
> Key: FLINK-32746
> URL: https://issues.apache.org/jira/browse/FLINK-32746
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: xiangyu feng
>Priority: Major
>
> In an OLAP session cluster, a TM needs to frequently create new classloaders 
> and generate new classes. These classes accumulate in metaspace. 
> When metaspace usage reaches a threshold, a FullGC with a long 
> Stop-the-World pause is triggered. Currently, SerialGC, ParallelGC, and G1GC 
> all do Stop-the-World class unloading. Only ZGC supports concurrent 
> class unloading; see more in 
> [https://bugs.openjdk.org/browse/JDK-8218905]
>  
> In our scenario, class unloading for a 2GB metaspace with 5 million classes 
> stops the application for more than 40 seconds. After switching to ZGC, the 
> maximum STW pause of the application was reduced to less than 10ms.
>  
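An illustrative reproduction of the described pressure (hypothetical jar path 
and class name): each iteration creates a throwaway classloader whose classes 
land in metaspace and can only be reclaimed by class unloading. On JDK 17, 
running the JVM with -XX:+UseZGC performs that unloading concurrently instead 
of inside a stop-the-world full GC.

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class MetaspaceChurn {
    public static void main(String[] args) throws Exception {
        URL jar = new URL("file:///tmp/generated-udfs.jar"); // hypothetical jar
        for (int i = 0; i < 5_000_000; i++) {
            // A fresh loader per iteration mimics a session cluster generating
            // new code for every query; its classes pile up in metaspace.
            try (URLClassLoader loader = new URLClassLoader(new URL[] {jar})) {
                loader.loadClass("com.example.GeneratedFunction"); // hypothetical class
            } catch (ClassNotFoundException ignored) {
                // fine for the sketch; the point is the classloader churn
            }
        }
    }
}
{code}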





[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283910579


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -544,6 +544,13 @@ public ExecutionConfig setExecutionRetryDelay(long 
executionRetryDelay) {
  * The default execution mode is {@link ExecutionMode#PIPELINED}.
  *
  * @param executionMode The execution mode to use.
+ * @deprecated The {@link ExecutionMode} is deprecated because it's only 
used in DataSet APIs.
+ * All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
+ * Flink major version. You can still build your application in 
DataSet, but you should move
+ * to either the DataStream and/or Table API.
+ * @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API</a>
  */
 public void setExecutionMode(ExecutionMode executionMode) {

Review Comment:
   Fixed.



##
flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java:
##
@@ -77,12 +80,23 @@
  *   Broadcast variables in bulk iterations
  *   Custom Java objects (POJOs)
  * 
+ *
+ * Note: All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
+ * Flink major version. You can still build your application in DataSet, but 
you should move to
+ * either the DataStream and/or Table API. This class is retained for testing 
purposes.
  */
 @SuppressWarnings("serial")
 public class KMeans {
 
+private static final Logger LOGGER = LoggerFactory.getLogger(KMeans.class);
+
 public static void main(String[] args) throws Exception {
 
+LOGGER.warn(
+"All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a future"
++ " Flink major version. You can still build your 
application in DataSet, but you should move to"
++ " either the DataStream and/or Table API. This class 
is retained for testing purposes.");

Review Comment:
   Fixed.






[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283910526


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -556,7 +556,14 @@ public void setExecutionMode(ExecutionMode executionMode) {
  * The default execution mode is {@link ExecutionMode#PIPELINED}.
  *
  * @return The execution mode for the program.
+ * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a
+ * future Flink major version. You can still build your application in 
DataSet, but you
+ * should move to either the DataStream and/or Table API.
+ * @see <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API</a>
  */
+@Deprecated

Review Comment:
   Fixed.






[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283948678


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java:
##
@@ -116,58 +111,53 @@ public static TestingJobResultStore.Builder builder() {
 /** {@code Builder} for instantiating {@code TestingJobResultStore} 
instances. */
 public static class Builder {
 
-private ThrowingConsumer 
createDirtyResultConsumer =
-ignored -> {};
-private ThrowingConsumer 
markResultAsCleanConsumer =
-ignored -> {};
+private Function> 
createDirtyResultConsumer =
+jobResultEntry -> CompletableFuture.completedFuture(null);

Review Comment:
   Fixed.






[GitHub] [flink] Tartarus0zm commented on pull request #23116: [FLINK-32581][docs] Add docs for atomic CTAS and RTAS

2023-08-03 Thread via GitHub


Tartarus0zm commented on PR #23116:
URL: https://github.com/apache/flink/pull/23116#issuecomment-1664931262

   @luoyuxia thanks for your review. I have updated the PR, PTAL





[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283947288


##
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java:
##
@@ -46,72 +47,97 @@ public interface JobResultStoreContractTest {
 @Test
 default void testStoreJobResultsWithDuplicateIDsThrowsException() throws 
IOException {
 JobResultStore jobResultStore = createJobResultStore();
-jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join();
 final JobResultEntry otherEntryWithDuplicateId =
 new JobResultEntry(
 TestingJobResultStore.createSuccessfulJobResult(
 DUMMY_JOB_RESULT_ENTRY.getJobId()));
-assertThatThrownBy(() -> 
jobResultStore.createDirtyResult(otherEntryWithDuplicateId))
-.isInstanceOf(IllegalStateException.class);
+assertThatThrownBy(
+() ->
+jobResultStore
+
.createDirtyResultAsync(otherEntryWithDuplicateId)
+.join())
+.isInstanceOf(CompletionException.class);

Review Comment:
   Fixed
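Background for the assertion changes in this review: CompletableFuture.join() 
wraps the original failure, so a test that previously expected 
IllegalStateException directly now observes a CompletionException whose cause 
is that exception. A self-contained sketch:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class JoinWrappingDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("duplicate job id"));
        try {
            failed.join();
        } catch (CompletionException e) {
            // the original exception is preserved as the cause
            System.out.println(e.getCause());
        }
    }
}
{code}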






[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283946578


##
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java:
##
@@ -116,46 +123,74 @@ public void 
testBaseDirectoryCreationOnResultStoreInitialization() throws Except
 assertThat(emptyBaseDirectory).doesNotExist();
 
 fileSystemJobResultStore =
-new FileSystemJobResultStore(basePath.getFileSystem(), 
basePath, false);
+new FileSystemJobResultStore(
+basePath.getFileSystem(), basePath, false, 
manuallyTriggeredExecutor);
 // Result store operations are creating the base directory on-the-fly
 assertThat(emptyBaseDirectory).doesNotExist();
-fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
+CompletableFuture dirtyResultAsync =
+
fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY);
+manuallyTriggeredExecutor.trigger();

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283945744


##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##
@@ -579,10 +587,15 @@ public void 
testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws
 final CompletableFuture dirtyJobFuture = new 
CompletableFuture<>();
 final JobResultStore jobResultStore =
 TestingJobResultStore.builder()
-
.withCreateDirtyResultConsumer(dirtyJobFuture::complete)
+.withCreateDirtyResultConsumer(
+jobResultEntry -> {
+dirtyJobFuture.complete(jobResultEntry);
+return FutureUtils.completedVoidFuture();
+})
 .withMarkResultAsCleanConsumer(
 jobId -> {
-throw new IOException("Expected 
IOException.");
+return FutureUtils.completedExceptionally(

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32739) restarting does not work

2023-08-03 Thread Junrui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750942#comment-17750942
 ] 

Junrui Li commented on FLINK-32739:
---

[~SpongebobZ] It will just restart the tasks within the original job. More 
details about the failover strategy can be found 
[here|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#restart-pipelined-region-failover-strategy].
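
For reference, a minimal sketch of configuring the fixed-delay restart 
strategy programmatically; the values (3 attempts, 10 s delay) are 
illustrative, and `jobmanager.execution.failover-strategy: region` in 
flink-conf.yaml additionally restricts restarts to the failed region:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Give up after 3 attempts, waiting 10 seconds between them; restarted
        // tasks always stay within the original job, never a new one.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        // ... define the pipeline and call env.execute(...) ...
    }
}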
 

> restarting does not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
> Attachments: jobmanager.log
>
>
> In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
> with the following steps:
>  # submit a streaming job configured with a fixed restart strategy to the 
> Flink session environment
>  # the job was running on taskmanager1; then I killed taskmanager1
>  # the job failed after the restart attempts were exhausted
> The job was not moved to taskmanager2, which had enough slots, as expected.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
> backoffTimeMS=1)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> 

[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283944785


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##
@@ -277,18 +277,11 @@ private void startJobMasterServiceProcessAsync(UUID 
leaderSessionId) {
 
 @GuardedBy("lock")
 private void 
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
-throws FlinkException {
-try {
-if (jobResultStore.hasJobResultEntry(getJobID())) {
-jobAlreadyDone(leaderSessionId);
-} else {
-createNewJobMasterServiceProcess(leaderSessionId);
-}
-} catch (IOException e) {
-throw new FlinkException(
-String.format(
-"Could not retrieve the job scheduling status for 
job %s.", getJobID()),
-e);
+throws FlinkException, ExecutionException, InterruptedException {
+if (jobResultStore.hasJobResultEntryAsync(getJobID()).get()) {

Review Comment:
   Yes, I agree that making both `startJobMasterServiceProcessAsync` and 
`verifyJobSchedulingStatusAndCreateJobMasterServiceProcess` leverage the async 
nature is important.
   Currently, mixing the sync and async future logic in these methods would be 
complex and make the code hard to understand, so this refactoring needs to be 
well designed and may be completed in another PR, WDYT?
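
   To illustrate the direction discussed here, a rough, self-contained sketch 
of reacting to the future instead of blocking on `get()`; the class and method 
names below are illustrative stand-ins, not the actual 
JobMasterServiceLeadershipRunner code:

import java.util.concurrent.CompletableFuture;

public class AsyncVerificationSketch {

    // Stand-in for jobResultStore.hasJobResultEntryAsync(jobId).
    static CompletableFuture<Boolean> hasJobResultEntryAsync() {
        return CompletableFuture.completedFuture(false);
    }

    // Non-blocking variant: compose on the future instead of calling get(),
    // so the calling thread is never parked on store I/O.
    static CompletableFuture<Void> verifyAndCreate() {
        return hasJobResultEntryAsync()
                .thenAccept(
                        hasResult -> {
                            if (hasResult) {
                                System.out.println("job already done");
                            } else {
                                System.out.println("create new JobMasterServiceProcess");
                            }
                        });
    }

    public static void main(String[] args) {
        verifyAndCreate().join();
    }
}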



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283937477


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##


Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283935357


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##
@@ -57,17 +61,17 @@ public void markResultAsCleanInternal(JobID jobId) throws 
IOException, NoSuchEle
 }
 
 @Override
-public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws 
IOException {
+public Boolean hasDirtyJobResultEntryInternal(JobID jobId) {
 return dirtyJobResults.containsKey(jobId);
 }
 
 @Override
-public boolean hasCleanJobResultEntryInternal(JobID jobId) throws 
IOException {
+public Boolean hasCleanJobResultEntryInternal(JobID jobId) {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283935250


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java:
##
@@ -57,17 +61,17 @@ public void markResultAsCleanInternal(JobID jobId) throws 
IOException, NoSuchEle
 }
 
 @Override
-public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws 
IOException {
+public Boolean hasDirtyJobResultEntryInternal(JobID jobId) {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283934746


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##
@@ -43,69 +44,70 @@ public interface JobResultStore {
  * Registers the passed {@link JobResultEntry} instance as {@code dirty} 
which indicates that
  * clean-up operations still need to be performed. Once the job resource 
cleanup has been
  * finalized, we can mark the {@code JobResultEntry} as {@code clean} 
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
  *
  * @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO 
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has 
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty 
result is created
+ * successfully. The method will throw {@link IllegalStateException} 
if the passed {@code
+ * jobResultEntry} has a {@code JobID} attached that is already 
registered in this {@code
+ * JobResultStore}.
  */
-void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, 
IllegalStateException;
+CompletableFuture createDirtyResultAsync(JobResultEntry 
jobResultEntry);
 
 /**
  * Marks an existing {@link JobResultEntry} as {@code clean}. This 
indicates that no more
  * resource cleanup steps need to be performed. No actions should be 
triggered if the passed
  * {@code JobID} belongs to a job that was already marked as clean.
  *
  * @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry} 
as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code 
dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked 
successfully, The future will
+ * completed with {@link NoSuchElementException} if there is no 
corresponding {@code dirty}
+ * job present in the store for the given {@code JobID}.

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32667) Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster

2023-08-03 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750938#comment-17750938
 ] 

Fang Yong commented on FLINK-32667:
---

[~mapohl] We have an internal e2e test for OLAP queries that measures latency 
and QPS. However, e2e testing requires some resources. I'm considering how to 
build this test in the community, and whether each important issue can be 
covered in the flink-benchmarks project. What do you think? Thanks



> Use standalone store and embedding writer for jobs with no-restart-strategy 
> in session cluster
> --
>
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> When a Flink session cluster uses the zk or k8s high availability service, it 
> will store jobs in zk or a ConfigMap. When we submit Flink OLAP jobs to the 
> session cluster, they always turn off the restart strategy. These jobs with 
> no-restart-strategy should not be stored in zk or in a k8s ConfigMap.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32739) restarting does not work

2023-08-03 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750937#comment-17750937
 ] 

Spongebob commented on FLINK-32739:
---

Thanks [~JunRuiLi], this option helps. When one of the regions restarts, will 
it be restarted within the original job or as a new job?

> restarting does not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
> Attachments: jobmanager.log
>
>
> In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested 
> with the following steps:
>  # submit a streaming job configured with a fixed restart strategy to the 
> Flink session environment
>  # the job was running on taskmanager1; then I killed taskmanager1
>  # the job failed after the restart attempts were exhausted
> The job was not moved to taskmanager2, which had enough slots, as expected.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
> backoffTimeMS=1)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
>     at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at 
> org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at 
> 

[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283928788


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##
@@ -43,69 +44,70 @@ public interface JobResultStore {
  * Registers the passed {@link JobResultEntry} instance as {@code dirty} 
which indicates that
  * clean-up operations still need to be performed. Once the job resource 
cleanup has been
  * finalized, we can mark the {@code JobResultEntry} as {@code clean} 
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
  *
  * @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO 
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has 
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty 
result is created
+ * successfully. The method will throw {@link IllegalStateException} 
if the passed {@code
+ * jobResultEntry} has a {@code JobID} attached that is already 
registered in this {@code
+ * JobResultStore}.
  */
-void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, 
IllegalStateException;
+CompletableFuture createDirtyResultAsync(JobResultEntry 
jobResultEntry);
 
 /**
  * Marks an existing {@link JobResultEntry} as {@code clean}. This 
indicates that no more
  * resource cleanup steps need to be performed. No actions should be 
triggered if the passed
  * {@code JobID} belongs to a job that was already marked as clean.
  *
  * @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry} 
as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code 
dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked 
successfully, The future will
+ * completed with {@link NoSuchElementException} if there is no 
corresponding {@code dirty}
+ * job present in the store for the given {@code JobID}.
  */
-void markResultAsClean(JobID jobId) throws IOException, 
NoSuchElementException;
+CompletableFuture markResultAsCleanAsync(JobID jobId);
 
 /**
- * Returns whether the store already contains an entry for a job.
+ * Returns the future of whether the store already contains an entry for a 
job.
  *
  * @param jobId Ident of the job we wish to check the store for.
- * @return {@code true} if a {@code dirty} or {@code clean} {@link 
JobResultEntry} exists for
- * the given {@code JobID}; otherwise {@code false}.
- * @throws IOException if determining whether a job entry is present in 
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true} if a {@code 
dirty} or {@code clean}
+ * {@link JobResultEntry} exists for the given {@code JobID}; 
otherwise a successfully
+ * completed future with {@code false}.
  */
-default boolean hasJobResultEntry(JobID jobId) throws IOException {
-return hasDirtyJobResultEntry(jobId) || hasCleanJobResultEntry(jobId);
+default CompletableFuture hasJobResultEntryAsync(JobID jobId) {
+return hasDirtyJobResultEntryAsync(jobId)
+.thenCombine(
+hasCleanJobResultEntryAsync(jobId),
+(result1, result2) -> result1 || result2);
 }
 
 /**
- * Returns whether the store already contains a {@code dirty} entry for 
the given {@code JobID}.
+ * Returns the future of whether the store contains a {@code dirty} entry 
for the given {@code
+ * JobID}.
  *
  * @param jobId Ident of the job we wish to check the store for.
- * @return {@code true}, if a {@code dirty} entry exists for the given 
{@code JobID}; otherwise
- * {@code false}.
- * @throws IOException if determining whether a job entry is present in 
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true}, if a {@code 
dirty} entry exists
+ * for the given {@code JobID}; otherwise a successfully completed 
future with {@code

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283927882


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##
@@ -43,69 +44,70 @@ public interface JobResultStore {
  * Registers the passed {@link JobResultEntry} instance as {@code dirty} 
which indicates that
  * clean-up operations still need to be performed. Once the job resource 
cleanup has been
  * finalized, we can mark the {@code JobResultEntry} as {@code clean} 
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
  *
  * @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO 
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has 
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty 
result is created
+ * successfully. The method will throw {@link IllegalStateException} 
if the passed {@code
+ * jobResultEntry} has a {@code JobID} attached that is already 
registered in this {@code
+ * JobResultStore}.
  */
-void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, 
IllegalStateException;
+CompletableFuture createDirtyResultAsync(JobResultEntry 
jobResultEntry);
 
 /**
  * Marks an existing {@link JobResultEntry} as {@code clean}. This 
indicates that no more
  * resource cleanup steps need to be performed. No actions should be 
triggered if the passed
  * {@code JobID} belongs to a job that was already marked as clean.
  *
  * @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry} 
as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code 
dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked 
successfully, The future will
+ * completed with {@link NoSuchElementException} if there is no 
corresponding {@code dirty}
+ * job present in the store for the given {@code JobID}.
  */
-void markResultAsClean(JobID jobId) throws IOException, 
NoSuchElementException;
+CompletableFuture markResultAsCleanAsync(JobID jobId);
 
 /**
- * Returns whether the store already contains an entry for a job.
+ * Returns the future of whether the store already contains an entry for a 
job.
  *
  * @param jobId Ident of the job we wish to check the store for.
- * @return {@code true} if a {@code dirty} or {@code clean} {@link 
JobResultEntry} exists for
- * the given {@code JobID}; otherwise {@code false}.
- * @throws IOException if determining whether a job entry is present in 
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true} if a {@code 
dirty} or {@code clean}
+ * {@link JobResultEntry} exists for the given {@code JobID}; 
otherwise a successfully
+ * completed future with {@code false}.

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283926936


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##
@@ -43,69 +44,70 @@ public interface JobResultStore {
  * Registers the passed {@link JobResultEntry} instance as {@code dirty} 
which indicates that
  * clean-up operations still need to be performed. Once the job resource 
cleanup has been
  * finalized, we can mark the {@code JobResultEntry} as {@code clean} 
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
  *
  * @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO 
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has 
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty 
result is created
+ * successfully. The method will throw {@link IllegalStateException} 
if the passed {@code

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283926122


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java:
##
@@ -43,69 +44,70 @@ public interface JobResultStore {
  * Registers the passed {@link JobResultEntry} instance as {@code dirty} 
which indicates that
  * clean-up operations still need to be performed. Once the job resource 
cleanup has been
  * finalized, we can mark the {@code JobResultEntry} as {@code clean} 
result using {@link
- * #markResultAsClean(JobID)}.
+ * #markResultAsCleanAsync(JobID)}.
  *
  * @param jobResultEntry The job result we wish to persist.
- * @throws IOException if the creation of the dirty result failed for IO 
reasons.
- * @throws IllegalStateException if the passed {@code jobResultEntry} has 
a {@code JobID}
- * attached that is already registered in this {@code JobResultStore}.
+ * @return a successfully completed future with {@code true} if the dirty 
result is created
+ * successfully. The method will throw {@link IllegalStateException} 
if the passed {@code
+ * jobResultEntry} has a {@code JobID} attached that is already 
registered in this {@code
+ * JobResultStore}.
  */
-void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, 
IllegalStateException;
+CompletableFuture createDirtyResultAsync(JobResultEntry 
jobResultEntry);
 
 /**
  * Marks an existing {@link JobResultEntry} as {@code clean}. This 
indicates that no more
  * resource cleanup steps need to be performed. No actions should be 
triggered if the passed
  * {@code JobID} belongs to a job that was already marked as clean.
  *
  * @param jobId Ident of the job we wish to mark as clean.
- * @throws IOException if marking the {@code dirty} {@code JobResultEntry} 
as {@code clean}
- * failed for IO reasons.
- * @throws NoSuchElementException if there is no corresponding {@code 
dirty} job present in the
- * store for the given {@code JobID}.
+ * @return a successfully completed future if the result is marked 
successfully, The future will
+ * completed with {@link NoSuchElementException} if there is no 
corresponding {@code dirty}
+ * job present in the store for the given {@code JobID}.
  */
-void markResultAsClean(JobID jobId) throws IOException, 
NoSuchElementException;
+CompletableFuture markResultAsCleanAsync(JobID jobId);
 
 /**
- * Returns whether the store already contains an entry for a job.
+ * Returns the future of whether the store already contains an entry for a 
job.
  *
  * @param jobId Ident of the job we wish to check the store for.
- * @return {@code true} if a {@code dirty} or {@code clean} {@link 
JobResultEntry} exists for
- * the given {@code JobID}; otherwise {@code false}.
- * @throws IOException if determining whether a job entry is present in 
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true} if a {@code 
dirty} or {@code clean}
+ * {@link JobResultEntry} exists for the given {@code JobID}; 
otherwise a successfully
+ * completed future with {@code false}.
  */
-default boolean hasJobResultEntry(JobID jobId) throws IOException {
-return hasDirtyJobResultEntry(jobId) || hasCleanJobResultEntry(jobId);
+default CompletableFuture hasJobResultEntryAsync(JobID jobId) {
+return hasDirtyJobResultEntryAsync(jobId)
+.thenCombine(
+hasCleanJobResultEntryAsync(jobId),
+(result1, result2) -> result1 || result2);
 }
 
 /**
- * Returns whether the store already contains a {@code dirty} entry for 
the given {@code JobID}.
+ * Returns the future of whether the store contains a {@code dirty} entry 
for the given {@code
+ * JobID}.
  *
  * @param jobId Ident of the job we wish to check the store for.
- * @return {@code true}, if a {@code dirty} entry exists for the given 
{@code JobID}; otherwise
- * {@code false}.
- * @throws IOException if determining whether a job entry is present in 
the store failed for IO
- * reasons.
+ * @return a successfully completed future with {@code true}, if a {@code 
dirty} entry exists
+ * for the given {@code JobID}; otherwise a successfully completed 
future with {@code
+ * false}.
  */
-boolean hasDirtyJobResultEntry(JobID jobId) throws IOException;
+CompletableFuture hasDirtyJobResultEntryAsync(JobID jobId);
 
 /**
- * Returns whether the store already contains a {@code clean} entry for 
the given {@code JobID}.
+ * Returns the future of whether the store contains a {@code clean} entry 
for the given {@code
+ * JobID}.
  *
  * @param jobId Ident of the job we 

[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283925581


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws 
IOException, NoSuchEle
 }
 
 @Override
-public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws 
IOException {
+public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws 
IOException {
 return fileSystem.exists(constructDirtyPath(jobId));
 }
 
 @Override
-public boolean hasCleanJobResultEntryInternal(JobID jobId) throws 
IOException {
+public Boolean hasCleanJobResultEntryInternal(JobID jobId) throws 
IOException {
 return fileSystem.exists(constructCleanPath(jobId));
 }
 
 @Override
 public Set getDirtyResultsInternal() throws IOException {
 createBasePathIfNeeded();
-
 final FileStatus[] statuses = fileSystem.listStatus(this.basePath);
-

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32738) PROTOBUF format supports projection push down

2023-08-03 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750934#comment-17750934
 ] 

dalongliu commented on FLINK-32738:
---

[~zhoujira86] Looks good to me if we can support this feature. A kind 
reminder: you should also consider the case of nested field pushdown; see the 
sketch below.
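
For context, a hedged sketch of the source-side contract for nested 
projection, modeled on Flink's `SupportsProjectionPushDown` ability interface; 
the class name and fields are illustrative, the remaining `ScanTableSource` 
methods are omitted by keeping the class abstract, and the two-argument 
`applyProjection` variant shown here matches recent Flink releases (older 
releases use a single-argument form):

import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.types.DataType;

public abstract class ProtobufProjectingSource
        implements ScanTableSource, SupportsProjectionPushDown {

    // Index paths of the projected fields, e.g. {2, 0} = first child of the
    // third top-level column.
    protected int[][] projectedFields;
    protected DataType producedDataType;

    @Override
    public boolean supportsNestedProjection() {
        // Returning true lets the planner push nested paths such as
        // `order.user.id` into the format instead of reading the full row.
        return true;
    }

    @Override
    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
        this.projectedFields = projectedFields;
        this.producedDataType = producedDataType;
    }
}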

> PROTOBUF format supports projection push down
> -
>
> Key: FLINK-32738
> URL: https://issues.apache.org/jira/browse/FLINK-32738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
> Fix For: 1.19.0
>
>
> support projection push down for protobuf



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283925443


##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws 
IOException, NoSuchEle
 }
 
 @Override
-public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws 
IOException {
+public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws 
IOException {

Review Comment:
   Fixed.



##
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java:
##
@@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws 
IOException, NoSuchEle
 }
 
 @Override
-public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws 
IOException {
+public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws 
IOException {
 return fileSystem.exists(constructDirtyPath(jobId));
 }
 
 @Override
-public boolean hasCleanJobResultEntryInternal(JobID jobId) throws 
IOException {
+public Boolean hasCleanJobResultEntryInternal(JobID jobId) throws 
IOException {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32746) Enable ZGC in JDK17 to solve long time class unloading STW during fullgc

2023-08-03 Thread xiangyu feng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiangyu feng updated FLINK-32746:
-
Description: 
In an OLAP session cluster, a TM needs to frequently create new classloaders 
and generate new classes. These classes accumulate in the metaspace. When 
metaspace usage reaches a threshold, a FullGC with a long Stop-the-World pause 
is triggered. Currently, SerialGC, ParallelGC, and G1GC all perform 
Stop-the-World class unloading. Only ZGC supports concurrent class unloading; 
see [https://bugs.openjdk.org/browse/JDK-8218905].

In our scenario, unloading the classes of a 2GB metaspace with 5 million 
classes stops the application for more than 40 seconds. After switching to 
ZGC, the maximum STW pause of the application is reduced to less than 10ms.
 

  was:
In an OLAP session cluster, a TM needs to frequently create new classloaders 
and generate new classes. These classes accumulate in the metaspace. When 
metaspace usage reaches a threshold, a FullGC with a long Stop-the-World pause 
is triggered. Currently, SerialGC, ParallelGC, and G1GC all perform 
Stop-the-World class unloading. Only ZGC supports concurrent class unloading.
 


> Enable ZGC in JDK17 to solve long time class unloading STW during fullgc
> 
>
> Key: FLINK-32746
> URL: https://issues.apache.org/jira/browse/FLINK-32746
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: xiangyu feng
>Priority: Major
>
> In an OLAP session cluster, a TM needs to frequently create new classloaders 
> and generate new classes. These classes accumulate in the metaspace. When 
> metaspace usage reaches a threshold, a FullGC with a long Stop-the-World 
> pause is triggered. Currently, SerialGC, ParallelGC, and G1GC all perform 
> Stop-the-World class unloading. Only ZGC supports concurrent class unloading; 
> see [https://bugs.openjdk.org/browse/JDK-8218905].
>  
> In our scenario, unloading the classes of a 2GB metaspace with 5 million 
> classes stops the application for more than 40 seconds. After switching to 
> ZGC, the maximum STW pause of the application is reduced to less than 10ms.
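
For readers who want to try this, a hedged sketch of the relevant settings; 
the metaspace size is illustrative, and `-XX:+UseZGC` assumes the TMs run on 
JDK 15 or newer (production-ready in JDK 17):

# flink-conf.yaml -- illustrative values, assuming TMs run on JDK 17
env.java.opts.taskmanager: -XX:+UseZGC
taskmanager.memory.jvm-metaspace.size: 2gb
# Class unloading stays enabled by default; ZGC performs it concurrently,
# avoiding the long Stop-the-World pauses described above.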



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283100724


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -1390,29 +1392,71 @@ private CompletableFuture 
registerGloballyTerminatedJobInJobRes
 "Job %s is in state %s which is not globally terminal.",
 jobId,
 terminalJobStatus);
-
-ioExecutor.execute(
-() -> {
-try {
-if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-log.warn(
-"Job {} is already marked as clean but 
clean up was triggered again.",
-jobId);
-} else if 
(!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-jobResultStore.createDirtyResult(
-new JobResultEntry(
-
JobResult.createFrom(archivedExecutionGraph)));
-log.info(
-"Job {} has been registered for cleanup in 
the JobResultStore after reaching a terminal state.",
-jobId);
-}
-} catch (IOException e) {
-writeFuture.completeExceptionally(e);
+CompletableFuture shouldCheckDirtyJobResult =
+jobResultStore
+.hasCleanJobResultEntryAsync(jobId)
+.handleAsync(
+(hasCleanJobResultEntry, throwable) -> {
+if (throwable != null) {
+
writeFuture.completeExceptionally(throwable);
+return false;
+} else {
+if (hasCleanJobResultEntry) {
+log.warn(
+"Job {} is already marked 
as clean but "
++ "clean up was 
triggered again.",
+jobId);
+writeFuture.complete(null);
+return false;
+} else {
+return true;
+}
+}
+});
+shouldCheckDirtyJobResult.whenCompleteAsync(
+(shouldCheck, throwable1) -> {
+if (throwable1 != null) {
+writeFuture.completeExceptionally(throwable1);
 return;
 }
-writeFuture.complete(null);
+if (shouldCheck) {
+jobResultStore
+.hasDirtyJobResultEntryAsync(jobId)
+.whenCompleteAsync(
+(hasDirtyJobResultEntry, throwable2) 
-> {
+if (throwable2 != null) {
+
writeFuture.completeExceptionally(throwable2);
+return;
+}
+if (!hasDirtyJobResultEntry) {
+jobResultStore
+
.createDirtyResultAsync(
+new 
JobResultEntry(
+
JobResult.createFrom(
+   
 archivedExecutionGraph)))
+.whenCompleteAsync(
+(unused, 
throwable3) -> {
+if 
(throwable3 != null) {
+
writeFuture
+   
 .completeExceptionally(
+   
 throwable3);
+return;
+}
+log.info(
+
"Job {} has been registered 

[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refactor FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1283923614


##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java:
##
@@ -332,11 +332,16 @@ public void testJobBeingMarkedAsDirtyBeforeCleanup() 
throws Exception {
 TestingJobResultStore.builder()
 .withCreateDirtyResultConsumer(
 ignoredJobResultEntry -> {
+CompletableFuture 
result =
+new 
CompletableFuture<>();
 try {
 
markAsDirtyLatch.await();
 } catch 
(InterruptedException e) {
-throw new 
RuntimeException(e);
+
result.completeExceptionally(
+new 
RuntimeException(e));

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile needs to close the writer automatically

2023-08-03 Thread via GitHub


flinkbot commented on PR #23134:
URL: https://github.com/apache/flink/pull/23134#issuecomment-1664900125

   
   ## CI report:
   
   * 30978f125a7b9d816e5875a4ce28f8b359341115 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23133: [FLINK-32747][table] Support ddl for catalog which is loaded from CatalogStore

2023-08-03 Thread via GitHub


flinkbot commented on PR #23133:
URL: https://github.com/apache/flink/pull/23133#issuecomment-1664899703

   
   ## CI report:
   
   * 651315783f96f5c01dabba6702dfaf01190f5942 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32748) WriteSinkFunction::cleanFile needs to close the writer automatically

2023-08-03 Thread xy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xy updated FLINK-32748:
---
Component/s: API / Core

> WriteSinkFunction::cleanFile needs to close the writer automatically
> -
>
> Key: FLINK-32748
> URL: https://issues.apache.org/jira/browse/FLINK-32748
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Reporter: xy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.4
>
>
> WriteSinkFunction::cleanFile needs to close the writer automatically



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32747) Support ddl for catalog from CatalogStore

2023-08-03 Thread Fang Yong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong reassigned FLINK-32747:
-

Assignee: Fang Yong

> Support ddl for catalog from CatalogStore
> -
>
> Key: FLINK-32747
> URL: https://issues.apache.org/jira/browse/FLINK-32747
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> Support DDL for catalogs which are loaded by the CatalogStore



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xuzifu666 commented on pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile needs to close the writer automatically

2023-08-03 Thread via GitHub


xuzifu666 commented on PR #23134:
URL: https://github.com/apache/flink/pull/23134#issuecomment-1664896044

   cc @reswqa, please have a review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32748) WriteSinkFunction::cleanFile needs to close the writer automatically

2023-08-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32748:
---
Labels: pull-request-available  (was: )

> WriteSinkFunction::cleanFile needs to close the writer automatically
> -
>
> Key: FLINK-32748
> URL: https://issues.apache.org/jira/browse/FLINK-32748
> Project: Flink
>  Issue Type: Bug
>Reporter: xy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.4
>
>
> WriteSinkFunction::cleanFile needs to close the writer automatically



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32746) Enable ZGC in JDK17 to solve long time class unloading STW during fullgc

2023-08-03 Thread xiangyu feng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiangyu feng updated FLINK-32746:
-
Description: 
In an OLAP session cluster, a TM needs to frequently create new classloaders 
and generate new classes. These classes accumulate in the metaspace. When 
metaspace usage reaches a threshold, a FullGC with a long Stop-the-World pause 
is triggered. Currently, SerialGC, ParallelGC, and G1GC all perform 
Stop-the-World class unloading. Only ZGC supports concurrent class unloading.
 

> Enable ZGC in JDK17 to solve long time class unloading STW during fullgc
> 
>
> Key: FLINK-32746
> URL: https://issues.apache.org/jira/browse/FLINK-32746
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: xiangyu feng
>Priority: Major
>
> In an OLAP session cluster, a TM needs to frequently create new classloaders 
> and generate new classes. These classes accumulate in the metaspace. When 
> metaspace usage reaches a threshold, a FullGC with a long Stop-the-World 
> pause is triggered. Currently, SerialGC, ParallelGC, and G1GC all perform 
> Stop-the-World class unloading. Only ZGC supports concurrent class unloading.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xuzifu666 opened a new pull request, #23134: [FLINK-32748] WriteSinkFunction::cleanFile needs to close the writer automatically

2023-08-03 Thread via GitHub


xuzifu666 opened a new pull request, #23134:
URL: https://github.com/apache/flink/pull/23134

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   This PR improves file object management: it ensures the writer used by `WriteSinkFunction::cleanFile` is closed automatically. A minimal sketch follows.
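
   A minimal sketch of the intended fix, assuming `cleanFile` truncates the 
target file with a `PrintWriter` as in the legacy `WriteSinkFunction`; the 
class wrapper and file name are illustrative:

import java.io.FileNotFoundException;
import java.io.PrintWriter;

public class CleanFileSketch {

    // try-with-resources guarantees the writer is closed even if writing
    // fails, instead of relying on an explicit close() call.
    protected void cleanFile(String path) {
        try (PrintWriter writer = new PrintWriter(path)) {
            writer.print("");
        } catch (FileNotFoundException e) {
            throw new RuntimeException(
                    "An error occurred while cleaning the file: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        new CleanFileSketch().cleanFile("output.txt");
    }
}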
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32747) Support ddl for catalog from CatalogStore

2023-08-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32747:
---
Labels: pull-request-available  (was: )

> Support ddl for catalog from CatalogStore
> -
>
> Key: FLINK-32747
> URL: https://issues.apache.org/jira/browse/FLINK-32747
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> Support DDL for catalogs which are loaded by the CatalogStore



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] FangYongs opened a new pull request, #23133: [FLINK-32747][table] Support ddl for catalog which is loaded from CatalogStore

2023-08-03 Thread via GitHub


FangYongs opened a new pull request, #23133:
URL: https://github.com/apache/flink/pull/23133

   ## What is the purpose of the change
   
   Currently `CatalogManager` can load a catalog from the `CatalogStore`, but we 
cannot perform DDL on the loaded catalog. This PR aims to support these 
operations.
   
   ## Brief change log
 - Add the catalog loaded from the `CatalogStore` to `catalogs` in `CatalogManager`
 - Get the catalog via the `getCatalog` method instead of `catalogs.get` for 
operations such as DDL and listing tables/views (see the lookup sketch below)
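
   A hedged sketch of the lookup pattern referenced above; the `initCatalog` 
helper and the store type are illustrative stand-ins rather than the actual 
`CatalogManager` internals:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CatalogLookupSketch {

    // Catalogs that have already been initialized and opened.
    private final Map<String, Object> catalogs = new HashMap<>();
    // Stand-in for the CatalogStore holding persisted catalog descriptors.
    private final Map<String, Object> catalogStore = new HashMap<>();

    // Prefer the in-memory catalog; otherwise load it from the store and
    // cache it so that subsequent DDL operates on the same instance.
    public Optional<Object> getCatalog(String catalogName) {
        Object catalog = catalogs.get(catalogName);
        if (catalog == null) {
            Object descriptor = catalogStore.get(catalogName);
            if (descriptor != null) {
                catalog = initCatalog(catalogName, descriptor);
                catalogs.put(catalogName, catalog);
            }
        }
        return Optional.ofNullable(catalog);
    }

    private Object initCatalog(String name, Object descriptor) {
        // Illustrative: the real code would instantiate and open the catalog.
        return descriptor;
    }

    public static void main(String[] args) {
        CatalogLookupSketch manager = new CatalogLookupSketch();
        manager.catalogStore.put("my_catalog", new Object());
        System.out.println(manager.getCatalog("my_catalog").isPresent()); // true
    }
}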
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added test case `CatalogManagerTest.testExistingCatalogInStore`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32748) WriteSinkFunction::cleanFile needs to close the writer automatically

2023-08-03 Thread xy (Jira)
xy created FLINK-32748:
--

 Summary: WriteSinkFunction::cleanFile needs to close the writer automatically
 Key: FLINK-32748
 URL: https://issues.apache.org/jira/browse/FLINK-32748
 Project: Flink
  Issue Type: Bug
Reporter: xy
 Fix For: 1.9.4


WriteSinkFunction::cleanFile needs to close the writer automatically



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32747) Support ddl for catalog from CatalogStore

2023-08-03 Thread Fang Yong (Jira)
Fang Yong created FLINK-32747:
-

 Summary: Support ddl for catalog from CatalogStore
 Key: FLINK-32747
 URL: https://issues.apache.org/jira/browse/FLINK-32747
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.19.0
Reporter: Fang Yong


Support ddl for catalog which loaded by CatalogStore



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32741) Remove DataSet related descriptions in doc

2023-08-03 Thread Junyao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750930#comment-17750930
 ] 

Junyao Huang edited comment on FLINK-32741 at 8/4/23 2:33 AM:
--

Thanks! will start immediately. will link PR to this issue.


was (Author: pegasas):
Thanks! will start immediately.

> Remove DataSet related descriptions in doc
> --
>
> Key: FLINK-32741
> URL: https://issues.apache.org/jira/browse/FLINK-32741
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Documentation
>Affects Versions: 2.0.0
>Reporter: Wencong Liu
>Assignee: Junyao Huang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 1.18.0
>
>
> Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't 
> recommend developers to use the DataSet, the descriptions of DataSet should 
> be removed in the doc after [FLINK-32558].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32741) Remove DataSet related descriptions in doc

2023-08-03 Thread Junyao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750930#comment-17750930
 ] 

Junyao Huang commented on FLINK-32741:
--

Thanks! will start immediately.

> Remove DataSet related descriptions in doc
> --
>
> Key: FLINK-32741
> URL: https://issues.apache.org/jira/browse/FLINK-32741
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Documentation
>Affects Versions: 2.0.0
>Reporter: Wencong Liu
>Assignee: Junyao Huang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 1.18.0
>
>
> Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't 
> recommend developers to use the DataSet, the descriptions of DataSet should 
> be removed in the doc after [FLINK-32558].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] FangYongs commented on pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on PR #23063:
URL: https://github.com/apache/flink/pull/23063#issuecomment-1664886934

   Thanks @ferenc-csaky , left some comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on PR #23026:
URL: https://github.com/apache/flink/pull/23026#issuecomment-1664886800

   > I think being used in test cases probably should not block the deprecation 
of the classes. In this specific case, I don't think it's necessary to move them 
to another module. There are many similar classes that parse the application 
arguments into a `GlobalJobParameters`. I suspect these non-DataSet test cases 
use `ParameterTool` simply because they are imitated from DataSet test cases.
   
   Deprecating the `ParameterTool`-related classes and making the other classes 
parse arguments in a similar way to `ParameterTool` is a reasonable approach. 
I'll follow the suggestions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283910595


##
flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java:
##
@@ -77,12 +80,23 @@
  *   Broadcast variables in bulk iterations
  *   Custom Java objects (POJOs)
  * 
+ *
+ * Note: All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
+ * Flink major version. You can still build your application in DataSet, but 
you should move to
+ * either the DataStream and/or Table API. This class is retained for testing 
purposes.
  */
 @SuppressWarnings("serial")
 public class KMeans {
 
+private static final Logger LOGGER = LoggerFactory.getLogger(KMeans.class);
+
 public static void main(String[] args) throws Exception {
 
+LOGGER.warn(
+"All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a future"
++ " Flink major version. You can still build your 
application in DataSet, but you should move to"
++ " either the DataStream and/or Table API. This class 
is retained for testing purposes.");

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


WencongLiu commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283910488


##
flink-examples/flink-examples-batch/pom.xml:
##
@@ -48,25 +48,6 @@ under the License.



-   
-   
-   org.apache.maven.plugins
-   maven-compiler-plugin
-   
-   
-   compile
-   process-sources
-   
-   compile
-   
-   
-   
-Xlint:deprecation
-   
true
-   
-   
-   
-   
-   

Review Comment:
   Fixed.



##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -556,7 +556,14 @@ public void setExecutionMode(ExecutionMode executionMode) {
  * The default execution mode is {@link ExecutionMode#PIPELINED}.
  *
  * @return The execution mode for the program.
+ * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a
+ * future Flink major version. You can still build your application in 
DataSet, but you
+ * should move to either the DataStream and/or Table API.
+ * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;>
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API
  */
+@Deprecated

Review Comment:
   Fixed.



##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -913,6 +920,18 @@ public LinkedHashSet> getRegisteredPojoTypes() {
 return registeredPojoTypes;
 }
 
+/**
+ * Get if the auto type registration is disabled.
+ *
+ * @return if the auto type registration is disabled.
+ * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a
+ * future Flink major version. You can still build your application in 
DataSet, but you
+ * should move to either the DataStream and/or Table API.
+ * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;>
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API
+ */
+@Deprecated

Review Comment:
   Fixed.



##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -544,6 +544,13 @@ public ExecutionConfig setExecutionRetryDelay(long 
executionRetryDelay) {
  * The default execution mode is {@link ExecutionMode#PIPELINED}.
  *
  * @param executionMode The execution mode to use.
+ * @deprecated The {@link ExecutionMode} is deprecated because it's only 
used in DataSet APIs.
+ * All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
+ * Flink major version. You can still build your application in 
DataSet, but you should move
+ * to either the DataStream and/or Table API.
+ * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;>
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API
  */
 public void setExecutionMode(ExecutionMode executionMode) {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283909486


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java:
##
@@ -20,42 +20,39 @@
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.factories.CatalogStoreFactory;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.FactoryHelper;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.CHARSET;
 import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.IDENTIFIER;
 import static 
org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.PATH;
 import static 
org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
 
-/** CatalogStore factory for {@link FileCatalogStore}. */
+/** Catalog store factory for {@link FileCatalogStore}. */
 public class FileCatalogStoreFactory implements CatalogStoreFactory {
 
 private String path;
 
-private String charset;
-
 @Override
 public CatalogStore createCatalogStore() {
-return new FileCatalogStore(path, charset);
+return new FileCatalogStore(path);
 }
 
 @Override
-public void open(Context context) throws CatalogException {
-FactoryUtil.FactoryHelper factoryHelper = 
createCatalogStoreFactoryHelper(this, context);
+public void open(Context context) {
+FactoryHelper factoryHelper =
+createCatalogStoreFactoryHelper(this, context);
 factoryHelper.validate();
-ReadableConfig options = factoryHelper.getOptions();
 
+ReadableConfig options = factoryHelper.getOptions();
 path = options.get(PATH);
-charset = options.get(CHARSET);
 }
 
 @Override
-public void close() throws CatalogException {}
+public void close() {}

Review Comment:
   Call super.close()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283908619


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -217,32 +253,10 @@ public Set listCatalogs() throws CatalogException 
{
 public boolean contains(String catalogName) throws CatalogException {
 checkOpenState();
 
-return listAllCatalogFiles().containsKey(catalogName);
-}
-
-private Map listAllCatalogFiles() throws CatalogException {
-Map files = new HashMap<>();
-File directoryFile = new File(catalogStoreDirectory);
-if (!directoryFile.isDirectory()) {
-throw new CatalogException("File catalog store only support local 
directory");
-}
-
-try {
-Files.list(directoryFile.toPath())
-.filter(file -> 
file.getFileName().toString().endsWith(FILE_EXTENSION))
-.filter(Files::isRegularFile)
-.forEach(
-p ->
-files.put(
-
p.getFileName().toString().replace(FILE_EXTENSION, ""),
-p));
-} catch (Throwable t) {
-throw new CatalogException("Failed to list file catalog store 
directory", t);
-}
-return files;
+return listCatalogs().contains(catalogName);

Review Comment:
   Directly check if the catalog file exists here? I think getting all the 
catalogs may not be an efficient operation.
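
   A minimal sketch of the suggested direct check, reusing the existing 
`checkOpenState` and `getCatalogPath` helpers from this diff (the error message 
wording is an assumption):

   ```java
   @Override
   public boolean contains(String catalogName) throws CatalogException {
       checkOpenState();
       // Probe the single <catalog-name>.yaml file instead of listing the
       // whole store directory.
       Path catalogPath = getCatalogPath(catalogName);
       try {
           return catalogPath.getFileSystem().exists(catalogPath);
       } catch (Exception e) {
           throw new CatalogException(
                   String.format(
                           "Failed to check whether catalog %s exists.", catalogName),
                   e);
       }
   }
   ```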



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283907327


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -201,8 +222,23 @@ public Optional getCatalog(String 
catalogName) throws Catalog
 @Override
 public Set listCatalogs() throws CatalogException {
 checkOpenState();
+try {
+FileStatus[] statusArr = 
catalogStorePath.getFileSystem().listStatus(catalogStorePath);
 
-return Collections.unmodifiableSet(listAllCatalogFiles().keySet());
+return Arrays.stream(statusArr)
+.filter(status -> !status.isDir())
+.map(FileStatus::getPath)
+.map(Path::getName)
+.map(filename -> filename.replace(FILE_EXTENSION, ""))
+.collect(Collectors.toSet());
+} catch (CatalogException e) {
+throw e;

Review Comment:
   The same as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283906870


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -172,22 +185,30 @@ public void removeCatalog(String catalogName, boolean 
ignoreIfNotExists)
 @Override
 public Optional getCatalog(String catalogName) throws 
CatalogException {
 checkOpenState();
-
-Path path = getCatalogPath(catalogName);
+Path catalogPath = getCatalogPath(catalogName);
 try {
-File file = path.toFile();
-if (!file.exists()) {
-LOG.warn("Catalog {}'s store file %s does not exist.", 
catalogName, path);
+FileSystem fs = catalogPath.getFileSystem();
+
+if (!fs.exists(catalogPath)) {
 return Optional.empty();
 }
-String content = FileUtils.readFile(file, charset);
-Map options = yaml.load(content);
-return Optional.of(CatalogDescriptor.of(catalogName, 
Configuration.fromMap(options)));
-} catch (Throwable t) {
+
+try (FSDataInputStream is = fs.open(catalogPath)) {
+Map configMap =
+YAML_MAPPER.readValue(is, new 
TypeReference>() {});
+
+CatalogDescriptor catalog =
+CatalogDescriptor.of(catalogName, 
Configuration.fromMap(configMap));
+
+return Optional.of(catalog);
+}
+} catch (CatalogException e) {
+throw e;

Review Comment:
   Same as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283906615


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -132,31 +153,23 @@ public void storeCatalog(String catalogName, 
CatalogDescriptor catalog)
 public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
 throws CatalogException {
 checkOpenState();
-
-Path path = getCatalogPath(catalogName);
+Path catalogPath = getCatalogPath(catalogName);
 try {
-File file = path.toFile();
-if (file.exists()) {
-if (!file.isFile()) {
-throw new CatalogException(
-String.format(
-"Catalog %s's store file %s is not a 
regular file",
-catalogName, path.getFileName()));
-}
-Files.deleteIfExists(path);
-} else {
-if (!ignoreIfNotExists) {
-throw new CatalogException(
-String.format(
-"Catalog %s's store file %s is not exist", 
catalogName, path));
-}
+FileSystem fs = catalogPath.getFileSystem();
+
+if (fs.exists(catalogPath)) {
+fs.delete(catalogPath, false);
+} else if (!ignoreIfNotExists) {
+throw new CatalogException(
+String.format(
+"Catalog %s's store file %s does not exist.",
+catalogName, catalogPath));
 }
-} catch (Throwable e) {
+} catch (CatalogException e) {
+throw e;

Review Comment:
   The same as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283906508


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -97,25 +114,29 @@ public void storeCatalog(String catalogName, 
CatalogDescriptor catalog)
 throws CatalogException {
 checkOpenState();
 
-Path filePath = getCatalogPath(catalogName);
+Path catalogPath = getCatalogPath(catalogName);
 try {
-File file = filePath.toFile();
-if (file.exists()) {
+FileSystem fs = catalogPath.getFileSystem();
+
+if (fs.exists(catalogPath)) {
 throw new CatalogException(
 String.format(
 "Catalog %s's store file %s is already exist.",
-catalogName, filePath));
+catalogName, catalogPath));
 }
-// create a new file
-file.createNewFile();
-String yamlString = 
yaml.dumpAsMap(catalog.getConfiguration().toMap());
-FileUtils.writeFile(file, yamlString, charset);
-LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, filePath);
-} catch (Throwable e) {
+
+try (FSDataOutputStream os = fs.create(catalogPath, 
WriteMode.NO_OVERWRITE)) {
+YAML_MAPPER.writeValue(os, catalog.getConfiguration().toMap());
+}
+
+LOG.info("Catalog {}'s configuration saved to file {}", 
catalogName, catalogPath);
+} catch (CatalogException e) {
+throw e;

Review Comment:
   The handling here is a bit strange. Under what circumstances will a 
CatalogException be caught?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation

2023-08-03 Thread via GitHub


FangYongs commented on code in PR #23063:
URL: https://github.com/apache/flink/pull/23063#discussion_r1283903488


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java:
##
@@ -46,42 +49,56 @@ public class FileCatalogStore extends AbstractCatalogStore {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(FileCatalogStore.class);
 
-private static final String FILE_EXTENSION = ".yaml";
+static final String FILE_EXTENSION = ".yaml";
 
-/** The directory path where catalog configurations will be stored. */
-private final String catalogStoreDirectory;
-
-/** The character set to use when reading and writing catalog files. */
-private final String charset;
+/** The YAML mapper to use when reading and writing catalog files. */
+private static final YAMLMapper YAML_MAPPER =
+new 
YAMLMapper().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER);
 
-/** The YAML parser to use when reading and writing catalog files. */
-private final Yaml yaml = new Yaml();
+/** The directory path where catalog configurations will be stored. */
+private final Path catalogStorePath;
 
 /**
  * Creates a new {@link FileCatalogStore} instance with the specified 
directory path.
  *
- * @param catalogStoreDirectory the directory path where catalog 
configurations will be stored
+ * @param catalogStorePath the directory path where catalog configurations 
will be stored
  */
-public FileCatalogStore(String catalogStoreDirectory, String charset) {
-this.catalogStoreDirectory = catalogStoreDirectory;
-this.charset = charset;
+public FileCatalogStore(String catalogStorePath) {
+this.catalogStorePath = new Path(catalogStorePath);
 }
 
 /**
  * Opens the catalog store and initializes the catalog file map.
  *
- * @throws CatalogException if the catalog store directory does not exist 
or if there is an
- * error reading the directory
+ * @throws CatalogException if the catalog store directory does not exist, 
not a directory, or
+ * if there is an error reading the directory
  */
 @Override
 public void open() throws CatalogException {
-super.open();
-
 try {
+FileSystem fs = catalogStorePath.getFileSystem();
+if (!fs.exists(catalogStorePath)) {
+throw new CatalogException(
+String.format(
+"Failed to open catalog store. The catalog 
store directory %s does not exist.",
+catalogStorePath));
+}
 
-} catch (Throwable e) {
-throw new CatalogException("Failed to open file catalog store 
directory", e);
+if (!fs.getFileStatus(catalogStorePath).isDir()) {
+throw new CatalogException(
+String.format(
+"Failed to open catalog store. The given 
catalog store path %s is not a directory.",
+catalogStorePath));
+}
+} catch (CatalogException e) {
+throw e;
+} catch (Exception e) {
+throw new CatalogException(
+String.format(
+"Failed to open file catalog store directory %s.", 
catalogStorePath),
+e);
 }
+super.open();

Review Comment:
   Generally, `super.open()` is called first. Why is it called last here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32746) Enable ZGC in JDK17 to solve long time class unloading STW during fullgc

2023-08-03 Thread xiangyu feng (Jira)
xiangyu feng created FLINK-32746:


 Summary: Enable ZGC in JDK17 to solve long time class unloading 
STW during fullgc
 Key: FLINK-32746
 URL: https://issues.apache.org/jira/browse/FLINK-32746
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: xiangyu feng
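
As a hedged illustration of the direction (assuming the cluster runs on JDK 17):
ZGC performs class unloading concurrently rather than in a stop-the-world
full-GC phase, and can be switched on through Flink's existing
`env.java.opts.*` settings:

```yaml
# flink-conf.yaml -- illustrative snippet, assumes a JDK 17 runtime
env.java.opts.jobmanager: -XX:+UseZGC
env.java.opts.taskmanager: -XX:+UseZGC
```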






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xintongsong commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API

2023-08-03 Thread via GitHub


xintongsong commented on code in PR #23026:
URL: https://github.com/apache/flink/pull/23026#discussion_r1283896762


##
flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java:
##
@@ -77,12 +80,23 @@
  *   Broadcast variables in bulk iterations
  *   Custom Java objects (POJOs)
  * 
+ *
+ * Note: All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
+ * Flink major version. You can still build your application in DataSet, but 
you should move to
+ * either the DataStream and/or Table API. This class is retained for testing 
purposes.
  */
 @SuppressWarnings("serial")
 public class KMeans {
 
+private static final Logger LOGGER = LoggerFactory.getLogger(KMeans.class);
+
 public static void main(String[] args) throws Exception {
 
+LOGGER.warn(
+"All Flink DataSet APIs are deprecated since Flink 1.18 and 
will be removed in a future"
++ " Flink major version. You can still build your 
application in DataSet, but you should move to"
++ " either the DataStream and/or Table API. This class 
is retained for testing purposes.");

Review Comment:
   Can we deduplicate the warning message somewhere?
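
   One possible shape for that deduplication (the class and constant names are 
illustrative assumptions, not necessarily what the PR ends up with):

   ```java
   /** Single home for the DataSet deprecation notice, reused by all examples. */
   public final class DataSetDeprecationInfo {

       public static final String DATASET_DEPRECATION_INFO =
               "All Flink DataSet APIs are deprecated since Flink 1.18 and will be"
                       + " removed in a future Flink major version. You can still build"
                       + " your application in DataSet, but you should move to either"
                       + " the DataStream and/or Table API.";

       private DataSetDeprecationInfo() {}
   }
   ```

   Each example's `main` could then log the shared constant, e.g. 
`LOGGER.warn(DataSetDeprecationInfo.DATASET_DEPRECATION_INFO);`.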



##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -544,6 +544,13 @@ public ExecutionConfig setExecutionRetryDelay(long 
executionRetryDelay) {
  * The default execution mode is {@link ExecutionMode#PIPELINED}.
  *
  * @param executionMode The execution mode to use.
+ * @deprecated The {@link ExecutionMode} is deprecated because it's only 
used in DataSet APIs.
+ * All Flink DataSet APIs are deprecated since Flink 1.18 and will be 
removed in a future
+ * Flink major version. You can still build your application in 
DataSet, but you should move
+ * to either the DataStream and/or Table API.
+ * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;>
+ * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and 
deprecate the DataSet
+ * API
  */
 public void setExecutionMode(ExecutionMode executionMode) {

Review Comment:
   Annotation is missing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 merged pull request #22992: [hotfix] Fix typo in JobManagerOptions

2023-08-03 Thread via GitHub


wanglijie95 merged PR #22992:
URL: https://github.com/apache/flink/pull/22992


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32741) Remove DataSet related descriptions in doc

2023-08-03 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750921#comment-17750921
 ] 

Xintong Song commented on FLINK-32741:
--

Thanks for volunteering, [~pegasas]. You are assigned. Please go ahead.

FYI, the DataSet documentation should be removed before we ship release 
1.18, which means we only have ~2-3 weeks to work on this (including PR review).

> Remove DataSet related descriptions in doc
> --
>
> Key: FLINK-32741
> URL: https://issues.apache.org/jira/browse/FLINK-32741
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Documentation
>Affects Versions: 2.0.0
>Reporter: Wencong Liu
>Assignee: Junyao Huang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 1.18.0
>
>
> Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't 
> recommend developers to use the DataSet, the descriptions of DataSet should 
> be removed in the doc after [FLINK-32558].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32741) Remove DataSet related descriptions in doc

2023-08-03 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-32741:


Assignee: Junyao Huang

> Remove DataSet related descriptions in doc
> --
>
> Key: FLINK-32741
> URL: https://issues.apache.org/jira/browse/FLINK-32741
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Documentation
>Affects Versions: 2.0.0
>Reporter: Wencong Liu
>Assignee: Junyao Huang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't 
> recommend developers to use the DataSet, the descriptions of DataSet should 
> be removed in the doc after [FLINK-32558].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32741) Remove DataSet related descriptions in doc

2023-08-03 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song updated FLINK-32741:
-
Fix Version/s: 1.18.0
   (was: 2.0.0)

> Remove DataSet related descriptions in doc
> --
>
> Key: FLINK-32741
> URL: https://issues.apache.org/jira/browse/FLINK-32741
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Documentation
>Affects Versions: 2.0.0
>Reporter: Wencong Liu
>Assignee: Junyao Huang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 1.18.0
>
>
> Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't 
> recommend developers to use the DataSet, the descriptions of DataSet should 
> be removed in the doc after [FLINK-32558].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32740) Introduce literal null value for flink-conf.yaml

2023-08-03 Thread Yao Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750916#comment-17750916
 ] 

Yao Zhang commented on FLINK-32740:
---

Hi [~JunRuiLi],

Good question. It is also my concern.

I searched the values in 
[Configuration|https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/config/]
 and there are no valid values named null (but there are some valid values named 
NONE). Some values may contain null (e.g. /dev/null) but they will not be 
translated to null. If the user intends to set the value to the string "null", 
a slight change is needed by adding double quotes (e.g. some.key: "null").

I also investigated the null literals defined in the YAML standard:
 * (empty value)
 * ~
 * null (case sensitive)

Currently we may follow all of the standards or only part of them, as the YAML 
parser is not used.
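
A short illustration of how the candidate literals could look in
flink-conf.yaml (the keys besides rest.port are hypothetical, only to show the
proposed parsing):

```yaml
rest.port:            # empty value  -> null / unset
some.key.a: ~         # tilde        -> null
some.key.b: null      # literal null -> null
some.key.c: "null"    # quoted       -> the string "null", not null
```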

> Introduce literal null value for flink-conf.yaml
> 
>
> Key: FLINK-32740
> URL: https://issues.apache.org/jira/browse/FLINK-32740
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.15.4
>Reporter: Yao Zhang
>Priority: Minor
>
> Hi community,
> Currently in flink-conf.yaml we only consider simplified YAML syntax like key 
> value pairs. And it might not be the right timing to indroduce YAML parser. 
> As [FLINK-23620 |https://issues.apache.org/jira/browse/FLINK-23620] has been 
> stated, there might be some keys that violate the YAML naming conventions.
> The current situation is, if we want to unset the value (or set the value as 
> its default), what we could do is to remove the key value pair completely or 
> leave the value blank (e.g. `rest.port: `). It might be inconvenient or less 
> readable for conf file that controlled by other applications.
> So is it necessary to add some special literal values that will be translated 
> to the real null value? For YAML we usually see ~, null or empty value as 
> null value. If necessary I would like to do this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #23132: [python] Relax `grpcio` requirement to build on M2

2023-08-03 Thread via GitHub


flinkbot commented on PR #23132:
URL: https://github.com/apache/flink/pull/23132#issuecomment-1664610864

   
   ## CI report:
   
   * 6d68021f5f4f27459374090329157bc8d05b9e63 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] deepyaman opened a new pull request, #23132: [python] Relax `grpcio` requirement to build on M2

2023-08-03 Thread via GitHub


deepyaman opened a new pull request, #23132:
URL: https://github.com/apache/flink/pull/23132

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32729) allow creating an initial deployment in a suspended state

2023-08-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32729:
---
Labels: pull-request-available  (was: )

> allow creating an initial deployment in a suspended state
> -
>
> Key: FLINK-32729
> URL: https://issues.apache.org/jira/browse/FLINK-32729
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>  Labels: pull-request-available
>
> With this feature, users can create an application in suspended status as a 
> backup for another running application to improve the failure recovery time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] bjhaid opened a new pull request, #641: [FLINK-32729] allow starting a flink job in a suspended state.

2023-08-03 Thread via GitHub


bjhaid opened a new pull request, #641:
URL: https://github.com/apache/flink-kubernetes-operator/pull/641

   In our setup we intend to migrate FlinkDeployments from one cluster to the 
other. As part of doing this, we want to start the FlinkDeployments in a 
suspended state with an initial savepoint configured, and only configure them to 
start after we have stopped the FlinkDeployment in the source cluster. This 
patch was agreed upon in this [slack 
thread](https://apache-flink.slack.com/archives/C03GV7L3G2C/p1690825041612599). 
A minimal sketch of what this enables is shown below.
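
   A minimal sketch of a FlinkDeployment created in a suspended state, assuming 
the existing `job.state` and `job.initialSavepointPath` fields (the name, jar 
URI, and savepoint path are illustrative):

   ```yaml
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: standby-app                 # hypothetical name
   spec:
     flinkVersion: v1_17
     job:
       jarURI: local:///opt/flink/usrlib/app.jar          # hypothetical jar
       state: suspended                # start suspended instead of running
       initialSavepointPath: s3://bucket/savepoints/sp-1  # hypothetical path
       upgradeMode: savepoint
   ```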
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request adds a new feature to periodically create 
and maintain savepoints through the `FlinkDeployment` custom resource.)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *Periodic savepoint trigger is introduced to the custom resource*
 - *The operator checks on reconciliation whether the required time has 
passed*
 - *The JobManager's dispose savepoint API is used to clean up obsolete 
savepoints*
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no)
 - Core observer or reconciler logic that is regularly executed: (yes / no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] netvl commented on pull request #22854: [FLINK-32137] Added filtering of lambdas when building the flame graph

2023-08-03 Thread via GitHub


netvl commented on PR #22854:
URL: https://github.com/apache/flink/pull/22854#issuecomment-1664427570

   Thank you very much!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-web] dannycranmer commented on a diff in pull request #668: [FLINK-32736] Externalized connectors

2023-08-03 Thread via GitHub


dannycranmer commented on code in PR #668:
URL: https://github.com/apache/flink-web/pull/668#discussion_r1283535822


##
docs/content/posts/2023-08-03-externalized-connectors.md:
##
@@ -0,0 +1,77 @@
+---
+title: "Announcing three new Apache Flink connectors, the new connector 
versioning strategy and externalization"
+date: "2023-08-03T12:00:00Z"
+authors:
+- elphastori: 
+  name: "Elphas Toringepi"
+  twitter: "elphastori"
+aliases:
+- /news/2023/08/03/externalized-connectors.html
+---
+
+## New connectors
+
+We're excited to announce that Apache Flink now supports three new connectors: 
[Amazon DynamoDB](https://aws.amazon.com/dynamodb), 
[MongoDB](https://www.mongodb.com/) and [OpenSearch](https://opensearch.org/)! 
The connectors are available for both the 
[DataStream](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/)
 and 
[Table/SQL](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/)
 APIs.  
+
+- **[Amazon 
DynamoDB](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)**
 - This connector includes a sink that provides at-least-once delivery 
guarantees.
+- **[MongoDB 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/mongodb/)**
 - This connector includes a source and sink that provide at-least-once 
guarantees.
+- **[OpenSearch 
sink](https://github.com/apache/flink-connector-opensearch/blob/main/docs/content/docs/connectors/datastream/opensearch.md)**
 - This connector includes a sink that provides at-least-once guarantees

Review Comment:
   ```suggestion
   - **[OpenSearch 
sink](https://github.com/apache/flink-connector-opensearch/blob/main/docs/content/docs/connectors/datastream/opensearch.md)**
 - This connector includes a sink that provides at-least-once guarantees.
   ```



##
docs/content/posts/2023-08-03-externalized-connectors.md:
##
@@ -0,0 +1,77 @@
+---
+title: "Announcing three new Apache Flink connectors, the new connector 
versioning strategy and externalization"
+date: "2023-08-03T12:00:00Z"
+authors:
+- elphastori: 
+  name: "Elphas Toringepi"
+  twitter: "elphastori"
+aliases:
+- /news/2023/08/03/externalized-connectors.html
+---
+
+## New connectors
+
+We're excited to announce that Apache Flink now supports three new connectors: 
[Amazon DynamoDB](https://aws.amazon.com/dynamodb), 
[MongoDB](https://www.mongodb.com/) and [OpenSearch](https://opensearch.org/)! 
The connectors are available for both the 
[DataStream](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/)
 and 
[Table/SQL](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/)
 APIs.  
+
+- **[Amazon 
DynamoDB](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)**
 - This connector includes a sink that provides at-least-once delivery 
guarantees.
+- **[MongoDB 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/mongodb/)**
 - This connector includes a source and sink that provide at-least-once 
guarantees.
+- **[OpenSearch 
sink](https://github.com/apache/flink-connector-opensearch/blob/main/docs/content/docs/connectors/datastream/opensearch.md)**
 - This connector includes a sink that provides at-least-once guarantees
+
+|Connector|Date Released|Supported Flink Versions|
+|---|---|---|
+|Amazon DynamoDB sink|12/2/2022|1.15+|
+|MongoDB connector|3/31/2023|1.16+|
+|OpenSearch sink|12/21/2022|1.16+|
+
+## Externalized connectors
+
+The community has externalized connectors from [Flink’s main 
repository](https://github.com/apache/flink). This was driven by the wish to 
realise the following benefits:
+- **Faster releases of connectors:** New features can be added more quickly, 
bugs can be fixed immediately, and we can have faster security patches in case 
of direct or indirect (through dependencies) security flaws.
+- **Adding newer connector features to older Flink versions:** By having 
stable connector APIs, the same connector artifact may be used with different 
Flink versions. Thus, new features can also immediately be used with older 
Flink versions. 
+- **More activity and contributions around connectors:** By easing the 
contribution and development process around connectors, we will see faster 
development and also more connectors.
+- **Documentation:** Standardized documentation and user experience for the 
connectors, regardless of where they are maintained. 
+- **A faster Flink CI:** By not needing to build and test connectors, the 
Flink CI pipeline will be faster and Flink developers will experience fewer 
build stabilities (which mostly come from connectors). That should speed up 
Flink development.
+
+The following connectors have been moved to individual repositories:  
+
+- [Kafka / Upsert-Kafka](https://github.com/apache/flink-connector-kafka)
+- 

[jira] [Commented] (FLINK-32741) Remove DataSet related descriptions in doc

2023-08-03 Thread Junyao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750841#comment-17750841
 ] 

Junyao Huang commented on FLINK-32741:
--

Hi [~Wencong Liu],

May I work on this issue?

> Remove DataSet related descriptions in doc
> --
>
> Key: FLINK-32741
> URL: https://issues.apache.org/jira/browse/FLINK-32741
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Documentation
>Affects Versions: 2.0.0
>Reporter: Wencong Liu
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't 
> recommend developers to use the DataSet, the descriptions of DataSet should 
> be removed in the doc after [FLINK-32558].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-web] elphastori commented on a diff in pull request #668: [FLINK-32736] Externalized connectors

2023-08-03 Thread via GitHub


elphastori commented on code in PR #668:
URL: https://github.com/apache/flink-web/pull/668#discussion_r1283509741


##
docs/content/posts/2023-08-03-connector-updates.md:
##
@@ -0,0 +1,68 @@
+---
+title: "DynamoDB, MongoDB and OpenSearch externalized connectors"
+date: "2023-08-03T12:00:00Z"
+authors:
+- elphastori: 
+  name: "Elphas Toringepi"
+  twitter: "elphastori"
+aliases:
+- /news/2023/08/03/externalized-connectors.html
+---
+
+## New connectors

Review Comment:
   Thanks, I've listed the contributors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23131: Explanation of Asynchronous Problem in Querying Data from HBase

2023-08-03 Thread via GitHub


flinkbot commented on PR #23131:
URL: https://github.com/apache/flink/pull/23131#issuecomment-1664321928

   
   ## CI report:
   
   * 30618a118c1bc19d4fb9b1a6495bd41ac869350f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] GududeDog opened a new pull request, #23131: Explanation of Asynchronous Problem in Querying Data from HBase

2023-08-03 Thread via GitHub


GududeDog opened a new pull request, #23131:
URL: https://github.com/apache/flink/pull/23131

   Explanation of an asynchronous problem in querying data from HBase.
   
   When async lookup is not enabled, an HBaseRowDataLookupFunction object is 
created. In this class an HTable field is declared but not assigned immediately; 
it is only assigned in the open method. Creating a connection object in open is 
safe: a Connection is meant to be shared and is closed when it is no longer 
used. A Table object, however, is not thread-safe and should be closed right 
after use. In the current code the Table is created in open and only closed in 
close, so as long as records keep arriving, close never runs and the same Table 
instance is reused for every record. For a Connection object that is not a 
problem, but for a Table it is: the lookup method calls the table's get method 
for every record against the same shared instance, which may produce inaccurate 
data and is unsafe under multithreading. A sketch of the safer per-lookup 
pattern is shown below.
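
   A minimal sketch of the per-record pattern described above, assuming a 
shared thread-safe `Connection` field named `hConnection` and placeholder 
helpers (`hTableName`, `toRowKey`, `convertToRow`, `collect` are not actual 
HBaseRowDataLookupFunction members):

   ```java
   import org.apache.hadoop.hbase.TableName;
   import org.apache.hadoop.hbase.client.Get;
   import org.apache.hadoop.hbase.client.Result;
   import org.apache.hadoop.hbase.client.Table;

   // Sketch only: obtain a lightweight Table per lookup from the shared
   // Connection and close it immediately, instead of sharing one Table
   // instance across records and threads.
   public void eval(Object rowKeyValue) throws IOException {
       try (Table table = hConnection.getTable(TableName.valueOf(hTableName))) {
           Get get = new Get(toRowKey(rowKeyValue));
           Result result = table.get(get);
           if (!result.isEmpty()) {
               collect(convertToRow(result));
           }
       } // the Table is closed here; the Connection stays open for reuse
   }
   ```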
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] hackergin commented on pull request #23109: [FLINK-32475][docs] Add doc for time travel

2023-08-03 Thread via GitHub


hackergin commented on PR #23109:
URL: https://github.com/apache/flink/pull/23109#issuecomment-1664182437

   @luoyuxia please take a look at this pr when you are free, thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-18356) flink-table-planner Exit code 137 returned from process

2023-08-03 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750786#comment-17750786
 ] 

Matthias Pohl commented on FLINK-18356:
---

[~337361...@qq.com] I guess you can create a PR in 
https://github.com/zentol/flink-ci-docker. AFAIR, that was the fork for the CI 
docker image that was recently pushed to the Docker hub 
(https://hub.docker.com/r/chesnay/flink-ci/tags).

> flink-table-planner Exit code 137 returned from process
> ---
>
> Key: FLINK-18356
> URL: https://issues.apache.org/jira/browse/FLINK-18356
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Tests
>Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0
>Reporter: Piotr Nowojski
>Assignee: Yunhong Zheng
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Attachments: 1234.jpg, app-profiling_4.gif, 
> image-2023-01-11-22-21-57-784.png, image-2023-01-11-22-22-32-124.png, 
> image-2023-02-16-20-18-09-431.png, image-2023-07-11-19-28-52-851.png, 
> image-2023-07-11-19-35-54-530.png, image-2023-07-11-19-41-18-626.png, 
> image-2023-07-11-19-41-37-105.png
>
>
> {noformat}
> = test session starts 
> ==
> platform linux -- Python 3.7.3, pytest-5.4.3, py-1.8.2, pluggy-0.13.1
> cachedir: .tox/py37-cython/.pytest_cache
> rootdir: /__w/3/s/flink-python
> collected 568 items
> pyflink/common/tests/test_configuration.py ..[  
> 1%]
> pyflink/common/tests/test_execution_config.py ...[  
> 5%]
> pyflink/dataset/tests/test_execution_environment.py .
> ##[error]Exit code 137 returned from process: file name '/bin/docker', 
> arguments 'exec -i -u 1002 
> 97fc4e22522d2ced1f4d23096b8929045d083dd0a99a4233a8b20d0489e9bddb 
> /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'.
> Finishing: Test - python
> {noformat}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3729=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-web] elphastori commented on a diff in pull request #668: [FLINK-32736] Externalized connectors

2023-08-03 Thread via GitHub


elphastori commented on code in PR #668:
URL: https://github.com/apache/flink-web/pull/668#discussion_r1283321359


##
docs/content/posts/2023-08-03-connector-updates.md:
##
@@ -0,0 +1,68 @@
+---
+title: "DynamoDB, MongoDB and OpenSearch externalized connectors"
+date: "2023-08-03T12:00:00Z"
+authors:
+- elphastori: 
+  name: "Elphas Toringepi"
+  twitter: "elphastori"
+aliases:
+- /news/2023/08/03/externalized-connectors.html
+---
+
+## New connectors
+
+We're excited to announce that Apache Flink now includes 3 new connectors for 
[DynamoDB](https://aws.amazon.com/dynamodb), 
[MongoDB](https://www.mongodb.com/) and [OpenSearch](https://opensearch.org/)! 
The connectors are available for both the 
[DataStream](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/)
 and 
[Table/SQL](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/)
 APIs.  

Review Comment:
   I'll also change 3 to three.
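
As a usage sketch for the announcement above, here is how one of the new connectors could be registered through the Table/SQL API from Java. The schema and connection values are placeholders, and the option keys ('connector', 'uri', 'database', 'collection') should be double-checked against the released connector docs:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongoDbTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Placeholder schema and connection settings for a MongoDB-backed table.
        tEnv.executeSql(
                "CREATE TABLE users ("
                        + "  _id STRING,"
                        + "  name STRING,"
                        + "  PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mongodb',"
                        + "  'uri' = 'mongodb://localhost:27017',"
                        + "  'database' = 'my_db',"
                        + "  'collection' = 'users'"
                        + ")");
        // Read the collection back through the registered table.
        tEnv.executeSql("SELECT * FROM users").print();
    }
}
```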






[GitHub] [flink-web] elphastori commented on a diff in pull request #668: [FLINK-32736] Externalized connectors

2023-08-03 Thread via GitHub


elphastori commented on code in PR #668:
URL: https://github.com/apache/flink-web/pull/668#discussion_r1283317707


##
docs/content/posts/2023-08-03-connector-updates.md:
##
@@ -0,0 +1,68 @@
+---
+title: "DynamoDB, MongoDB and OpenSearch externalized connectors"

Review Comment:
   It sounds better.






[jira] [Created] (FLINK-32745) Add a flag to skip InputSelectable preValidate step

2023-08-03 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32745:


 Summary: Add a flag to skip InputSelectable preValidate step
 Key: FLINK-32745
 URL: https://issues.apache.org/jira/browse/FLINK-32745
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, Runtime / Configuration
Reporter: Chesnay Schepler
 Fix For: 1.19.0


{{StreamingJobGraphGenerator#preValidate}} has a step that checks that no 
operator implements {{InputSelectable}} if checkpointing is enabled, because 
these two features aren't compatible.

This step can be extremely expensive when the {{CodeGenOperatorFactory}} is 
used, because it requires all generated operator classes to actually be 
compiled (which usually only happens on the task manager).

If you know what jobs you're running, this step can be pure overhead.
It would be nice to have a flag to skip this validation step.
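
A minimal sketch of what such an opt-out could look like. The option name 
{{pipeline.skip-pre-validation}} and the wiring are hypothetical placeholders, 
not an existing Flink API; the actual flag would be defined in the eventual PR:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

public class PreValidationGuard {

    // Hypothetical key; chosen here only for illustration.
    public static final ConfigOption<Boolean> SKIP_PRE_VALIDATION =
            ConfigOptions.key("pipeline.skip-pre-validation")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Skips the InputSelectable/checkpointing compatibility "
                                    + "check during job graph generation. Only safe when "
                                    + "the job is known not to use InputSelectable.");

    static void preValidate(ReadableConfig config) {
        if (config.get(SKIP_PRE_VALIDATION)) {
            // User opted out: avoid compiling every generated operator class
            // on the client just to run this check.
            return;
        }
        // ... existing validation over all operator factories ...
    }
}
{code}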





[jira] [Updated] (FLINK-32744) HiveDialectQueryITCase.testWithOverWindow fails with timeout

2023-08-03 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-32744:
--
Priority: Critical  (was: Major)

> HiveDialectQueryITCase.testWithOverWindow fails with timeout
> 
>
> Key: FLINK-32744
> URL: https://issues.apache.org/jira/browse/FLINK-32744
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.17.1
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51922&view=logs&j=5cae8624-c7eb-5c51-92d3-4d2dacedd221&t=5acec1b4-945b-59ca-34f8-168928ce5199&l=22287
> {code}
> Aug 03 03:36:53 [ERROR] 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testWithOverWindow  
> Time elapsed: 38.534 s  <<< ERROR!
> Aug 03 03:36:53 org.apache.flink.table.api.TableException: Failed to execute 
> sql
> Aug 03 03:36:53   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:976)
> Aug 03 03:36:53   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1424)
> Aug 03 03:36:53   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
> Aug 03 03:36:53   at 
> org.apache.flink.connectors.hive.HiveDialectQueryITCase.testWithOverWindow(HiveDialectQueryITCase.java:690)
> Aug 03 03:36:53   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Aug 03 03:36:53   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Aug 03 03:36:53   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Aug 03 03:36:53   at java.lang.reflect.Method.invoke(Method.java:498)
> Aug 03 03:36:53   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Aug 03 03:36:53   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Aug 03 03:36:53   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Aug 03 03:36:53   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Aug 03 03:36:53   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Aug 03 03:36:53   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Aug 03 03:36:53   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Aug 03 03:36:53   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> Aug 03 03:36:53   at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> Aug 03 03:36:53   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Aug 03 03:36:53   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Aug 03 03:36:53   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> Aug 03 03:36:53   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> Aug 03 03:36:53   at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
> Aug 03 03:36:53   at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
> Aug 03 03:36:53   at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
> Aug 03 03:36:53   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
> Aug 03 03:36:53   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
> Aug 03 03:36:53   at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
> Aug 03 03:36:53   at 
> 

[jira] [Created] (FLINK-32744) HiveDialectQueryITCase.testWithOverWindow fails with timeout

2023-08-03 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-32744:
-

 Summary: HiveDialectQueryITCase.testWithOverWindow fails with 
timeout
 Key: FLINK-32744
 URL: https://issues.apache.org/jira/browse/FLINK-32744
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive, Tests
Affects Versions: 1.17.1
Reporter: Matthias Pohl


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51922&view=logs&j=5cae8624-c7eb-5c51-92d3-4d2dacedd221&t=5acec1b4-945b-59ca-34f8-168928ce5199&l=22287

{code}
Aug 03 03:36:53 [ERROR] 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testWithOverWindow  
Time elapsed: 38.534 s  <<< ERROR!
Aug 03 03:36:53 org.apache.flink.table.api.TableException: Failed to execute sql
Aug 03 03:36:53 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:976)
Aug 03 03:36:53 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1424)
Aug 03 03:36:53 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
Aug 03 03:36:53 at 
org.apache.flink.connectors.hive.HiveDialectQueryITCase.testWithOverWindow(HiveDialectQueryITCase.java:690)
Aug 03 03:36:53 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Aug 03 03:36:53 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Aug 03 03:36:53 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Aug 03 03:36:53 at java.lang.reflect.Method.invoke(Method.java:498)
Aug 03 03:36:53 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Aug 03 03:36:53 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Aug 03 03:36:53 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Aug 03 03:36:53 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Aug 03 03:36:53 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Aug 03 03:36:53 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Aug 03 03:36:53 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Aug 03 03:36:53 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Aug 03 03:36:53 at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
Aug 03 03:36:53 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Aug 03 03:36:53 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Aug 03 03:36:53 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
Aug 03 03:36:53 at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
Aug 03 03:36:53 at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
Aug 03 03:36:53 at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
Aug 03 03:36:53 at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
Aug 03 03:36:53 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
Aug 03 03:36:53 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
Aug 03 03:36:53 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
Aug 03 03:36:53 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
Aug 03 03:36:53 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
Aug 03 03:36:53 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
Aug 03 03:36:53 at 
