[GitHub] [flink] pegasas commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
pegasas commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1284013387

## flink-examples/flink-examples-batch/pom.xml:

```diff
@@ -48,25 +48,6 @@ under the License.
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>compile</id>
-						<phase>process-sources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-						<configuration>
-							<compilerArgument>-Xlint:deprecation</compilerArgument>
-							<fork>true</fork>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
```

Review Comment: Thanks @xintongsong, @WencongLiu. I made a PR for tracking: https://github.com/apache/flink/pull/23135 and will request a review after going through it myself.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32741) Remove DataSet related descriptions in doc
[ https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32741: --- Labels: 2.0-related pull-request-available (was: 2.0-related)

> Remove DataSet related descriptions in doc
> --
>
> Key: FLINK-32741
> URL: https://issues.apache.org/jira/browse/FLINK-32741
> Project: Flink
> Issue Type: Technical Debt
> Components: Documentation
> Affects Versions: 2.0.0
> Reporter: Wencong Liu
> Assignee: Junyao Huang
> Priority: Major
> Labels: 2.0-related, pull-request-available
> Fix For: 1.18.0
>
> Since all DataSet APIs will be deprecated in [FLINK-32558] and we don't recommend that developers use the DataSet API, the DataSet-related descriptions should be removed from the docs after [FLINK-32558].

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] pegasas opened a new pull request, #23135: [FLINK-32741]Remove DataSet related descriptions in doc
pegasas opened a new pull request, #23135: URL: https://github.com/apache/flink/pull/23135

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-29527) Make unknownFieldsIndices work for single ParquetReader
[ https://issues.apache.org/jira/browse/FLINK-29527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen resolved FLINK-29527. --- Resolution: Fixed

master via 50622df1f01cd9cade78bfe2add5a7faff678e3e

Shall we pick this into other versions? It seems to be a feature catch-up or a fix.

> Make unknownFieldsIndices work for single ParquetReader
> ---
>
> Key: FLINK-29527
> URL: https://issues.apache.org/jira/browse/FLINK-29527
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.16.0
> Reporter: Sun Shun
> Assignee: Sun Shun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
> Currently, since the improvement in FLINK-23715, Flink uses a collection named `unknownFieldsIndices` to track nonexistent fields. It is kept inside the `ParquetVectorizedInputFormat` and applied to all Parquet files under a given path.
> However, some fields may be nonexistent only in some of the historical Parquet files while existing in the latest ones. Based on `unknownFieldsIndices`, Flink will always skip these fields, even though they exist in the later Parquet files.
> As a result, the value of these fields becomes empty when they are nonexistent in some historical Parquet files.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29527) Make unknownFieldsIndices work for single ParquetReader
[ https://issues.apache.org/jira/browse/FLINK-29527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zili Chen updated FLINK-29527: -- Fix Version/s: 1.19.0

> Make unknownFieldsIndices work for single ParquetReader
> ---
>
> Key: FLINK-29527
> URL: https://issues.apache.org/jira/browse/FLINK-29527
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.16.0
> Reporter: Sun Shun
> Assignee: Sun Shun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
> Currently, since the improvement in FLINK-23715, Flink uses a collection named `unknownFieldsIndices` to track nonexistent fields. It is kept inside the `ParquetVectorizedInputFormat` and applied to all Parquet files under a given path.
> However, some fields may be nonexistent only in some of the historical Parquet files while existing in the latest ones. Based on `unknownFieldsIndices`, Flink will always skip these fields, even though they exist in the later Parquet files.
> As a result, the value of these fields becomes empty when they are nonexistent in some historical Parquet files.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
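The failure mode described in FLINK-29527 can be illustrated with a small, self-contained sketch. The names below are illustrative stand-ins, not the `ParquetVectorizedInputFormat` internals: when the unknown-fields state is shared across all files under a path instead of tracked per reader, a column absent from one historical file is skipped in every newer file as well.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class UnknownFieldsSketch {
    // Shared across all files -- this mirrors the bug: once a field is marked
    // unknown while reading one file, it stays unknown for every later file.
    static final Set<String> sharedUnknownFields = new HashSet<>();

    /** Read one "file" (a map of column -> value), skipping fields marked unknown. */
    public static Map<String, String> read(
            Map<String, String> file, List<String> requestedFields, Set<String> unknownFields) {
        Map<String, String> row = new LinkedHashMap<>();
        for (String f : requestedFields) {
            if (unknownFields.contains(f)) {
                row.put(f, null); // previously marked unknown -> always skipped
            } else if (!file.containsKey(f)) {
                unknownFields.add(f); // mark nonexistent field
                row.put(f, null);
            } else {
                row.put(f, file.get(f));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("id", "city");
        Map<String, String> oldFile = Map.of("id", "1"); // historical file: no "city" column
        Map<String, String> newFile = Map.of("id", "2", "city", "NY"); // newer file has "city"

        // Buggy: one shared set for both files -> "city" is lost in the new file.
        read(oldFile, fields, sharedUnknownFields);
        Map<String, String> buggy = read(newFile, fields, sharedUnknownFields);
        System.out.println("shared state: city=" + buggy.get("city")); // city=null

        // Fixed: recompute unknown fields per reader -> "city" is read.
        Map<String, String> fixed = read(newFile, fields, new HashSet<>());
        System.out.println("per-reader state: city=" + fixed.get("city")); // city=NY
    }
}
```

Making the unknown-fields set per reader (per file) rather than per format instance is the essence of the fix tracked here.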
[GitHub] [flink] reswqa commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle
reswqa commented on code in PR #23100: URL: https://github.com/apache/flink/pull/23100#discussion_r1284006406

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:

```diff
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId, subpartitionId.getSubpartitionId(), segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
```

Review Comment: To be honest, I don't like this way, but I can't think of a better one either. In theory, `FileSystem` should handle (or wrap) exceptions that can be retried by itself, but it doesn't help us do this.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28915) Make application mode could support remote DFS schema(e.g. S3, OSS, HDFS, etc.)
[ https://issues.apache.org/jira/browse/FLINK-28915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750960#comment-17750960 ] Junyao Huang commented on FLINK-28915: --

Hi, [~hjw], [~wangyang0918], what is the status of this thread? Is there any plan to support ABFS (https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html)?

> Make application mode could support remote DFS schema(e.g. S3, OSS, HDFS, etc.)
> ---
>
> Key: FLINK-28915
> URL: https://issues.apache.org/jira/browse/FLINK-28915
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / Kubernetes, flink-contrib
> Reporter: hjw
> Assignee: hjw
> Priority: Major
> Labels: pull-request-available
>
> As the Flink documentation shows, local is the only supported scheme in native K8s deployment. Is there a plan to support the S3 filesystem? Thx.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] reswqa commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle
reswqa commented on code in PR #23100: URL: https://github.com/apache/flink/pull/23100#discussion_r1284002092

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:

```diff
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId, subpartitionId.getSubpartitionId(), segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            currentRetryTime++;
+            throwException(t, "Failed to check the status of segment file.");
```

Review Comment: Sounds good.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle
reswqa commented on code in PR #23100: URL: https://github.com/apache/flink/pull/23100#discussion_r1284000935

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:

```diff
@@ -109,7 +118,11 @@ private FileSystem createFileSystem() {
     /** Start the executor. */
     public void start() {
-        scannerExecutor.schedule(this, lastInterval, TimeUnit.MILLISECONDS);
+        synchronized (scannerExecutor) {
```

Review Comment: Ah, my mistake, fair enough.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
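The `synchronized` guard discussed above — wrapping `scannerExecutor.schedule(...)` so a concurrent shutdown cannot race between checking the executor's state and scheduling — can be sketched in isolation. The class below is an illustration using only stdlib types, not the Flink `RemoteStorageScanner` code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class GuardedScheduler {
    private final ScheduledExecutorService scannerExecutor =
            Executors.newSingleThreadScheduledExecutor();
    private boolean closed = false;

    /** Schedule only while not closed; close() cannot interleave with the check. */
    public boolean start(Runnable task, long delayMillis) {
        synchronized (scannerExecutor) {
            if (closed) {
                return false; // already shut down, do not schedule
            }
            scannerExecutor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return true;
        }
    }

    /** Marks the scheduler closed and shuts the executor down atomically. */
    public void close() {
        synchronized (scannerExecutor) {
            closed = true;
            scannerExecutor.shutdownNow();
        }
    }
}
```

Without the lock, a `schedule` call racing with `shutdownNow` could throw `RejectedExecutionException`; with it, `start` after `close` simply returns `false`.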
[jira] [Closed] (FLINK-32735) Flink SQL Gateway for Native Kubernetes Application Mode
[ https://issues.apache.org/jira/browse/FLINK-32735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Junyao Huang closed FLINK-32735.

Tracked by FLIP-316.

> Flink SQL Gateway for Native Kubernetes Application Mode
> -
>
> Key: FLINK-32735
> URL: https://issues.apache.org/jira/browse/FLINK-32735
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / Kubernetes, Table SQL / Gateway
> Affects Versions: 1.19.0
> Reporter: Junyao Huang
> Priority: Major
>
> Hi, Flink Community,
> I have visited our docs on these pages:
> # Flink Native Kubernetes: [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/]
> # Flink SQL Gateway: [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/]
> # Flink Kubernetes Operator Roadmap: [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/roadmap/]
> Will we support Flink SQL on Native Kubernetes in our next version? Personally, I think it is an important feature, especially in this cloud-native era.
> Since we already support per-job mode on YARN, I do not know if there is any non-technical reason, such as commercial considerations, blocking it.
> Have we supported Flink SQL Gateway with native K8s now?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-32735) Flink SQL Gateway for Native Kubernetes Application Mode
[ https://issues.apache.org/jira/browse/FLINK-32735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Junyao Huang resolved FLINK-32735. -- Release Note: tracked by FLIP-316. Resolution: Fixed

> Flink SQL Gateway for Native Kubernetes Application Mode
> -
>
> Key: FLINK-32735
> URL: https://issues.apache.org/jira/browse/FLINK-32735
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / Kubernetes, Table SQL / Gateway
> Affects Versions: 1.19.0
> Reporter: Junyao Huang
> Priority: Major
>
> Hi, Flink Community,
> I have visited our docs on these pages:
> # Flink Native Kubernetes: [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/]
> # Flink SQL Gateway: [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/]
> # Flink Kubernetes Operator Roadmap: [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/roadmap/]
> Will we support Flink SQL on Native Kubernetes in our next version? Personally, I think it is an important feature, especially in this cloud-native era.
> Since we already support per-job mode on YARN, I do not know if there is any non-technical reason, such as commercial considerations, blocking it.
> Have we supported Flink SQL Gateway with native K8s now?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32735) Flink SQL Gateway for Native Kubernetes Application Mode
[ https://issues.apache.org/jira/browse/FLINK-32735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750956#comment-17750956 ] Junyao Huang commented on FLINK-32735: --

Thanks [~lsy]! Your answer exactly fits my question! I will follow FLIP-316 for further steps and close this issue.

> Flink SQL Gateway for Native Kubernetes Application Mode
> -
>
> Key: FLINK-32735
> URL: https://issues.apache.org/jira/browse/FLINK-32735
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / Kubernetes, Table SQL / Gateway
> Affects Versions: 1.19.0
> Reporter: Junyao Huang
> Priority: Major
>
> Hi, Flink Community,
> I have visited our docs on these pages:
> # Flink Native Kubernetes: [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/]
> # Flink SQL Gateway: [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/]
> # Flink Kubernetes Operator Roadmap: [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/roadmap/]
> Will we support Flink SQL on Native Kubernetes in our next version? Personally, I think it is an important feature, especially in this cloud-native era.
> Since we already support per-job mode on YARN, I do not know if there is any non-technical reason, such as commercial considerations, blocking it.
> Have we supported Flink SQL Gateway with native K8s now?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Spongebob closed FLINK-32739. - Resolution: Not A Problem

> restarting not work
> ---
>
> Key: FLINK-32739
> URL: https://issues.apache.org/jira/browse/FLINK-32739
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.14.5
> Reporter: Spongebob
> Priority: Major
> Attachments: jobmanager.log
>
> In my Flink standalone cluster, I configured two TaskManagers, then tested with the following steps:
> # Submitted a streaming job, configured with a fixed-delay restart strategy, to the Flink session environment.
> # The job was running on TaskManager 1; I then killed TaskManager 1.
> # The job turned to FAILED after its restart attempts.
> The job could not be moved to TaskManager 2, which had enough slots, as expected.
> Here's the exception trace:
> {code:java}
> 2023-08-03 15:13:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=1)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
>     at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133)
>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073)
>     at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776)
>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>     at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>     at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>     at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>     at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>     at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>     at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419)
>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411)
>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382)
>     at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>     at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>     at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506)
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348)
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359)
>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
>     at
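For reference, a fixed-delay restart strategy like the one named in the trace above (`FixedDelayRestartBackoffTimeStrategy` with `maxNumberRestartAttempts=3`) is configured in `flink-conf.yaml`; the delay value below is illustrative:

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

Note that a fixed-delay strategy only bounds the number of restart attempts; whether the job can be rescheduled onto another TaskManager depends on slot availability at restart time, not on the strategy itself.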
[jira] [Created] (FLINK-32749) Sql gateway supports default catalog loaded by CatalogStore
Fang Yong created FLINK-32749:
-
Summary: Sql gateway supports default catalog loaded by CatalogStore
Key: FLINK-32749
URL: https://issues.apache.org/jira/browse/FLINK-32749
Project: Flink
Issue Type: Improvement
Components: Table SQL / Gateway
Affects Versions: 1.19.0
Reporter: Fang Yong

Currently, the SQL gateway creates an in-memory catalog as the default catalog; it should also support a default catalog loaded from the CatalogStore.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] WencongLiu commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle
WencongLiu commented on code in PR #23100: URL: https://github.com/apache/flink/pull/23100#discussion_r1283977914

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:

```diff
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId, subpartitionId.getSubpartitionId(), segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            currentRetryTime++;
+            throwException(t, "Failed to check the status of segment file.");
```

Review Comment: `throwException` can be modified to `tryThrowException`, WDYT?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle
WencongLiu commented on code in PR #23100: URL: https://github.com/apache/flink/pull/23100#discussion_r1283977912

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:

```diff
@@ -236,15 +254,24 @@ private boolean checkSegmentExist(
                         partitionId, subpartitionId.getSubpartitionId(), segmentId);
-        boolean isExist;
+        boolean isExist = false;
         try {
             isExist = remoteFileSystem.exists(segmentPath);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to check segment file. " + segmentPath, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            currentRetryTime++;
+            throwException(t, "Failed to check the status of segment file.");
         }
         return isExist;
     }

+    private void throwException(Throwable t, String logMessage) {
+        LOG.error(logMessage);
```

Review Comment: `LOG.error` has been modified to `LOG.warn`.

Review Comment: `throwException` can be modified to `tryThrowException`.

Review Comment: The types of exceptions thrown when `remoteFileSystem.exists` is executed vary, because `remoteFileSystem` can be implemented by OSS, S3, etc. I think using `Throwable` is also acceptable because there is a `MAX_RETRY_TIME`. WDYT?

## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java:

```diff
@@ -207,20 +222,23 @@ private void scanMaxSegmentId(
         Path segmentFinishDir =
                 getSegmentFinishDirPath(
                         baseRemoteStoragePath, partitionId, subpartitionId.getSubpartitionId());
-        FileStatus[] fileStatuses;
+        FileStatus[] fileStatuses = new FileStatus[0];
         try {
             if (!remoteFileSystem.exists(segmentFinishDir)) {
                 return;
             }
             fileStatuses = remoteFileSystem.listStatus(segmentFinishDir);
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Failed to list the segment finish file. " + segmentFinishDir, e);
+            currentRetryTime = 0;
+        } catch (Throwable t) {
+            if (t instanceof java.io.FileNotFoundException) {
+                return;
+            }
+            currentRetryTime++;
+            throwException(t, "Failed to list the segment finish file.");
         }
-        if
```
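The retry pattern under discussion — reset the counter on success, swallow failures, and rethrow only after a maximum number of consecutive failures — can be sketched independently of Flink's `FileSystem` types. The class name, `MAX_RETRY_TIME` value, and the probe below are illustrative, not the PR's actual code:

```java
import java.util.function.Supplier;

public class RetryingChecker {
    private static final int MAX_RETRY_TIME = 3;
    private int currentRetryTime = 0;
    private final Supplier<Boolean> probe; // stands in for remoteFileSystem.exists(path)

    public RetryingChecker(Supplier<Boolean> probe) {
        this.probe = probe;
    }

    /** Returns the probe result; swallows failures until MAX_RETRY_TIME is reached. */
    public boolean check() {
        boolean exists = false;
        try {
            exists = probe.get();
            currentRetryTime = 0; // success resets the consecutive-failure counter
        } catch (Throwable t) {
            currentRetryTime++;
            tryThrowException(t, "Failed to check the status of segment file.");
        }
        return exists;
    }

    /** Rethrows only once the consecutive-failure budget is exhausted. */
    private void tryThrowException(Throwable t, String message) {
        if (currentRetryTime >= MAX_RETRY_TIME) {
            throw new RuntimeException(message, t);
        }
        // otherwise: log a warning and let the next periodic scan retry
    }
}
```

A transient failure (e.g. an S3 hiccup) therefore only fails the job if it persists across `MAX_RETRY_TIME` consecutive scans, which is the trade-off the reviewers accept for catching `Throwable` broadly.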
[jira] [Commented] (FLINK-32735) Flink SQL Gateway for Native Kubernetes Application Mode
[ https://issues.apache.org/jira/browse/FLINK-32735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750945#comment-17750945 ] dalongliu commented on FLINK-32735: ---

[~pegasas] As stated in the documentation you listed, native K8s runs a session cluster, similar to YARN session clusters, and Flink SQL Gateway supports native K8s: you can specify the context for the K8s cluster when you start the Gateway, and the Gateway will then be able to submit jobs to the corresponding cluster. Based on your description, I guess you may be asking whether Flink SQL Gateway supports application mode on native K8s. That is currently not supported, but the community is already discussing it in FLIP-316, and we may be able to provide support for it in version 1.19; see FLIP-316 [1] for more detail. Finally, this is a question about Flink SQL Gateway and K8s integration support, not a specific implementation detail of Flink, so opening a Jira issue is not appropriate; you can start a discussion directly on Flink's user mailing list, where experts in the community will answer your questions.

[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-316%3A+Introduce+SQL+Driver

> Flink SQL Gateway for Native Kubernetes Application Mode
> -
>
> Key: FLINK-32735
> URL: https://issues.apache.org/jira/browse/FLINK-32735
> Project: Flink
> Issue Type: Improvement
> Components: Deployment / Kubernetes, Table SQL / Gateway
> Affects Versions: 1.19.0
> Reporter: Junyao Huang
> Priority: Major
>
> Hi, Flink Community,
> I have visited our docs on these pages:
> # Flink Native Kubernetes: [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/]
> # Flink SQL Gateway: [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/]
> # Flink Kubernetes Operator Roadmap: [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/roadmap/]
> Will we support Flink SQL on Native Kubernetes in our next version? Personally, I think it is an important feature, especially in this cloud-native era.
> Since we already support per-job mode on YARN, I do not know if there is any non-technical reason, such as commercial considerations, blocking it.
> Have we supported Flink SQL Gateway with native K8s now?

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283910488

## flink-examples/flink-examples-batch/pom.xml:

```diff
@@ -48,25 +48,6 @@ under the License.
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>compile</id>
-						<phase>process-sources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-						<configuration>
-							<compilerArgument>-Xlint:deprecation</compilerArgument>
-							<fork>true</fork>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
```

Review Comment:
1. The classes in the `flink-examples-batch` module are still utilized for testing purposes. I have added the warning log to all batch example classes to remind developers that the DataSet API is deprecated. I have also added a note to `bin.xml` in the `flink-dist` module to explain that the batch examples in the examples package are only for tests. I have created [FLINK-32742](https://issues.apache.org/jira/browse/FLINK-32742) to track the removal of the `flink-examples-batch` module.
2. I have created [FLINK-32741](https://issues.apache.org/jira/browse/FLINK-32741) to track the removal of the DataSet-related documentation. Currently the issue is assigned to @pegasas.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283959998 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -544,6 +544,13 @@ public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) { * The default execution mode is {@link ExecutionMode#PIPELINED}. * * @param executionMode The execution mode to use. + * @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs. + * All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future + * Flink major version. You can still build your application in DataSet, but you should move + * to either the DataStream and/or Table API. + * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;> + * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet + * API */ public void setExecutionMode(ExecutionMode executionMode) { Review Comment: The annotation is added. ## flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java: ## @@ -77,12 +80,23 @@ * Broadcast variables in bulk iterations * Custom Java objects (POJOs) * + * + * Note: All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future + * Flink major version. You can still build your application in DataSet, but you should move to + * either the DataStream and/or Table API. This class is retained for testing purposes. */ @SuppressWarnings("serial") public class KMeans { +private static final Logger LOGGER = LoggerFactory.getLogger(KMeans.class); + public static void main(String[] args) throws Exception { +LOGGER.warn( +"All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future" ++ " Flink major version. You can still build your application in DataSet, but you should move to" ++ " either the DataStream and/or Table API. 
This class is retained for testing purposes."); Review Comment: I have added a static String instance DATASET_DEPRECATION_INFO to hold the message.
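The thread above settles on a single shared deprecation message for the batch example classes. A minimal sketch of that pattern, assuming the constant name `DATASET_DEPRECATION_INFO` mentioned in the thread (Flink's examples use SLF4J; `java.util.logging` is used here only to keep the sketch self-contained):

```java
import java.util.logging.Logger;

/**
 * Sketch of the pattern discussed above: keep the deprecation notice in one
 * static constant so every batch example class logs the same warning on startup.
 * Class and constant names are illustrative, not Flink's actual code.
 */
public class DataSetDeprecationSketch {

    /** Single shared warning text, referenced by all example classes. */
    public static final String DATASET_DEPRECATION_INFO =
            "All Flink DataSet APIs are deprecated since Flink 1.18 and will be"
                    + " removed in a future Flink major version. You can still build your"
                    + " application in DataSet, but you should move to either the"
                    + " DataStream and/or Table API. This class is retained for testing purposes.";

    private static final Logger LOGGER =
            Logger.getLogger(DataSetDeprecationSketch.class.getName());

    public static void main(String[] args) {
        // Emit the shared warning once at the top of each example's main().
        LOGGER.warning(DATASET_DEPRECATION_INFO);
    }
}
```

Centralizing the string avoids the reviewer's concern about the same long literal being copy-pasted into every example class.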
[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283910488 ## flink-examples/flink-examples-batch/pom.xml: ## @@ -48,25 +48,6 @@ under the License. - - - org.apache.maven.plugins - maven-compiler-plugin - - - compile - process-sources - - compile - - - -Xlint:deprecation - true - - - - - Review Comment: 1. The classes in the `flink-examples-batch` module are still utilized for testing purposes. I have added a warning log in all batch example classes to remind developers that the DataSet API is deprecated. I have also added a note to `bin.xml` in the `flink-dist` module to explain that the batch examples in the example package are only for tests. 2. I have created an issue [FLINK-32741](https://issues.apache.org/jira/browse/FLINK-32741) to track the removal of DataSet-related documentation. Currently the issue is assigned to @pegasas . ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -556,7 +556,14 @@ public void setExecutionMode(ExecutionMode executionMode) { * The default execution mode is {@link ExecutionMode#PIPELINED}. * * @return The execution mode for the program. + * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a + * future Flink major version. You can still build your application in DataSet, but you + * should move to either the DataStream and/or Table API. + * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;> + * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet + * API */ +@Deprecated Review Comment: 1. `ExecutionMode` and `setExecutionMode` are both deprecated. 2. The class JavaDoc of execution mode is updated. ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -913,6 +920,18 @@ public LinkedHashSet> getRegisteredPojoTypes() { return registeredPojoTypes; } +/** + * Get if the auto type registration is disabled. 
+ * + * @return if the auto type registration is disabled. + * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a + * future Flink major version. You can still build your application in DataSet, but you + * should move to either the DataStream and/or Table API. + * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;> + * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet + * API + */ +@Deprecated Review Comment: Both `disableAutoTypeRegistration` and `AUTO_TYPE_REGISTRATION` are deprecated.
[GitHub] [flink] xuzifu666 closed pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile need close write automaticly
xuzifu666 closed pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile need close write automaticly URL: https://github.com/apache/flink/pull/23134
[GitHub] [flink] xuzifu666 commented on pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile need close write automaticly
xuzifu666 commented on PR #23134: URL: https://github.com/apache/flink/pull/23134#issuecomment-1664944270 Would open a new PR to fix these.
[jira] [Closed] (FLINK-32748) WriteSinkFunction::cleanFile need close write automaticly
[ https://issues.apache.org/jira/browse/FLINK-32748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xy closed FLINK-32748. -- Resolution: Incomplete > WriteSinkFunction::cleanFile need close write automaticly > - > > Key: FLINK-32748 > URL: https://issues.apache.org/jira/browse/FLINK-32748 > Project: Flink > Issue Type: Bug > Components: API / Core >Reporter: xy >Priority: Major > Labels: pull-request-available > Fix For: 1.9.4 > > > WriteSinkFunction::cleanFile need close write automaticly
[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283910551 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -913,6 +920,18 @@ public LinkedHashSet> getRegisteredPojoTypes() { return registeredPojoTypes; } +/** + * Get if the auto type registration is disabled. + * + * @return if the auto type registration is disabled. + * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a + * future Flink major version. You can still build your application in DataSet, but you + * should move to either the DataStream and/or Table API. + * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;> + * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet + * API + */ +@Deprecated Review Comment: Fixed.
[GitHub] [flink] reswqa commented on a diff in pull request #23100: [FLINK-32708][network] Fix the write logic in remote tier of hybrid shuffle
reswqa commented on code in PR #23100: URL: https://github.com/apache/flink/pull/23100#discussion_r1283940833 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ## @@ -236,15 +254,24 @@ private boolean checkSegmentExist( partitionId, subpartitionId.getSubpartitionId(), segmentId); -boolean isExist; +boolean isExist = false; try { isExist = remoteFileSystem.exists(segmentPath); -} catch (IOException e) { -throw new RuntimeException("Failed to check segment file. " + segmentPath, e); +currentRetryTime = 0; +} catch (Throwable t) { +currentRetryTime++; +throwException(t, "Failed to check the status of segment file."); Review Comment: `throwException` imply that this will always throw exception, but it does not. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ## @@ -236,15 +254,24 @@ private boolean checkSegmentExist( partitionId, subpartitionId.getSubpartitionId(), segmentId); -boolean isExist; +boolean isExist = false; try { isExist = remoteFileSystem.exists(segmentPath); -} catch (IOException e) { -throw new RuntimeException("Failed to check segment file. " + segmentPath, e); +currentRetryTime = 0; +} catch (Throwable t) { +currentRetryTime++; +throwException(t, "Failed to check the status of segment file."); } return isExist; } +private void throwException(Throwable t, String logMessage) { +LOG.error(logMessage); Review Comment: This can easily confuse people. Some users monitor the system by scanning the logs for ERROR (might sending alerts). IMO, It can only be WARNING at most. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ## @@ -109,7 +118,11 @@ private FileSystem createFileSystem() { /** Start the executor. 
*/ public void start() { -scannerExecutor.schedule(this, lastInterval, TimeUnit.MILLISECONDS); +synchronized (scannerExecutor) { Review Comment: Do we really need a synchronized lock? IIUC, a volatile variable is enough? ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ## @@ -144,7 +157,9 @@ public void watchSegment( /** Close the executor. */ public void close() { -scannerExecutor.shutdownNow(); +synchronized (scannerExecutor) { +scannerExecutor.shutdownNow(); Review Comment: I'd suggest waiting for all pending tasks to finish when closing this scanner. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteStorageScanner.java: ## @@ -207,20 -222,23 @@ private void scanMaxSegmentId( Path segmentFinishDir = getSegmentFinishDirPath( baseRemoteStoragePath, partitionId, subpartitionId.getSubpartitionId()); -FileStatus[] fileStatuses; +FileStatus[] fileStatuses = new FileStatus[0]; try { if (!remoteFileSystem.exists(segmentFinishDir)) { return; } fileStatuses = remoteFileSystem.listStatus(segmentFinishDir); -} catch (IOException e) { -throw new RuntimeException( -"Failed to list the segment finish file. 
" + segmentFinishDir, e); +currentRetryTime = 0; +} catch (Throwable t) { +if (t instanceof java.io.FileNotFoundException) { +return; +} +currentRetryTime++; +throwException(t, "Failed to list the segment finish file."); } -if
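The review above converges on a retry pattern for the remote existence checks: reset the retry counter on success, catch `IOException` rather than any `Throwable`, and fail only after a retry budget is exhausted. A rough JDK-only sketch under those suggestions (all names are illustrative, not Flink's actual implementation):

```java
import java.io.IOException;

/**
 * Sketch of the retry bookkeeping discussed in the review: transient IO
 * failures increment a counter, a success resets it, and the check only
 * fails hard once the budget is exhausted.
 */
class RetryingExistenceCheckSketch {

    /** Stand-in for FileSystem#exists, which may throw IOException. */
    interface CheckedSupplier<T> {
        T get() throws IOException;
    }

    private final int maxRetries;
    private int currentRetryTime = 0;

    RetryingExistenceCheckSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    boolean checkExists(CheckedSupplier<Boolean> existsCall) {
        try {
            boolean exists = existsCall.get();
            currentRetryTime = 0; // reset the counter on success
            return exists;
        } catch (IOException e) { // catch IOException, not Throwable
            currentRetryTime++;
            if (currentRetryTime >= maxRetries) {
                throw new RuntimeException(
                        "Failed to check segment file after " + maxRetries + " retries", e);
            }
            // treat as "not visible yet" and let the next scan retry
            return false;
        }
    }

    /** One transient failure followed by a success: should not fail hard. */
    static boolean demo() {
        RetryingExistenceCheckSketch check = new RetryingExistenceCheckSketch(3);
        boolean first = check.checkExists(() -> { throw new IOException("transient"); });
        boolean second = check.checkExists(() -> true);
        return !first && second;
    }
}
```

Logging in the catch branch would be at WARN level, per the reviewer's point that scanners emitting ERROR can trip log-based alerting.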
[jira] [Updated] (FLINK-32746) Enable ZGC in JDK17 to solve long time class unloading STW
[ https://issues.apache.org/jira/browse/FLINK-32746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiangyu feng updated FLINK-32746: - Summary: Enable ZGC in JDK17 to solve long time class unloading STW (was: Enable ZGC in JDK17 to solve long time class unloading STW during fullgc) > Enable ZGC in JDK17 to solve long time class unloading STW > -- > > Key: FLINK-32746 > URL: https://issues.apache.org/jira/browse/FLINK-32746 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: xiangyu feng >Priority: Major > > In an OLAP session cluster, a TM needs to frequently create new classloaders > and generate new classes. These classes accumulate in the metaspace. > When metaspace usage reaches a threshold, a full GC with a long > Stop-the-World pause is triggered. Currently, SerialGC, ParallelGC, and > G1GC all perform Stop-the-World class unloading. Only ZGC supports concurrent > class unloading; see more in > [https://bugs.openjdk.org/browse/JDK-8218905]. > > In our scenario, class unloading for a 2GB metaspace with 5 million classes > stops the application for more than 40 seconds. After switching to ZGC, the > maximum STW pause of the application has been reduced to less than 10 ms.
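For a session cluster on JDK 17, the switch described above can be made through Flink's JVM-options settings. A minimal `flink-conf.yaml` fragment as a sketch (the `env.java.opts.taskmanager` key passes extra flags to the TaskManager JVM; the JobManager has an analogous key):

```yaml
# flink-conf.yaml — enable ZGC on the TaskManager JVM (requires JDK 17)
env.java.opts.taskmanager: "-XX:+UseZGC"
```

ZGC trades some throughput for pause time, so the flag is worth benchmarking against the workload before rolling it out cluster-wide.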
[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283910579 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -544,6 +544,13 @@ public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) { * The default execution mode is {@link ExecutionMode#PIPELINED}. * * @param executionMode The execution mode to use. + * @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs. + * All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future + * Flink major version. You can still build your application in DataSet, but you should move + * to either the DataStream and/or Table API. + * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;> + * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet + * API */ public void setExecutionMode(ExecutionMode executionMode) { Review Comment: Fixed. ## flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java: ## @@ -77,12 +80,23 @@ * Broadcast variables in bulk iterations * Custom Java objects (POJOs) * + * + * Note: All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future + * Flink major version. You can still build your application in DataSet, but you should move to + * either the DataStream and/or Table API. This class is retained for testing purposes. */ @SuppressWarnings("serial") public class KMeans { +private static final Logger LOGGER = LoggerFactory.getLogger(KMeans.class); + public static void main(String[] args) throws Exception { +LOGGER.warn( +"All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future" ++ " Flink major version. You can still build your application in DataSet, but you should move to" ++ " either the DataStream and/or Table API. 
This class is retained for testing purposes."); Review Comment: Fixed.
[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283910526 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -556,7 +556,14 @@ public void setExecutionMode(ExecutionMode executionMode) { * The default execution mode is {@link ExecutionMode#PIPELINED}. * * @return The execution mode for the program. + * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a + * future Flink major version. You can still build your application in DataSet, but you + * should move to either the DataStream and/or Table API. + * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;> + * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet + * API */ +@Deprecated Review Comment: Fixed.
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283948678 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java: ## @@ -116,58 +111,53 @@ public static TestingJobResultStore.Builder builder() { /** {@code Builder} for instantiating {@code TestingJobResultStore} instances. */ public static class Builder { -private ThrowingConsumer createDirtyResultConsumer = -ignored -> {}; -private ThrowingConsumer markResultAsCleanConsumer = -ignored -> {}; +private Function> createDirtyResultConsumer = +jobResultEntry -> CompletableFuture.completedFuture(null); Review Comment: Fixed.
[GitHub] [flink] Tartarus0zm commented on pull request #23116: [FLINK-32581][docs] Add docs for atomic CTAS and RTAS
Tartarus0zm commented on PR #23116: URL: https://github.com/apache/flink/pull/23116#issuecomment-1664931262 @luoyuxia Thanks for your review. I have updated the PR, PTAL.
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283947288 ## flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/JobResultStoreContractTest.java: ## @@ -46,72 +47,97 @@ public interface JobResultStoreContractTest { @Test default void testStoreJobResultsWithDuplicateIDsThrowsException() throws IOException { JobResultStore jobResultStore = createJobResultStore(); -jobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY); +jobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY).join(); final JobResultEntry otherEntryWithDuplicateId = new JobResultEntry( TestingJobResultStore.createSuccessfulJobResult( DUMMY_JOB_RESULT_ENTRY.getJobId())); -assertThatThrownBy(() -> jobResultStore.createDirtyResult(otherEntryWithDuplicateId)) -.isInstanceOf(IllegalStateException.class); +assertThatThrownBy( +() -> +jobResultStore + .createDirtyResultAsync(otherEntryWithDuplicateId) +.join()) +.isInstanceOf(CompletionException.class); Review Comment: Fixed
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283946578 ## flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java: ## @@ -116,46 +123,74 @@ public void testBaseDirectoryCreationOnResultStoreInitialization() throws Except assertThat(emptyBaseDirectory).doesNotExist(); fileSystemJobResultStore = -new FileSystemJobResultStore(basePath.getFileSystem(), basePath, false); +new FileSystemJobResultStore( +basePath.getFileSystem(), basePath, false, manuallyTriggeredExecutor); // Result store operations are creating the base directory on-the-fly assertThat(emptyBaseDirectory).doesNotExist(); -fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY); +CompletableFuture dirtyResultAsync = + fileSystemJobResultStore.createDirtyResultAsync(DUMMY_JOB_RESULT_ENTRY); +manuallyTriggeredExecutor.trigger(); Review Comment: Fixed.
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283945744 ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java: ## @@ -579,10 +587,15 @@ public void testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws final CompletableFuture dirtyJobFuture = new CompletableFuture<>(); final JobResultStore jobResultStore = TestingJobResultStore.builder() - .withCreateDirtyResultConsumer(dirtyJobFuture::complete) +.withCreateDirtyResultConsumer( +jobResultEntry -> { +dirtyJobFuture.complete(jobResultEntry); +return FutureUtils.completedVoidFuture(); +}) .withMarkResultAsCleanConsumer( jobId -> { -throw new IOException("Expected IOException."); +return FutureUtils.completedExceptionally( Review Comment: Fixed.
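The PR #22341 review threads above revolve around moving blocking JobResultStore IO onto a dedicated ioExecutor and surfacing failures through the returned future (e.g. via `FutureUtils.completedExceptionally`) instead of checked exceptions. A hedged JDK-only sketch of that pattern, with illustrative names rather than Flink's actual classes:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Sketch: wrap a blocking filesystem lookup in supplyAsync on an ioExecutor
 * and report IO failures through the future's exceptional completion.
 */
class AsyncJobResultStoreSketch {

    static CompletableFuture<Boolean> hasEntryAsync(String jobId, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        return blockingHasEntry(jobId); // blocking IO off the main thread
                    } catch (IOException e) {
                        // surface the IO failure through the future, not a checked throw
                        throw new CompletionException(e);
                    }
                },
                ioExecutor);
    }

    /** Stand-in for the real filesystem check. */
    static boolean blockingHasEntry(String jobId) throws IOException {
        if ("broken".equals(jobId)) {
            throw new IOException("simulated IO failure");
        }
        return true;
    }

    static boolean demo() {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            boolean found = hasEntryAsync("job-1", ioExecutor).join();
            boolean failedWithIo;
            try {
                hasEntryAsync("broken", ioExecutor).join();
                failedWithIo = false;
            } catch (CompletionException e) {
                failedWithIo = e.getCause() instanceof IOException;
            }
            return found && failedWithIo;
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

This also explains the test changes quoted above: callers now assert on `CompletionException` from `join()` rather than on the raw `IOException`/`IllegalStateException`.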
[jira] [Commented] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750942#comment-17750942 ] Junrui Li commented on FLINK-32739: --- [~SpongebobZ] It will just restart the tasks of the original job. More details about the failover strategy can be found [here|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#restart-pipelined-region-failover-strategy]. > restarting not work > --- > > Key: FLINK-32739 > URL: https://issues.apache.org/jira/browse/FLINK-32739 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > Attachments: jobmanager.log > > > In my Flink standalone cluster, I configured two TaskManagers. Then I tested with the > following steps: > # submit a streaming job configured with a fixed-delay restart strategy to the Flink > session environment > # the job was running on taskmanager1. Then I killed taskmanager1. > # the job turned to FAILED after the restart attempts. > The job could not be redeployed to taskmanager2, which had enough slots, as > expected. 
> Here's the exception trace: > {code:java} > 2023-08-03 15:13:56 > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, > backoffTimeMS=1) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) > at > org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073) > at > org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) > at > org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) > at > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) > at > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) > at > 
java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) > at > org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) > at > org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) > at > org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) > at > org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) > at >
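For reference, the fixed-delay restart strategy visible in the trace above (`FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, ...)`) is configured with keys like the following in `flink-conf.yaml` (Flink 1.14-era option names; values shown are illustrative):

```yaml
# flink-conf.yaml — fixed-delay restart strategy
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

Once the configured attempts are exhausted, the job transitions to FAILED, which matches the behavior the reporter observed after killing taskmanager1.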
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283944785 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java: ## @@ -277,18 +277,11 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) { @GuardedBy("lock") private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId) -throws FlinkException { -try { -if (jobResultStore.hasJobResultEntry(getJobID())) { -jobAlreadyDone(leaderSessionId); -} else { -createNewJobMasterServiceProcess(leaderSessionId); -} -} catch (IOException e) { -throw new FlinkException( -String.format( -"Could not retrieve the job scheduling status for job %s.", getJobID()), -e); +throws FlinkException, ExecutionException, InterruptedException { +if (jobResultStore.hasJobResultEntryAsync(getJobID()).get()) { Review Comment: Yes, I agree that making both `startJobMasterServiceProcessAsync` and `verifyJobSchedulingStatusAndCreateJobMasterServiceProcess` leverage the async nature is important. Currently, dealing with the sync and async future logic in these methods would be complex and make the code hard to understand, so this refactoring needs to be well designed and may be completed in another PR, WDYT?
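The comment above concerns keeping `verifyJobSchedulingStatusAndCreateJobMasterServiceProcess` on the async path instead of blocking with `.get()` on the result-store future. A minimal JDK-only sketch of the `thenCompose`-based alternative (method and result names are illustrative, not Flink's actual code):

```java
import java.util.concurrent.CompletableFuture;

/**
 * Sketch: branch on the async "has job result entry" check without ever
 * calling .get()/.join() inside the leadership runner itself.
 */
class NonBlockingVerifySketch {

    static CompletableFuture<String> verifyAndStart(CompletableFuture<Boolean> hasResultEntry) {
        // stay on the async path: compose the follow-up action onto the check
        return hasResultEntry.thenCompose(
                alreadyDone ->
                        alreadyDone
                                ? CompletableFuture.completedFuture("jobAlreadyDone")
                                : CompletableFuture.completedFuture(
                                        "createNewJobMasterServiceProcess"));
    }

    static boolean demo() {
        String done = verifyAndStart(CompletableFuture.completedFuture(true)).join();
        String fresh = verifyAndStart(CompletableFuture.completedFuture(false)).join();
        return "jobAlreadyDone".equals(done)
                && "createNewJobMasterServiceProcess".equals(fresh);
    }
}
```

The `join()` calls here live only in the demo; in the real runner, the composed future would be handed back to the caller, which is the refactoring deferred to a follow-up PR in the thread above.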
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283937477 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java: ## Review Comment: Done.
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283935357 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java: ## @@ -57,17 +61,17 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle } @Override -public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException { +public Boolean hasDirtyJobResultEntryInternal(JobID jobId) { return dirtyJobResults.containsKey(jobId); } @Override -public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { +public Boolean hasCleanJobResultEntryInternal(JobID jobId) { Review Comment: Fixed.
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283935250 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedJobResultStore.java: ## @@ -57,17 +61,17 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle } @Override -public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException { +public Boolean hasDirtyJobResultEntryInternal(JobID jobId) { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283934746 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java: ## @@ -43,69 +44,70 @@ public interface JobResultStore { * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that * clean-up operations still need to be performed. Once the job resource cleanup has been * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link - * #markResultAsClean(JobID)}. + * #markResultAsCleanAsync(JobID)}. * * @param jobResultEntry The job result we wish to persist. - * @throws IOException if the creation of the dirty result failed for IO reasons. - * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID} - * attached that is already registered in this {@code JobResultStore}. + * @return a successfully completed future with {@code true} if the dirty result is created + * successfully. The method will throw {@link IllegalStateException} if the passed {@code + * jobResultEntry} has a {@code JobID} attached that is already registered in this {@code + * JobResultStore}. */ -void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException; +CompletableFuture createDirtyResultAsync(JobResultEntry jobResultEntry); /** * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more * resource cleanup steps need to be performed. No actions should be triggered if the passed * {@code JobID} belongs to a job that was already marked as clean. * * @param jobId Ident of the job we wish to mark as clean. - * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean} - * failed for IO reasons. - * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the - * store for the given {@code JobID}. 
+ * @return a successfully completed future if the result is marked successfully, The future will + * completed with {@link NoSuchElementException} if there is no corresponding {@code dirty} + * job present in the store for the given {@code JobID}. Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32667) Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster
[ https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750938#comment-17750938 ] Fang Yong commented on FLINK-32667: --- [~mapohl] We have an internal e2e test for OLAP queries that measures latency and QPS. However, e2e testing requires some resources. I'm considering how to build this test in the community, and whether each important issue can be tested in the flink-benchmarks project. What do you think of it? Thanks > Use standalone store and embedding writer for jobs with no-restart-strategy > in session cluster > -- > > Key: FLINK-32667 > URL: https://issues.apache.org/jira/browse/FLINK-32667 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > Labels: pull-request-available > > When a Flink session cluster uses the ZK or K8s high availability service, it will > store jobs in ZK or a ConfigMap. When we submit Flink OLAP jobs to the session > cluster, they always turn off the restart strategy. These jobs with > no-restart-strategy should not be stored in ZK or a ConfigMap in K8s -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32739) restarting not work
[ https://issues.apache.org/jira/browse/FLINK-32739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750937#comment-17750937 ] Spongebob commented on FLINK-32739: --- Thanks [~JunRuiLi] , this option helps. When one of the regions restarts, will it be restarted within the original job or as a new job? > restarting not work > --- > > Key: FLINK-32739 > URL: https://issues.apache.org/jira/browse/FLINK-32739 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.14.5 >Reporter: Spongebob >Priority: Major > Attachments: jobmanager.log > > > In my Flink standalone cluster, I configured 2 taskmanagers. Then I tested with the > following steps: > # submit a streaming job configured with a fixed restart strategy to the Flink > session environment > # this job was running on taskmanager1. Then I killed taskmanager1. > # this job turned to failed after the restart attempts. > This job was not transferred to taskmanager2, which had enough slots, as I > expected it would be. > Here's the exception trace: > {code:java} > 2023-08-03 15:13:56 > org.apache.flink.runtime.JobException: Recovery is suppressed by > FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, > backoffTimeMS=1) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) > at >
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1473) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1133) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1073) > at > org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:776) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) > at > org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) > at > java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) > at > java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) > at > java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) > at > org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) > at > org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:419) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:411) > at > org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:382) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) > at > org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) > at > 
org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:506) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1348) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) > at > org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) > at > org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) > at >
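The trace above shows a fixed-delay restart strategy exhausting its attempts after a TaskManager loss. For reference, with region failover only the affected pipelined region(s) are restarted, and they restart within the same job (the JobMaster keeps the JobGraph); no new job is created. A hedged `flink-conf.yaml` sketch of the relevant settings (values illustrative, not a recommendation for this cluster):

```yaml
# Fixed-delay restart: give up after 3 attempts, waiting 10 s between them.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
# Restart only the failed pipelined region(s), still inside the original job:
jobmanager.execution.failover-strategy: region
```

Whether the restarted tasks land on another TaskManager then depends on the surviving TaskManagers having free slots when the restart attempt is scheduled.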
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283928788 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java: ## @@ -43,69 +44,70 @@ public interface JobResultStore { * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that * clean-up operations still need to be performed. Once the job resource cleanup has been * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link - * #markResultAsClean(JobID)}. + * #markResultAsCleanAsync(JobID)}. * * @param jobResultEntry The job result we wish to persist. - * @throws IOException if the creation of the dirty result failed for IO reasons. - * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID} - * attached that is already registered in this {@code JobResultStore}. + * @return a successfully completed future with {@code true} if the dirty result is created + * successfully. The method will throw {@link IllegalStateException} if the passed {@code + * jobResultEntry} has a {@code JobID} attached that is already registered in this {@code + * JobResultStore}. */ -void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException; +CompletableFuture createDirtyResultAsync(JobResultEntry jobResultEntry); /** * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more * resource cleanup steps need to be performed. No actions should be triggered if the passed * {@code JobID} belongs to a job that was already marked as clean. * * @param jobId Ident of the job we wish to mark as clean. - * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean} - * failed for IO reasons. - * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the - * store for the given {@code JobID}. 
+ * @return a successfully completed future if the result is marked successfully, The future will + * completed with {@link NoSuchElementException} if there is no corresponding {@code dirty} + * job present in the store for the given {@code JobID}. */ -void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException; +CompletableFuture markResultAsCleanAsync(JobID jobId); /** - * Returns whether the store already contains an entry for a job. + * Returns the future of whether the store already contains an entry for a job. * * @param jobId Ident of the job we wish to check the store for. - * @return {@code true} if a {@code dirty} or {@code clean} {@link JobResultEntry} exists for - * the given {@code JobID}; otherwise {@code false}. - * @throws IOException if determining whether a job entry is present in the store failed for IO - * reasons. + * @return a successfully completed future with {@code true} if a {@code dirty} or {@code clean} + * {@link JobResultEntry} exists for the given {@code JobID}; otherwise a successfully + * completed future with {@code false}. */ -default boolean hasJobResultEntry(JobID jobId) throws IOException { -return hasDirtyJobResultEntry(jobId) || hasCleanJobResultEntry(jobId); +default CompletableFuture hasJobResultEntryAsync(JobID jobId) { +return hasDirtyJobResultEntryAsync(jobId) +.thenCombine( +hasCleanJobResultEntryAsync(jobId), +(result1, result2) -> result1 || result2); } /** - * Returns whether the store already contains a {@code dirty} entry for the given {@code JobID}. + * Returns the future of whether the store contains a {@code dirty} entry for the given {@code + * JobID}. * * @param jobId Ident of the job we wish to check the store for. - * @return {@code true}, if a {@code dirty} entry exists for the given {@code JobID}; otherwise - * {@code false}. - * @throws IOException if determining whether a job entry is present in the store failed for IO - * reasons. 
+ * @return a successfully completed future with {@code true}, if a {@code dirty} entry exists + * for the given {@code JobID}; otherwise a successfully completed future with {@code Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283927882 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java: ## @@ -43,69 +44,70 @@ public interface JobResultStore { * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that * clean-up operations still need to be performed. Once the job resource cleanup has been * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link - * #markResultAsClean(JobID)}. + * #markResultAsCleanAsync(JobID)}. * * @param jobResultEntry The job result we wish to persist. - * @throws IOException if the creation of the dirty result failed for IO reasons. - * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID} - * attached that is already registered in this {@code JobResultStore}. + * @return a successfully completed future with {@code true} if the dirty result is created + * successfully. The method will throw {@link IllegalStateException} if the passed {@code + * jobResultEntry} has a {@code JobID} attached that is already registered in this {@code + * JobResultStore}. */ -void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException; +CompletableFuture createDirtyResultAsync(JobResultEntry jobResultEntry); /** * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more * resource cleanup steps need to be performed. No actions should be triggered if the passed * {@code JobID} belongs to a job that was already marked as clean. * * @param jobId Ident of the job we wish to mark as clean. - * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean} - * failed for IO reasons. - * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the - * store for the given {@code JobID}. 
+ * @return a successfully completed future if the result is marked successfully, The future will + * completed with {@link NoSuchElementException} if there is no corresponding {@code dirty} + * job present in the store for the given {@code JobID}. */ -void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException; +CompletableFuture markResultAsCleanAsync(JobID jobId); /** - * Returns whether the store already contains an entry for a job. + * Returns the future of whether the store already contains an entry for a job. * * @param jobId Ident of the job we wish to check the store for. - * @return {@code true} if a {@code dirty} or {@code clean} {@link JobResultEntry} exists for - * the given {@code JobID}; otherwise {@code false}. - * @throws IOException if determining whether a job entry is present in the store failed for IO - * reasons. + * @return a successfully completed future with {@code true} if a {@code dirty} or {@code clean} + * {@link JobResultEntry} exists for the given {@code JobID}; otherwise a successfully + * completed future with {@code false}. Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283926936 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java: ## @@ -43,69 +44,70 @@ public interface JobResultStore { * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that * clean-up operations still need to be performed. Once the job resource cleanup has been * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link - * #markResultAsClean(JobID)}. + * #markResultAsCleanAsync(JobID)}. * * @param jobResultEntry The job result we wish to persist. - * @throws IOException if the creation of the dirty result failed for IO reasons. - * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID} - * attached that is already registered in this {@code JobResultStore}. + * @return a successfully completed future with {@code true} if the dirty result is created + * successfully. The method will throw {@link IllegalStateException} if the passed {@code Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283926122 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/JobResultStore.java: ## @@ -43,69 +44,70 @@ public interface JobResultStore { * Registers the passed {@link JobResultEntry} instance as {@code dirty} which indicates that * clean-up operations still need to be performed. Once the job resource cleanup has been * finalized, we can mark the {@code JobResultEntry} as {@code clean} result using {@link - * #markResultAsClean(JobID)}. + * #markResultAsCleanAsync(JobID)}. * * @param jobResultEntry The job result we wish to persist. - * @throws IOException if the creation of the dirty result failed for IO reasons. - * @throws IllegalStateException if the passed {@code jobResultEntry} has a {@code JobID} - * attached that is already registered in this {@code JobResultStore}. + * @return a successfully completed future with {@code true} if the dirty result is created + * successfully. The method will throw {@link IllegalStateException} if the passed {@code + * jobResultEntry} has a {@code JobID} attached that is already registered in this {@code + * JobResultStore}. */ -void createDirtyResult(JobResultEntry jobResultEntry) throws IOException, IllegalStateException; +CompletableFuture createDirtyResultAsync(JobResultEntry jobResultEntry); /** * Marks an existing {@link JobResultEntry} as {@code clean}. This indicates that no more * resource cleanup steps need to be performed. No actions should be triggered if the passed * {@code JobID} belongs to a job that was already marked as clean. * * @param jobId Ident of the job we wish to mark as clean. - * @throws IOException if marking the {@code dirty} {@code JobResultEntry} as {@code clean} - * failed for IO reasons. - * @throws NoSuchElementException if there is no corresponding {@code dirty} job present in the - * store for the given {@code JobID}. 
+ * @return a successfully completed future if the result is marked successfully, The future will + * completed with {@link NoSuchElementException} if there is no corresponding {@code dirty} + * job present in the store for the given {@code JobID}. */ -void markResultAsClean(JobID jobId) throws IOException, NoSuchElementException; +CompletableFuture markResultAsCleanAsync(JobID jobId); /** - * Returns whether the store already contains an entry for a job. + * Returns the future of whether the store already contains an entry for a job. * * @param jobId Ident of the job we wish to check the store for. - * @return {@code true} if a {@code dirty} or {@code clean} {@link JobResultEntry} exists for - * the given {@code JobID}; otherwise {@code false}. - * @throws IOException if determining whether a job entry is present in the store failed for IO - * reasons. + * @return a successfully completed future with {@code true} if a {@code dirty} or {@code clean} + * {@link JobResultEntry} exists for the given {@code JobID}; otherwise a successfully + * completed future with {@code false}. */ -default boolean hasJobResultEntry(JobID jobId) throws IOException { -return hasDirtyJobResultEntry(jobId) || hasCleanJobResultEntry(jobId); +default CompletableFuture hasJobResultEntryAsync(JobID jobId) { +return hasDirtyJobResultEntryAsync(jobId) +.thenCombine( +hasCleanJobResultEntryAsync(jobId), +(result1, result2) -> result1 || result2); } /** - * Returns whether the store already contains a {@code dirty} entry for the given {@code JobID}. + * Returns the future of whether the store contains a {@code dirty} entry for the given {@code + * JobID}. * * @param jobId Ident of the job we wish to check the store for. - * @return {@code true}, if a {@code dirty} entry exists for the given {@code JobID}; otherwise - * {@code false}. - * @throws IOException if determining whether a job entry is present in the store failed for IO - * reasons. 
+ * @return a successfully completed future with {@code true}, if a {@code dirty} entry exists + * for the given {@code JobID}; otherwise a successfully completed future with {@code + * false}. */ -boolean hasDirtyJobResultEntry(JobID jobId) throws IOException; +CompletableFuture hasDirtyJobResultEntryAsync(JobID jobId); /** - * Returns whether the store already contains a {@code clean} entry for the given {@code JobID}. + * Returns the future of whether the store contains a {@code clean} entry for the given {@code + * JobID}. * * @param jobId Ident of the job we
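The default `hasJobResultEntryAsync` in the diff above ORs the dirty and clean checks with `thenCombine`, which means both lookups are started eagerly and can run concurrently, rather than one waiting on the other. A self-contained sketch of the same pattern, with in-memory maps standing in for the store (names are illustrative, not Flink's actual classes):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ResultStoreSketch {
    private final Map<String, String> dirty = new ConcurrentHashMap<>();
    private final Map<String, String> clean = new ConcurrentHashMap<>();

    void markDirty(String jobId) { dirty.put(jobId, "dirty"); }
    void markClean(String jobId) { clean.put(jobId, "clean"); }

    CompletableFuture<Boolean> hasDirtyJobResultEntryAsync(String jobId) {
        return CompletableFuture.supplyAsync(() -> dirty.containsKey(jobId));
    }

    CompletableFuture<Boolean> hasCleanJobResultEntryAsync(String jobId) {
        return CompletableFuture.supplyAsync(() -> clean.containsKey(jobId));
    }

    // Mirrors the default method in the diff: both checks are created up front,
    // and thenCombine merges their results with a logical OR.
    CompletableFuture<Boolean> hasJobResultEntryAsync(String jobId) {
        return hasDirtyJobResultEntryAsync(jobId)
                .thenCombine(hasCleanJobResultEntryAsync(jobId), (d, c) -> d || c);
    }
}
```

Using `thenCompose` instead would sequence the two checks; `thenCombine` is the natural choice here because the two lookups are independent.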
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283925581 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java: ## @@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle } @Override -public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException { +public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException { return fileSystem.exists(constructDirtyPath(jobId)); } @Override -public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { +public Boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { return fileSystem.exists(constructCleanPath(jobId)); } @Override public Set getDirtyResultsInternal() throws IOException { createBasePathIfNeeded(); - final FileStatus[] statuses = fileSystem.listStatus(this.basePath); - Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32738) PROTOBUF format supports projection push down
[ https://issues.apache.org/jira/browse/FLINK-32738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750934#comment-17750934 ] dalongliu commented on FLINK-32738: --- [~zhoujira86] Looks good to me if we can support this feature. Kindly remind you should also consider the case of nested fields pushdown. > PROTOBUF format supports projection push down > - > > Key: FLINK-32738 > URL: https://issues.apache.org/jira/browse/FLINK-32738 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: xiaogang zhou >Assignee: xiaogang zhou >Priority: Major > Fix For: 1.19.0 > > > support projection push down for protobuf -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283925443 ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java: ## @@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle } @Override -public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException { +public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException { Review Comment: Fixed. ## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java: ## @@ -181,21 +185,19 @@ public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchEle } @Override -public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException { +public Boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException { return fileSystem.exists(constructDirtyPath(jobId)); } @Override -public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { +public Boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32746) Enable ZGC in JDK17 to solve long time class unloading STW during fullgc
[ https://issues.apache.org/jira/browse/FLINK-32746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiangyu feng updated FLINK-32746: - Description: In an OLAP session cluster, a TM needs to frequently create new classloaders and generate new classes. These classes accumulate in metaspace. When metaspace usage reaches a threshold, a FullGC with a long Stop-the-World pause is triggered. Currently, SerialGC, ParallelGC and G1GC all do Stop-the-World class unloading. Only ZGC supports concurrent class unloading, see https://bugs.openjdk.org/browse/JDK-8218905. In our scenario, class unloading for a 2GB metaspace with 5 million classes stops the application for more than 40 seconds. After switching to ZGC, the maximum STW of the application was reduced to less than 10ms. was: In an OLAP session cluster, a TM needs to frequently create new classloaders and generate new classes. These classes accumulate in metaspace. When metaspace usage reaches a threshold, a FullGC with a long Stop-the-World pause is triggered. Currently, SerialGC, ParallelGC and G1GC all do Stop-the-World class unloading. Only ZGC supports concurrent class unloading > Enable ZGC in JDK17 to solve long time class unloading STW during fullgc > > > Key: FLINK-32746 > URL: https://issues.apache.org/jira/browse/FLINK-32746 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: xiangyu feng >Priority: Major > > In an OLAP session cluster, a TM needs to frequently create new classloaders > and generate new classes. These classes accumulate in metaspace. > When metaspace usage reaches a threshold, a FullGC with a long > Stop-the-World pause is triggered. Currently, SerialGC, ParallelGC and > G1GC all do Stop-the-World class unloading.
Only ZGC supports concurrent > class unloading, see > https://bugs.openjdk.org/browse/JDK-8218905. > > In our scenario, class unloading for a 2GB metaspace with 5 million classes > stops the application for more than 40 seconds. After switching to ZGC, the > maximum STW of the application was reduced to less than 10ms. >
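For TaskManagers running on JDK 17, where ZGC is production-ready, switching the collector is a JVM-flag change. A hedged sketch using Flink's `env.java.opts.taskmanager` option (the config key exists in Flink; the flag choice reflects the ticket's proposal, not a committed default):

```yaml
# JDK 17+: ZGC unloads classes concurrently, avoiding the long
# metaspace-cleanup Stop-the-World pauses described in the ticket.
env.java.opts.taskmanager: -XX:+UseZGC
```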
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283100724

## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ##

@@ -1390,29 +1392,71 @@ private CompletableFuture registerGloballyTerminatedJobInJobRes
                 "Job %s is in state %s which is not globally terminal.",
                 jobId, terminalJobStatus);
-
-        ioExecutor.execute(
-                () -> {
-                    try {
-                        if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                            log.warn(
-                                    "Job {} is already marked as clean but clean up was triggered again.",
-                                    jobId);
-                        } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                            jobResultStore.createDirtyResult(
-                                    new JobResultEntry(
-                                            JobResult.createFrom(archivedExecutionGraph)));
-                            log.info(
-                                    "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                                    jobId);
-                        }
-                    } catch (IOException e) {
-                        writeFuture.completeExceptionally(e);
+        CompletableFuture shouldCheckDirtyJobResult =
+                jobResultStore
+                        .hasCleanJobResultEntryAsync(jobId)
+                        .handleAsync(
+                                (hasCleanJobResultEntry, throwable) -> {
+                                    if (throwable != null) {
+                                        writeFuture.completeExceptionally(throwable);
+                                        return false;
+                                    } else {
+                                        if (hasCleanJobResultEntry) {
+                                            log.warn(
+                                                    "Job {} is already marked as clean but "
+                                                            + "clean up was triggered again.",
+                                                    jobId);
+                                            writeFuture.complete(null);
+                                            return false;
+                                        } else {
+                                            return true;
+                                        }
+                                    }
+                                });
+        shouldCheckDirtyJobResult.whenCompleteAsync(
+                (shouldCheck, throwable1) -> {
+                    if (throwable1 != null) {
+                        writeFuture.completeExceptionally(throwable1);
                         return;
                     }
-                    writeFuture.complete(null);
+                    if (shouldCheck) {
+                        jobResultStore
+                                .hasDirtyJobResultEntryAsync(jobId)
+                                .whenCompleteAsync(
+                                        (hasDirtyJobResultEntry, throwable2) -> {
+                                            if (throwable2 != null) {
+                                                writeFuture.completeExceptionally(throwable2);
+                                                return;
+                                            }
+                                            if (!hasDirtyJobResultEntry) {
+                                                jobResultStore
+                                                        .createDirtyResultAsync(
+                                                                new JobResultEntry(
+                                                                        JobResult.createFrom(
+                                                                                archivedExecutionGraph)))
+                                                        .whenCompleteAsync(
+                                                                (unused, throwable3) -> {
+                                                                    if (throwable3 != null) {
+                                                                        writeFuture
+                                                                                .completeExceptionally(
+                                                                                        throwable3);
+                                                                        return;
+                                                                    }
+                                                                    log.info(
+                                                                            "Job {} has been registered
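The deeply nested `whenCompleteAsync` chain in the diff above is hard to follow. The same clean-check, then dirty-check, then create sequence can be flattened with `thenCompose`, which also propagates failures to the returned future automatically. Below is a minimal, self-contained sketch of that pattern — the `*Async` stand-ins are hypothetical placeholders, not Flink's actual `JobResultStore` methods:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncChainSketch {
    // Hypothetical stand-ins for the async job-result-store calls discussed
    // in the review; the real Flink methods have different signatures.
    static CompletableFuture<Boolean> hasCleanEntryAsync(String jobId) {
        return CompletableFuture.completedFuture(false); // pretend: no clean entry
    }

    static CompletableFuture<Boolean> hasDirtyEntryAsync(String jobId) {
        return CompletableFuture.completedFuture(false); // pretend: no dirty entry
    }

    static CompletableFuture<Void> createDirtyResultAsync(String jobId) {
        System.out.println("Job " + jobId + " registered for cleanup");
        return CompletableFuture.completedFuture(null);
    }

    // The same three-stage sequence, flattened: any stage that fails
    // completes the returned future exceptionally without extra plumbing.
    static CompletableFuture<Void> registerTerminatedJob(String jobId) {
        return hasCleanEntryAsync(jobId)
                .thenCompose(
                        hasClean -> {
                            if (hasClean) {
                                // Already cleaned up; nothing to do.
                                return CompletableFuture.<Void>completedFuture(null);
                            }
                            return hasDirtyEntryAsync(jobId)
                                    .thenCompose(
                                            hasDirty -> {
                                                if (hasDirty) {
                                                    return CompletableFuture
                                                            .<Void>completedFuture(null);
                                                }
                                                return createDirtyResultAsync(jobId);
                                            });
                        });
    }

    public static void main(String[] args) {
        registerTerminatedJob("job-1").join();
        System.out.println("done");
    }
}
```

Whether `thenCompose` or manual `whenCompleteAsync` chaining is preferable here depends on how the surrounding `writeFuture` is wired; the sketch only illustrates the composition style.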
[GitHub] [flink] WencongLiu commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
WencongLiu commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1283923614

## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java: ##

@@ -332,11 +332,16 @@ public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception {
         TestingJobResultStore.builder()
                 .withCreateDirtyResultConsumer(
                         ignoredJobResultEntry -> {
+                            CompletableFuture result =
+                                    new CompletableFuture<>();
                             try {
                                 markAsDirtyLatch.await();
                             } catch (InterruptedException e) {
-                                throw new RuntimeException(e);
+                                result.completeExceptionally(
+                                        new RuntimeException(e));

Review Comment: Fixed.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
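The test fix above replaces a thrown exception with a future completed exceptionally, so the failure reaches whoever waits on the future instead of being swallowed by the executor thread. A small stand-alone illustration of that pattern (names are illustrative, not the Flink test classes):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionalFutureSketch {
    // Instead of throwing from inside an async callback (which the consumer of
    // the returned future would never observe), complete the future exceptionally.
    static CompletableFuture<Void> createDirtyResult(boolean interrupted) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (interrupted) {
            result.completeExceptionally(new RuntimeException("interrupted while waiting"));
        } else {
            result.complete(null);
        }
        return result;
    }

    public static void main(String[] args) {
        createDirtyResult(false).join(); // completes normally
        try {
            createDirtyResult(true).join();
        } catch (CompletionException e) {
            // join() wraps the cause in a CompletionException.
            System.out.println("caught: " + e.getCause().getMessage());
        }
    }
}
```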
[GitHub] [flink] flinkbot commented on pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile need close write automaticly
flinkbot commented on PR #23134: URL: https://github.com/apache/flink/pull/23134#issuecomment-1664900125 ## CI report: * 30978f125a7b9d816e5875a4ce28f8b359341115 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on pull request #23133: [FLINK-32747][table] Suport ddl for catalog which is loaded from CatalogStore
flinkbot commented on PR #23133: URL: https://github.com/apache/flink/pull/23133#issuecomment-1664899703 ## CI report: * 651315783f96f5c01dabba6702dfaf01190f5942 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-32748) WriteSinkFunction::cleanFile need close write automaticly
[ https://issues.apache.org/jira/browse/FLINK-32748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xy updated FLINK-32748: --- Component/s: API / Core > WriteSinkFunction::cleanFile need close write automaticly > - > > Key: FLINK-32748 > URL: https://issues.apache.org/jira/browse/FLINK-32748 > Project: Flink > Issue Type: Bug > Components: API / Core >Reporter: xy >Priority: Major > Labels: pull-request-available > Fix For: 1.9.4 > > > WriteSinkFunction::cleanFile need close write automaticly -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32747) Support ddl for catalog from CatalogStore
[ https://issues.apache.org/jira/browse/FLINK-32747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fang Yong reassigned FLINK-32747: - Assignee: Fang Yong > Support ddl for catalog from CatalogStore > - > > Key: FLINK-32747 > URL: https://issues.apache.org/jira/browse/FLINK-32747 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: Fang Yong >Assignee: Fang Yong >Priority: Major > Labels: pull-request-available > > Support ddl for catalog which loaded by CatalogStore -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] xuzifu666 commented on pull request #23134: [FLINK-32748] WriteSinkFunction::cleanFile need close write automaticly
xuzifu666 commented on PR #23134: URL: https://github.com/apache/flink/pull/23134#issuecomment-1664896044 cc @reswqa have a review plz
[jira] [Updated] (FLINK-32748) WriteSinkFunction::cleanFile need close write automaticly
[ https://issues.apache.org/jira/browse/FLINK-32748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32748: --- Labels: pull-request-available (was: ) > WriteSinkFunction::cleanFile need close write automaticly > - > > Key: FLINK-32748 > URL: https://issues.apache.org/jira/browse/FLINK-32748 > Project: Flink > Issue Type: Bug >Reporter: xy >Priority: Major > Labels: pull-request-available > Fix For: 1.9.4 > > > WriteSinkFunction::cleanFile need close write automaticly -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32746) Enable ZGC in JDK17 to solve long time class unloading STW during fullgc
[ https://issues.apache.org/jira/browse/FLINK-32746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiangyu feng updated FLINK-32746: - Description: In an OLAP session cluster, a TM needs to frequently create new classloaders and generate new classes. These classes accumulate in the metaspace. When metaspace usage reaches a threshold, a full GC with a long stop-the-world pause is triggered. Currently, SerialGC, ParallelGC, and G1GC all perform stop-the-world class unloading; only ZGC supports concurrent class unloading.
> Enable ZGC in JDK17 to solve long time class unloading STW during fullgc
>
> Key: FLINK-32746
> URL: https://issues.apache.org/jira/browse/FLINK-32746
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Runtime
> Reporter: xiangyu feng
> Priority: Major
>
> In an OLAP session cluster, a TM needs to frequently create new classloaders
> and generate new classes. These classes accumulate in the metaspace.
> When metaspace usage reaches a threshold, a full GC with a long
> stop-the-world pause is triggered. Currently, SerialGC, ParallelGC, and
> G1GC all perform stop-the-world class unloading; only ZGC supports
> concurrent class unloading.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
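As a rough sketch of what the ticket proposes, ZGC could be enabled for TaskManagers via JVM options in `flink-conf.yaml`. Treat the option name and flags as assumptions to verify against your Flink and JDK versions — `-XX:+UseZGC` is the standard HotSpot flag and ZGC is production-ready from JDK 15 onward:

```yaml
# flink-conf.yaml — illustrative only; verify option names for your Flink version.
# ZGC unloads classes concurrently, avoiding the long stop-the-world pauses
# that Serial/Parallel/G1 full GCs incur when metaspace fills up.
env.java.opts.taskmanager: -XX:+UseZGC
```

Whether this helps in practice depends on how quickly the session cluster churns through classloaders and on metaspace sizing, which the ticket leaves open.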
[GitHub] [flink] xuzifu666 opened a new pull request, #23134: [FLINK-32748] WriteSinkFunction::cleanFile need close write automaticly
xuzifu666 opened a new pull request, #23134: URL: https://github.com/apache/flink/pull/23134 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* the pr is to improve file object manage ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32747) Support ddl for catalog from CatalogStore
[ https://issues.apache.org/jira/browse/FLINK-32747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32747: --- Labels: pull-request-available (was: ) > Support ddl for catalog from CatalogStore > - > > Key: FLINK-32747 > URL: https://issues.apache.org/jira/browse/FLINK-32747 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: Fang Yong >Priority: Major > Labels: pull-request-available > > Support ddl for catalog which loaded by CatalogStore -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] FangYongs opened a new pull request, #23133: [FLINK-32747][table] Suport ddl for catalog which is loaded from CatalogStore
FangYongs opened a new pull request, #23133: URL: https://github.com/apache/flink/pull/23133

## What is the purpose of the change

Currently `CatalogManager` can load a catalog from the `CatalogStore`, but we cannot perform DDL against the loaded catalog. This PR aims to support these operations.

## Brief change log

- Add catalogs loaded from the `CatalogStore` to `catalogs` in `CatalogManager`
- Get the catalog via the `getCatalog` method instead of `catalogs.get` for operations such as DDL and listing tables/views

## Verifying this change

This change added tests and can be verified as follows:

- Added test case `CatalogManagerTest.testExistingCatalogInStore`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
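The core idea of the change — resolve a catalog from the map of initialized catalogs first, then lazily fall back to descriptors in the catalog store — can be sketched with plain maps. This is a toy model; `CatalogManager`'s real API and types differ:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CatalogLookupSketch {
    // Stand-ins: initialized catalogs vs. persisted descriptors in the store.
    final Map<String, String> initializedCatalogs = new HashMap<>();
    final Map<String, String> catalogStore = new HashMap<>();

    Optional<String> getCatalog(String name) {
        String catalog = initializedCatalogs.get(name);
        if (catalog != null) {
            return Optional.of(catalog);
        }
        // Lazily "initialize" the catalog from its stored descriptor, so DDL
        // against a stored-but-not-yet-loaded catalog still finds it.
        String descriptor = catalogStore.get(name);
        if (descriptor != null) {
            initializedCatalogs.put(name, descriptor);
            return Optional.of(descriptor);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        CatalogLookupSketch manager = new CatalogLookupSketch();
        manager.catalogStore.put("my_catalog", "jdbc-descriptor");
        // A plain initializedCatalogs.get(...) would miss this catalog;
        // getCatalog falls back to the store and loads it.
        System.out.println(manager.getCatalog("my_catalog").isPresent());
    }
}
```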
[jira] [Created] (FLINK-32748) WriteSinkFunction::cleanFile need close write automaticly
xy created FLINK-32748: -- Summary: WriteSinkFunction::cleanFile need close write automaticly Key: FLINK-32748 URL: https://issues.apache.org/jira/browse/FLINK-32748 Project: Flink Issue Type: Bug Reporter: xy Fix For: 1.9.4 WriteSinkFunction::cleanFile need close write automaticly -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32747) Support ddl for catalog from CatalogStore
Fang Yong created FLINK-32747: - Summary: Support ddl for catalog from CatalogStore Key: FLINK-32747 URL: https://issues.apache.org/jira/browse/FLINK-32747 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.19.0 Reporter: Fang Yong Support ddl for catalog which loaded by CatalogStore -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32741) Remove DataSet related descriptions in doc
[ https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750930#comment-17750930 ] Junyao Huang edited comment on FLINK-32741 at 8/4/23 2:33 AM: -- Thanks! will start immediately. will link PR to this issue. was (Author: pegasas): Thanks! will start immediately. > Remove DataSet related descriptions in doc > -- > > Key: FLINK-32741 > URL: https://issues.apache.org/jira/browse/FLINK-32741 > Project: Flink > Issue Type: Technical Debt > Components: Documentation >Affects Versions: 2.0.0 >Reporter: Wencong Liu >Assignee: Junyao Huang >Priority: Major > Labels: 2.0-related > Fix For: 1.18.0 > > > Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't > recommend developers to use the DataSet, the descriptions of DataSet should > be removed in the doc after [FLINK-32558]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32741) Remove DataSet related descriptions in doc
[ https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750930#comment-17750930 ] Junyao Huang commented on FLINK-32741: -- Thanks! will start immediately. > Remove DataSet related descriptions in doc > -- > > Key: FLINK-32741 > URL: https://issues.apache.org/jira/browse/FLINK-32741 > Project: Flink > Issue Type: Technical Debt > Components: Documentation >Affects Versions: 2.0.0 >Reporter: Wencong Liu >Assignee: Junyao Huang >Priority: Major > Labels: 2.0-related > Fix For: 1.18.0 > > > Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't > recommend developers to use the DataSet, the descriptions of DataSet should > be removed in the doc after [FLINK-32558]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] FangYongs commented on pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation
FangYongs commented on PR #23063: URL: https://github.com/apache/flink/pull/23063#issuecomment-1664886934 Thanks @ferenc-csaky , left some comments
[GitHub] [flink] WencongLiu commented on pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on PR #23026: URL: https://github.com/apache/flink/pull/23026#issuecomment-1664886800 > I think being used in test cases probably should not block the deprecation of the classes. In this specific case, I don't think it's necessary to move them to another module. There are many similar classes that parse the application arguments into a `GlobalJobParameters`. I suspect these non-DataSet test cases use `ParameterTool` simply because they are imitated from DataSet test cases. Deprecating the `ParameterTool` related classes and making other classes parse the arguments in a similar way to `ParameterTool` is a reasonable approach. I'll follow the suggestions.
[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283910595

## flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java: ##

@@ -77,12 +80,23 @@
  * Broadcast variables in bulk iterations
  * Custom Java objects (POJOs)
  *
+ *
+ * Note: All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future
+ * Flink major version. You can still build your application in DataSet, but you should move to
+ * either the DataStream and/or Table API. This class is retained for testing purposes.
  */
 @SuppressWarnings("serial")
 public class KMeans {

+    private static final Logger LOGGER = LoggerFactory.getLogger(KMeans.class);
+
     public static void main(String[] args) throws Exception {
+        LOGGER.warn(
+                "All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future"
+                        + " Flink major version. You can still build your application in DataSet, but you should move to"
+                        + " either the DataStream and/or Table API. This class is retained for testing purposes.");

Review Comment: Fixed.
[GitHub] [flink] WencongLiu commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
WencongLiu commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283910488

## flink-examples/flink-examples-batch/pom.xml: ##

@@ -48,25 +48,6 @@ under the License.
-
-
-    org.apache.maven.plugins
-    maven-compiler-plugin
-
-
-    compile
-    process-sources
-
-    compile
-
-
-    -Xlint:deprecation
-    true
-
-
-
-

Review Comment: Fixed.

## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ##

@@ -556,7 +556,14 @@
      * The default execution mode is {@link ExecutionMode#PIPELINED}.
      *
      * @return The execution mode for the program.
+     * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a
+     *     future Flink major version. You can still build your application in DataSet, but you
+     *     should move to either the DataStream and/or Table API.
+     * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;>
+     *     FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet
+     *     API
      */
+    @Deprecated

Review Comment: Fixed.

## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ##

@@ -913,6 +920,18 @@
     public LinkedHashSet> getRegisteredPojoTypes() {
         return registeredPojoTypes;
     }

+    /**
+     * Get if the auto type registration is disabled.
+     *
+     * @return if the auto type registration is disabled.
+     * @deprecated All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a
+     *     future Flink major version. You can still build your application in DataSet, but you
+     *     should move to either the DataStream and/or Table API.
+     * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;>
+     *     FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet
+     *     API
+     */
+    @Deprecated

Review Comment: Fixed.

## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ##

@@ -544,6 +544,13 @@
     public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) {
      * The default execution mode is {@link ExecutionMode#PIPELINED}.
      *
      * @param executionMode The execution mode to use.
+     * @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs.
+     *     All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future
+     *     Flink major version. You can still build your application in DataSet, but you should move
+     *     to either the DataStream and/or Table API.
+     * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;>
+     *     FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet
+     *     API
      */
     public void setExecutionMode(ExecutionMode executionMode) {

Review Comment: Fixed.
[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation
FangYongs commented on code in PR #23063: URL: https://github.com/apache/flink/pull/23063#discussion_r1283909486

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStoreFactory.java: ##

@@ -20,42 +20,39 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.factories.CatalogStoreFactory;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.FactoryHelper;

+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;

-import static org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.CHARSET;
 import static org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.IDENTIFIER;
 import static org.apache.flink.table.catalog.FileCatalogStoreFactoryOptions.PATH;
 import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;

-/** CatalogStore factory for {@link FileCatalogStore}. */
+/** Catalog store factory for {@link FileCatalogStore}. */
 public class FileCatalogStoreFactory implements CatalogStoreFactory {

     private String path;
-    private String charset;
-
     @Override
     public CatalogStore createCatalogStore() {
-        return new FileCatalogStore(path, charset);
+        return new FileCatalogStore(path);
     }

     @Override
-    public void open(Context context) throws CatalogException {
-        FactoryUtil.FactoryHelper factoryHelper = createCatalogStoreFactoryHelper(this, context);
+    public void open(Context context) {
+        FactoryHelper factoryHelper =
+                createCatalogStoreFactoryHelper(this, context);
         factoryHelper.validate();

-        ReadableConfig options = factoryHelper.getOptions();
+        ReadableConfig options = factoryHelper.getOptions();
         path = options.get(PATH);
-        charset = options.get(CHARSET);
     }

     @Override
-    public void close() throws CatalogException {}
+    public void close() {}

Review Comment: Call super.close()
[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation
FangYongs commented on code in PR #23063: URL: https://github.com/apache/flink/pull/23063#discussion_r1283908619

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java: ##

@@ -217,32 +253,10 @@
     public boolean contains(String catalogName) throws CatalogException {
         checkOpenState();

-        return listAllCatalogFiles().containsKey(catalogName);
-    }
-
-    private Map listAllCatalogFiles() throws CatalogException {
-        Map files = new HashMap<>();
-        File directoryFile = new File(catalogStoreDirectory);
-        if (!directoryFile.isDirectory()) {
-            throw new CatalogException("File catalog store only support local directory");
-        }
-
-        try {
-            Files.list(directoryFile.toPath())
-                    .filter(file -> file.getFileName().toString().endsWith(FILE_EXTENSION))
-                    .filter(Files::isRegularFile)
-                    .forEach(
-                            p ->
-                                    files.put(
-                                            p.getFileName().toString().replace(FILE_EXTENSION, ""),
-                                            p));
-        } catch (Throwable t) {
-            throw new CatalogException("Failed to list file catalog store directory", t);
-        }
-        return files;
+        return listCatalogs().contains(catalogName);

Review Comment: Directly check whether the catalog file exists here? I think getting all catalogs may not be an efficient operation.
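The reviewer's suggestion — test the single catalog file instead of listing the whole directory — looks roughly like this, using `java.nio.file` as a stand-in for Flink's `FileSystem` API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ContainsSketch {
    static final String FILE_EXTENSION = ".yaml";

    // contains(name) resolves the one expected file and checks it directly,
    // instead of enumerating every file in the store directory.
    static boolean contains(Path storeDir, String catalogName) {
        return Files.isRegularFile(storeDir.resolve(catalogName + FILE_EXTENSION));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("catalog-store");
        Files.createFile(dir.resolve("my_catalog" + FILE_EXTENSION));
        System.out.println(contains(dir, "my_catalog"));
        System.out.println(contains(dir, "missing"));
    }
}
```

In Flink's own `FileSystem` abstraction the equivalent would be an `fs.exists(...)` call on the resolved catalog path; the cost difference matters most on object-store-backed catalog directories, where a listing is much more expensive than a single existence check.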
[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation
FangYongs commented on code in PR #23063: URL: https://github.com/apache/flink/pull/23063#discussion_r1283907327

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java: ##

@@ -201,8 +222,23 @@
     @Override
     public Set listCatalogs() throws CatalogException {
         checkOpenState();
+        try {
+            FileStatus[] statusArr = catalogStorePath.getFileSystem().listStatus(catalogStorePath);

-        return Collections.unmodifiableSet(listAllCatalogFiles().keySet());
+            return Arrays.stream(statusArr)
+                    .filter(status -> !status.isDir())
+                    .map(FileStatus::getPath)
+                    .map(Path::getName)
+                    .map(filename -> filename.replace(FILE_EXTENSION, ""))
+                    .collect(Collectors.toSet());
+        } catch (CatalogException e) {
+            throw e;

Review Comment: The same as above.
[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation
FangYongs commented on code in PR #23063: URL: https://github.com/apache/flink/pull/23063#discussion_r1283906870

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java: ##

@@ -172,22 +185,30 @@
     @Override
     public Optional getCatalog(String catalogName) throws CatalogException {
         checkOpenState();
-
-        Path path = getCatalogPath(catalogName);
+        Path catalogPath = getCatalogPath(catalogName);

         try {
-            File file = path.toFile();
-            if (!file.exists()) {
-                LOG.warn("Catalog {}'s store file %s does not exist.", catalogName, path);
+            FileSystem fs = catalogPath.getFileSystem();
+
+            if (!fs.exists(catalogPath)) {
                 return Optional.empty();
             }
-            String content = FileUtils.readFile(file, charset);
-            Map options = yaml.load(content);
-            return Optional.of(CatalogDescriptor.of(catalogName, Configuration.fromMap(options)));
-        } catch (Throwable t) {
+
+            try (FSDataInputStream is = fs.open(catalogPath)) {
+                Map configMap =
+                        YAML_MAPPER.readValue(is, new TypeReference>() {});
+
+                CatalogDescriptor catalog =
+                        CatalogDescriptor.of(catalogName, Configuration.fromMap(configMap));
+
+                return Optional.of(catalog);
+            }
+        } catch (CatalogException e) {
+            throw e;

Review Comment: Same as above.
[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation
FangYongs commented on code in PR #23063: URL: https://github.com/apache/flink/pull/23063#discussion_r1283906615

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java: ##

@@ -132,31 +153,23 @@
     public void removeCatalog(String catalogName, boolean ignoreIfNotExists)
             throws CatalogException {
         checkOpenState();
-
-        Path path = getCatalogPath(catalogName);
+        Path catalogPath = getCatalogPath(catalogName);

         try {
-            File file = path.toFile();
-            if (file.exists()) {
-                if (!file.isFile()) {
-                    throw new CatalogException(
-                            String.format(
-                                    "Catalog %s's store file %s is not a regular file",
-                                    catalogName, path.getFileName()));
-                }
-                Files.deleteIfExists(path);
-            } else {
-                if (!ignoreIfNotExists) {
-                    throw new CatalogException(
-                            String.format(
-                                    "Catalog %s's store file %s is not exist", catalogName, path));
-                }
+            FileSystem fs = catalogPath.getFileSystem();
+
+            if (fs.exists(catalogPath)) {
+                fs.delete(catalogPath, false);
+            } else if (!ignoreIfNotExists) {
+                throw new CatalogException(
+                        String.format(
+                                "Catalog %s's store file %s does not exist.",
+                                catalogName, catalogPath));
             }
-        } catch (Throwable e) {
+        } catch (CatalogException e) {
+            throw e;

Review Comment: The same as above.
[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation
FangYongs commented on code in PR #23063: URL: https://github.com/apache/flink/pull/23063#discussion_r1283906508

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java: ##

@@ -97,25 +114,29 @@
     public void storeCatalog(String catalogName, CatalogDescriptor catalog)
             throws CatalogException {
         checkOpenState();

-        Path filePath = getCatalogPath(catalogName);
+        Path catalogPath = getCatalogPath(catalogName);

         try {
-            File file = filePath.toFile();
-            if (file.exists()) {
+            FileSystem fs = catalogPath.getFileSystem();
+
+            if (fs.exists(catalogPath)) {
                 throw new CatalogException(
                         String.format(
                                 "Catalog %s's store file %s is already exist.",
-                                catalogName, filePath));
+                                catalogName, catalogPath));
             }
-            // create a new file
-            file.createNewFile();
-            String yamlString = yaml.dumpAsMap(catalog.getConfiguration().toMap());
-            FileUtils.writeFile(file, yamlString, charset);
-            LOG.info("Catalog {}'s configuration saved to file {}", catalogName, filePath);
-        } catch (Throwable e) {
+
+            try (FSDataOutputStream os = fs.create(catalogPath, WriteMode.NO_OVERWRITE)) {
+                YAML_MAPPER.writeValue(os, catalog.getConfiguration().toMap());
+            }
+
+            LOG.info("Catalog {}'s configuration saved to file {}", catalogName, catalogPath);
+        } catch (CatalogException e) {
+            throw e;

Review Comment: The handling here is a bit strange. Under what circumstances will a CatalogException be caught?
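The catch pattern the reviewer is asking about rethrows an already-descriptive `CatalogException` unchanged (here, the "already exists" check thrown inside the same `try`) and wraps every other failure. A self-contained sketch with a stand-in exception class, not Flink's actual `CatalogException`:

```java
public class RethrowSketch {
    // Stand-in domain exception; Flink's CatalogException lives in
    // org.apache.flink.table.catalog.exceptions.
    static class CatalogException extends RuntimeException {
        CatalogException(String msg) { super(msg); }
        CatalogException(String msg, Throwable cause) { super(msg, cause); }
    }

    static void storeCatalog(boolean alreadyExists) {
        try {
            if (alreadyExists) {
                // A deliberate, already-descriptive domain error...
                throw new CatalogException("Catalog store file already exists.");
            }
            // ...versus some low-level failure from the I/O layer.
            throw new java.io.IOException("disk full");
        } catch (CatalogException e) {
            throw e; // rethrown unchanged rather than double-wrapped
        } catch (Exception e) {
            throw new CatalogException("Failed to store catalog.", e);
        }
    }

    public static void main(String[] args) {
        try {
            storeCatalog(true);
        } catch (CatalogException e) {
            System.out.println(e.getMessage());
        }
        try {
            storeCatalog(false);
        } catch (CatalogException e) {
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```

So a `CatalogException` is caught whenever the `try` body itself throws one on purpose; without the explicit rethrow, the generic catch would wrap it a second time and bury the original message.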
[GitHub] [flink] FangYongs commented on a diff in pull request #23063: [FLINK-32660][table] Introduce external FS compatible FileCatalogStore implementation
FangYongs commented on code in PR #23063: URL: https://github.com/apache/flink/pull/23063#discussion_r1283903488 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FileCatalogStore.java: ## @@ -46,42 +49,56 @@ public class FileCatalogStore extends AbstractCatalogStore { private static final Logger LOG = LoggerFactory.getLogger(FileCatalogStore.class); -private static final String FILE_EXTENSION = ".yaml"; +static final String FILE_EXTENSION = ".yaml"; -/** The directory path where catalog configurations will be stored. */ -private final String catalogStoreDirectory; - -/** The character set to use when reading and writing catalog files. */ -private final String charset; +/** The YAML mapper to use when reading and writing catalog files. */ +private static final YAMLMapper YAML_MAPPER = +new YAMLMapper().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER); -/** The YAML parser to use when reading and writing catalog files. */ -private final Yaml yaml = new Yaml(); +/** The directory path where catalog configurations will be stored. */ +private final Path catalogStorePath; /** * Creates a new {@link FileCatalogStore} instance with the specified directory path. * - * @param catalogStoreDirectory the directory path where catalog configurations will be stored + * @param catalogStorePath the directory path where catalog configurations will be stored */ -public FileCatalogStore(String catalogStoreDirectory, String charset) { -this.catalogStoreDirectory = catalogStoreDirectory; -this.charset = charset; +public FileCatalogStore(String catalogStorePath) { +this.catalogStorePath = new Path(catalogStorePath); } /** * Opens the catalog store and initializes the catalog file map. 
* - * @throws CatalogException if the catalog store directory does not exist or if there is an - * error reading the directory + * @throws CatalogException if the catalog store directory does not exist, not a directory, or + * if there is an error reading the directory */ @Override public void open() throws CatalogException { -super.open(); - try { +FileSystem fs = catalogStorePath.getFileSystem(); +if (!fs.exists(catalogStorePath)) { +throw new CatalogException( +String.format( +"Failed to open catalog store. The catalog store directory %s does not exist.", +catalogStorePath)); +} -} catch (Throwable e) { -throw new CatalogException("Failed to open file catalog store directory", e); +if (!fs.getFileStatus(catalogStorePath).isDir()) { +throw new CatalogException( +String.format( +"Failed to open catalog store. The given catalog store path %s is not a directory.", +catalogStorePath)); +} +} catch (CatalogException e) { +throw e; +} catch (Exception e) { +throw new CatalogException( +String.format( +"Failed to open file catalog store directory %s.", catalogStorePath), +e); } +super.open(); Review Comment: Generally, `super.open()` is called first. Why is it called last here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
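One plausible reading of the ordering FangYongs questions — an assumption about intent, not taken from the PR discussion — is that `super.open()` flips the store's open flag, so running validation first keeps a failed `open()` from leaving the store marked as open. A stripped-down sketch with hypothetical class names:

```java
public class OpenOrderSketch {

    // Hypothetical stand-in for AbstractCatalogStore.
    static class AbstractStore {
        private boolean open = false;
        public void open() { open = true; }
        public boolean isOpen() { return open; }
    }

    static class ValidatingStore extends AbstractStore {
        private final boolean pathExists;

        ValidatingStore(boolean pathExists) { this.pathExists = pathExists; }

        @Override
        public void open() {
            // Validate first; only mark the store open once validation passes.
            if (!pathExists) {
                throw new IllegalStateException("catalog store directory does not exist");
            }
            super.open();
        }
    }

    public static void main(String[] args) {
        ValidatingStore bad = new ValidatingStore(false);
        try {
            bad.open();
        } catch (IllegalStateException expected) {
            // Validation failed before super.open() ever ran.
        }
        System.out.println(bad.isOpen()); // prints "false"
    }
}
```

Calling `super.open()` first would instead leave `isOpen() == true` after a failed validation, which is the consistency question behind the review comment.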
[jira] [Created] (FLINK-32746) Enable ZGC in JDK17 to solve long time class unloading STW during fullgc
xiangyu feng created FLINK-32746: Summary: Enable ZGC in JDK17 to solve long time class unloading STW during fullgc Key: FLINK-32746 URL: https://issues.apache.org/jira/browse/FLINK-32746 Project: Flink Issue Type: Sub-task Components: Table SQL / Runtime Reporter: xiangyu feng -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] xintongsong commented on a diff in pull request #23026: [FLINK-32558][flink-java] Deprecate all DataSet API
xintongsong commented on code in PR #23026: URL: https://github.com/apache/flink/pull/23026#discussion_r1283896762 ## flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java: ## @@ -77,12 +80,23 @@ * Broadcast variables in bulk iterations * Custom Java objects (POJOs) * + * + * Note: All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future + * Flink major version. You can still build your application in DataSet, but you should move to + * either the DataStream and/or Table API. This class is retained for testing purposes. */ @SuppressWarnings("serial") public class KMeans { +private static final Logger LOGGER = LoggerFactory.getLogger(KMeans.class); + public static void main(String[] args) throws Exception { +LOGGER.warn( +"All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future" ++ " Flink major version. You can still build your application in DataSet, but you should move to" ++ " either the DataStream and/or Table API. This class is retained for testing purposes."); Review Comment: Can we deduplicate the warning message somewhere? ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -544,6 +544,13 @@ public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) { * The default execution mode is {@link ExecutionMode#PIPELINED}. * * @param executionMode The execution mode to use. + * @deprecated The {@link ExecutionMode} is deprecated because it's only used in DataSet APIs. + * All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a future + * Flink major version. You can still build your application in DataSet, but you should move + * to either the DataStream and/or Table API. 
+ * @see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741;> + * FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet + * API */ public void setExecutionMode(ExecutionMode executionMode) { Review Comment: Annotation is missing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
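One way to answer the "can we deduplicate the warning message" comment is a shared constant that every DataSet example logs through. This is only a sketch — the class and method names are hypothetical, not necessarily what the PR ended up with — and it uses `java.util.logging` so the snippet stays dependency-free:

```java
import java.util.logging.Logger;

/** Hypothetical single home for the DataSet deprecation notice. */
public final class DataSetDeprecationInfo {

    public static final String DATASET_DEPRECATION_INFO =
            "All Flink DataSet APIs are deprecated since Flink 1.18 and will be removed in a"
                    + " future Flink major version. You can still build your application in"
                    + " DataSet, but you should move to either the DataStream and/or Table API.";

    private DataSetDeprecationInfo() {}

    /** Each example's main() calls this instead of repeating the string literal. */
    public static void logDeprecationWarning(Logger logger) {
        logger.warning(DATASET_DEPRECATION_INFO);
    }

    public static void main(String[] args) {
        // e.g. what KMeans#main would do instead of an inline string:
        logDeprecationWarning(Logger.getLogger("KMeans"));
    }
}
```

With this in place, `KMeans` and the other batch examples would keep one copy of the text, and a wording change only has to happen in one file.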
[GitHub] [flink] wanglijie95 merged pull request #22992: [hotfix] Fix typo in JobManagerOptions
wanglijie95 merged PR #22992: URL: https://github.com/apache/flink/pull/22992 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32741) Remove DataSet related descriptions in doc
[ https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750921#comment-17750921 ] Xintong Song commented on FLINK-32741: -- Thanks for volunteering, [~pegasas]. You are assigned. Please go ahead. FYI, the DataSet documentation should be removed before we ship release 1.18, which means we only have ~2-3 weeks to work on this (including PR review). > Remove DataSet related descriptions in doc > -- > > Key: FLINK-32741 > URL: https://issues.apache.org/jira/browse/FLINK-32741 > Project: Flink > Issue Type: Technical Debt > Components: Documentation >Affects Versions: 2.0.0 >Reporter: Wencong Liu >Assignee: Junyao Huang >Priority: Major > Labels: 2.0-related > Fix For: 1.18.0 > > > Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't > recommend developers to use the DataSet, the descriptions of DataSet should > be removed in the doc after [FLINK-32558]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-32741) Remove DataSet related descriptions in doc
[ https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-32741: Assignee: Junyao Huang > Remove DataSet related descriptions in doc > -- > > Key: FLINK-32741 > URL: https://issues.apache.org/jira/browse/FLINK-32741 > Project: Flink > Issue Type: Technical Debt > Components: Documentation >Affects Versions: 2.0.0 >Reporter: Wencong Liu >Assignee: Junyao Huang >Priority: Major > Labels: 2.0-related > Fix For: 2.0.0 > > > Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't > recommend developers to use the DataSet, the descriptions of DataSet should > be removed in the doc after [FLINK-32558]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32741) Remove DataSet related descriptions in doc
[ https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song updated FLINK-32741: - Fix Version/s: 1.18.0 (was: 2.0.0) > Remove DataSet related descriptions in doc > -- > > Key: FLINK-32741 > URL: https://issues.apache.org/jira/browse/FLINK-32741 > Project: Flink > Issue Type: Technical Debt > Components: Documentation >Affects Versions: 2.0.0 >Reporter: Wencong Liu >Assignee: Junyao Huang >Priority: Major > Labels: 2.0-related > Fix For: 1.18.0 > > > Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't > recommend developers to use the DataSet, the descriptions of DataSet should > be removed in the doc after [FLINK-32558]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32740) Introduce literal null value for flink-conf.yaml
[ https://issues.apache.org/jira/browse/FLINK-32740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750916#comment-17750916 ] Yao Zhang commented on FLINK-32740: --- Hi [~JunRuiLi], Good question. It is also my concern. I searched the values in [Configuration|https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/config/] and there are no valid values named null (but there are some valid values named NONE). Some values may contain null (e.g. /dev/null) but they will not be translated to null. If the user intends to set the value to the string "null", a slight change is needed by adding double quotes (e.g. some.key: "null"). I also investigated the null literals defined in the YAML standard:
* the empty value
* ~
* null (case sensitive)

Currently we may follow all of these literals or only part of them, as no YAML parser is used. > Introduce literal null value for flink-conf.yaml > > > Key: FLINK-32740 > URL: https://issues.apache.org/jira/browse/FLINK-32740 > Project: Flink > Issue Type: Improvement > Components: API / Core >Affects Versions: 1.15.4 >Reporter: Yao Zhang >Priority: Minor > > Hi community, > Currently in flink-conf.yaml we only consider simplified YAML syntax like key > value pairs. And it might not be the right timing to introduce a YAML parser. > As stated in [FLINK-23620|https://issues.apache.org/jira/browse/FLINK-23620], > there might be some keys that violate the YAML naming conventions. > The current situation is, if we want to unset the value (or set the value as > its default), what we could do is to remove the key value pair completely or > leave the value blank (e.g. `rest.port: `). It might be inconvenient or less > readable for conf files controlled by other applications. > So is it necessary to add some special literal values that will be translated > to the real null value? For YAML we usually see ~, null or the empty value as > the null value. If necessary I would like to do this.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
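The literals discussed above could be normalized with a helper along these lines — purely a sketch of the proposal, not existing Flink behavior. Note that a value merely containing "null" (such as `/dev/null`) stays intact, and double quoting preserves the literal string "null", matching the behavior Yao Zhang describes:

```java
public final class ConfNullLiterals {

    private ConfNullLiterals() {}

    /** Maps the YAML null literals (empty value, ~, null) to a real null. */
    static String normalize(String rawValue) {
        if (rawValue == null) {
            return null;
        }
        String trimmed = rawValue.trim();
        if (trimmed.isEmpty() || trimmed.equals("~") || trimmed.equals("null")) {
            return null;
        }
        // Double quotes force a literal string value, e.g. some.key: "null".
        if (trimmed.length() >= 2 && trimmed.startsWith("\"") && trimmed.endsWith("\"")) {
            return trimmed.substring(1, trimmed.length() - 1);
        }
        return trimmed;
    }

    public static void main(String[] args) {
        System.out.println(normalize("~") == null);   // prints "true"
        System.out.println(normalize("/dev/null"));   // prints "/dev/null"
    }
}
```

Whether to also accept the YAML spec's case variants (Null, NULL) is the "all the standards or only part of them" question left open in the comment.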
[GitHub] [flink] flinkbot commented on pull request #23132: [python] Relax `grpcio` requirement to build on M2
flinkbot commented on PR #23132: URL: https://github.com/apache/flink/pull/23132#issuecomment-1664610864 ## CI report: * 6d68021f5f4f27459374090329157bc8d05b9e63 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] deepyaman opened a new pull request, #23132: [python] Relax `grpcio` requirement to build on M2
deepyaman opened a new pull request, #23132: URL: https://github.com/apache/flink/pull/23132 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32729) allow create an initial deployment with suspend state
[ https://issues.apache.org/jira/browse/FLINK-32729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32729: --- Labels: pull-request-available (was: ) > allow create an initial deployment with suspend state > - > > Key: FLINK-32729 > URL: https://issues.apache.org/jira/browse/FLINK-32729 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > > With this feature, Users could create an application in suspend status as a > backup for the other running application to improve the failure recovery time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] bjhaid opened a new pull request, #641: [FLINK-32729] allow starting a flink job in a suspended state.
bjhaid opened a new pull request, #641: URL: https://github.com/apache/flink-kubernetes-operator/pull/641 In our setup we intend to migrate FlinkDeployments from one cluster to another. As part of this, we want to start the FlinkDeployments in a suspended state with an initialSavePoint configured, and only configure them to start after we have stopped the FlinkDeployment in the source cluster. This patch was agreed upon in this [slack thread](https://apache-flink.slack.com/archives/C03GV7L3G2C/p1690825041612599). ## What is the purpose of the change *(For example: This pull request adds a new feature to periodically create and maintain savepoints through the `FlinkDeployment` custom resource.)* ## Brief change log *(for example:)* - *Periodic savepoint trigger is introduced to the custom resource* - *The operator checks on reconciliation whether the required time has passed* - *The JobManager's dispose savepoint API is used to clean up obsolete savepoints* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*.
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / no) - Core observer or reconciler logic that is regularly executed: (yes / no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] netvl commented on pull request #22854: [FLINK-32137] Added filtering of lambdas when building the flame graph
netvl commented on PR #22854: URL: https://github.com/apache/flink/pull/22854#issuecomment-1664427570 Thank you very much! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] dannycranmer commented on a diff in pull request #668: [FLINK-32736] Externalized connectors
dannycranmer commented on code in PR #668: URL: https://github.com/apache/flink-web/pull/668#discussion_r1283535822 ## docs/content/posts/2023-08-03-externalized-connectors.md: ## @@ -0,0 +1,77 @@ +--- +title: "Announcing three new Apache Flink connectors, the new connector versioning strategy and externalization" +date: "2023-08-03T12:00:00Z" +authors: +- elphastori: + name: "Elphas Toringepi" + twitter: "elphastori" +aliases: +- /news/2023/08/03/externalized-connectors.html +--- + +## New connectors + +We're excited to announce that Apache Flink now supports three new connectors: [Amazon DynamoDB](https://aws.amazon.com/dynamodb), [MongoDB](https://www.mongodb.com/) and [OpenSearch](https://opensearch.org/)! The connectors are available for both the [DataStream](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/) and [Table/SQL](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/) APIs. + +- **[Amazon DynamoDB](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)** - This connector includes a sink that provides at-least-once delivery guarantees. +- **[MongoDB connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/mongodb/)** - This connector includes a source and sink that provide at-least-once guarantees. +- **[OpenSearch sink](https://github.com/apache/flink-connector-opensearch/blob/main/docs/content/docs/connectors/datastream/opensearch.md)** - This connector includes a sink that provides at-least-once guarantees Review Comment: ```suggestion - **[OpenSearch sink](https://github.com/apache/flink-connector-opensearch/blob/main/docs/content/docs/connectors/datastream/opensearch.md)** - This connector includes a sink that provides at-least-once guarantees. 
``` ## docs/content/posts/2023-08-03-externalized-connectors.md: ## @@ -0,0 +1,77 @@ +--- +title: "Announcing three new Apache Flink connectors, the new connector versioning strategy and externalization" +date: "2023-08-03T12:00:00Z" +authors: +- elphastori: + name: "Elphas Toringepi" + twitter: "elphastori" +aliases: +- /news/2023/08/03/externalized-connectors.html +--- + +## New connectors + +We're excited to announce that Apache Flink now supports three new connectors: [Amazon DynamoDB](https://aws.amazon.com/dynamodb), [MongoDB](https://www.mongodb.com/) and [OpenSearch](https://opensearch.org/)! The connectors are available for both the [DataStream](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/) and [Table/SQL](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/) APIs. + +- **[Amazon DynamoDB](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)** - This connector includes a sink that provides at-least-once delivery guarantees. +- **[MongoDB connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/mongodb/)** - This connector includes a source and sink that provide at-least-once guarantees. +- **[OpenSearch sink](https://github.com/apache/flink-connector-opensearch/blob/main/docs/content/docs/connectors/datastream/opensearch.md)** - This connector includes a sink that provides at-least-once guarantees + +|Connector|Date Released|Supported Flink Versions| +|---|---|---| +|Amazon DynamoDB sink|12/2/2022|1.15+| +|MongoDB connector|3/31/2023|1.16+| +|OpenSearch sink|12/21/2022|1.16+| + +## Externalized connectors + +The community have externalized connectors from [Flink’s main repository](https://github.com/apache/flink). 
This was driven to realise the following benefits: +- **Faster releases of connectors:** New features can be added more quickly, bugs can be fixed immediately, and we can have faster security patches in case of direct or indirect (through dependencies) security flaws. +- **Adding newer connector features to older Flink versions:** By having stable connector APIs, the same connector artifact may be used with different Flink versions. Thus, new features can also immediately be used with older Flink versions. +- **More activity and contributions around connectors:** By easing the contribution and development process around connectors, we will see faster development and also more connectors. +- **Documentation:** Standardized documentation and user experience for the connectors, regardless of where they are maintained. +- **A faster Flink CI:** By not needing to build and test connectors, the Flink CI pipeline will be faster and Flink developers will experience fewer build stabilities (which mostly come from connectors). That should speed up Flink development. + +The following connectors have been moved to individual repositories: + +- [Kafka / Upsert-Kafka](https://github.com/apache/flink-connector-kafka) +-
[jira] [Commented] (FLINK-32741) Remove DataSet related descriptions in doc
[ https://issues.apache.org/jira/browse/FLINK-32741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750841#comment-17750841 ] Junyao Huang commented on FLINK-32741: -- Hi, [~Wencong Liu] , May I work on this issue? > Remove DataSet related descriptions in doc > -- > > Key: FLINK-32741 > URL: https://issues.apache.org/jira/browse/FLINK-32741 > Project: Flink > Issue Type: Technical Debt > Components: Documentation >Affects Versions: 2.0.0 >Reporter: Wencong Liu >Priority: Major > Labels: 2.0-related > Fix For: 2.0.0 > > > Since All DataSet APIs will be deprecated in [FLINK-32558] and we don't > recommend developers to use the DataSet, the descriptions of DataSet should > be removed in the doc after [FLINK-32558]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-web] elphastori commented on a diff in pull request #668: [FLINK-32736] Externalized connectors
elphastori commented on code in PR #668: URL: https://github.com/apache/flink-web/pull/668#discussion_r1283509741 ## docs/content/posts/2023-08-03-connector-updates.md: ## @@ -0,0 +1,68 @@ +--- +title: "DynamoDB, MongoDB and OpenSearch externalized connectors" +date: "2023-08-03T12:00:00Z" +authors: +- elphastori: + name: "Elphas Toringepi" + twitter: "elphastori" +aliases: +- /news/2023/08/03/externalized-connectors.html +--- + +## New connectors Review Comment: Thanks, I've listed the contributors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #23131: Explanation of Asynchronous Problem in Querying Data from HBase
flinkbot commented on PR #23131: URL: https://github.com/apache/flink/pull/23131#issuecomment-1664321928 ## CI report: * 30618a118c1bc19d4fb9b1a6495bd41ac869350f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] GududeDog opened a new pull request, #23131: Explanation of Asynchronous Problem in Querying Data from HBase
GududeDog opened a new pull request, #23131: URL: https://github.com/apache/flink/pull/23131 Explanation of the asynchronous problem in querying data from HBase. When async lookup is not enabled, an HBaseRowDataLookupFunction object is created. In this class an HTable field is declared but not assigned immediately; it is assigned in the open method. In actual development, creating a connection object in the open method is safe: the connection is long-lived and is closed when it is no longer used. A Table object, however, is not safe to share, and it is recommended to close it right after use. In the current code the table is created in the open method and only closed in the close method, so as long as records keep arriving, the close method never executes. For a connection object this is not a problem, but for a table object, reusing the same instance while processing each record causes problems. In the lookup method, querying HBase means calling the table's get method; using the same table object for every record may result in inaccurate data, and under multithreaded access it is unsafe. ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage.
*(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
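The thread-safety concern in this PR can be illustrated with a simplified sketch. In the HBase client, `Connection` is heavyweight, long-lived, and thread-safe, while `Table` is a light, non-thread-safe handle meant to be obtained and closed per use. The interfaces below are deliberately simplified stand-ins, not the real HBase API:

```java
public class PerLookupTableSketch {

    // Simplified stand-ins for the HBase client types.
    interface Table extends AutoCloseable {
        String get(String rowKey);
        @Override void close();
    }

    interface Connection {
        Table getTable(String name);
    }

    static class FakeConnection implements Connection {
        @Override
        public Table getTable(String name) {
            return new Table() {
                @Override public String get(String rowKey) { return name + ":" + rowKey; }
                @Override public void close() { /* release the light-weight handle */ }
            };
        }
    }

    private final Connection connection; // long-lived, created in open()

    PerLookupTableSketch(Connection connection) {
        this.connection = connection;
    }

    /** Per-record lookup: take a fresh Table handle and close it when done. */
    String lookup(String rowKey) {
        try (Table table = connection.getTable("dim_table")) {
            return table.get(rowKey);
        }
    }

    public static void main(String[] args) {
        PerLookupTableSketch fn = new PerLookupTableSketch(new FakeConnection());
        System.out.println(fn.lookup("row-1")); // prints "dim_table:row-1"
    }
}
```

Keeping the long-lived `Connection` in `open()`/`close()` but taking a fresh `Table` per `lookup()` call (or per thread) is the usual HBase-client usage pattern the PR description argues for.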
[GitHub] [flink] hackergin commented on pull request #23109: [FLINK-32475][docs] Add doc for time travel
hackergin commented on PR #23109: URL: https://github.com/apache/flink/pull/23109#issuecomment-1664182437 @luoyuxia please take a look at this pr when you are free, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-18356) flink-table-planner Exit code 137 returned from process
[ https://issues.apache.org/jira/browse/FLINK-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750786#comment-17750786 ] Matthias Pohl commented on FLINK-18356: --- [~337361...@qq.com] I guess you can create a PR in https://github.com/zentol/flink-ci-docker. AFAIR, that was the fork for the CI docker image that was recently pushed to the Docker hub (https://hub.docker.com/r/chesnay/flink-ci/tags). > flink-table-planner Exit code 137 returned from process > --- > > Key: FLINK-18356 > URL: https://issues.apache.org/jira/browse/FLINK-18356 > Project: Flink > Issue Type: Bug > Components: Build System / Azure Pipelines, Tests >Affects Versions: 1.12.0, 1.13.0, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0 >Reporter: Piotr Nowojski >Assignee: Yunhong Zheng >Priority: Critical > Labels: pull-request-available, test-stability > Attachments: 1234.jpg, app-profiling_4.gif, > image-2023-01-11-22-21-57-784.png, image-2023-01-11-22-22-32-124.png, > image-2023-02-16-20-18-09-431.png, image-2023-07-11-19-28-52-851.png, > image-2023-07-11-19-35-54-530.png, image-2023-07-11-19-41-18-626.png, > image-2023-07-11-19-41-37-105.png > > > {noformat} > = test session starts > == > platform linux -- Python 3.7.3, pytest-5.4.3, py-1.8.2, pluggy-0.13.1 > cachedir: .tox/py37-cython/.pytest_cache > rootdir: /__w/3/s/flink-python > collected 568 items > pyflink/common/tests/test_configuration.py ..[ > 1%] > pyflink/common/tests/test_execution_config.py ...[ > 5%] > pyflink/dataset/tests/test_execution_environment.py . > ##[error]Exit code 137 returned from process: file name '/bin/docker', > arguments 'exec -i -u 1002 > 97fc4e22522d2ced1f4d23096b8929045d083dd0a99a4233a8b20d0489e9bddb > /__a/externals/node/bin/node /__w/_temp/containerHandlerInvoker.js'. 
> Finishing: Test - python > {noformat} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=3729&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=8d78fe4f-d658-5c70-12f8-4921589024c3 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-web] elphastori commented on a diff in pull request #668: [FLINK-32736] Externalized connectors
elphastori commented on code in PR #668: URL: https://github.com/apache/flink-web/pull/668#discussion_r1283321359 ## docs/content/posts/2023-08-03-connector-updates.md: ## @@ -0,0 +1,68 @@ +--- +title: "DynamoDB, MongoDB and OpenSearch externalized connectors" +date: "2023-08-03T12:00:00Z" +authors: +- elphastori: + name: "Elphas Toringepi" + twitter: "elphastori" +aliases: +- /news/2023/08/03/externalized-connectors.html +--- + +## New connectors + +We're excited to announce that Apache Flink now includes 3 new connectors for [DynamoDB](https://aws.amazon.com/dynamodb), [MongoDB](https://www.mongodb.com/) and [OpenSearch](https://opensearch.org/)! The connectors are available for both the [DataStream](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/) and [Table/SQL](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/) APIs. Review Comment: I'll also change 3 to three. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-web] elphastori commented on a diff in pull request #668: [FLINK-32736] Externalized connectors
elphastori commented on code in PR #668: URL: https://github.com/apache/flink-web/pull/668#discussion_r1283317707 ## docs/content/posts/2023-08-03-connector-updates.md: ## @@ -0,0 +1,68 @@ +--- +title: "DynamoDB, MongoDB and OpenSearch externalized connectors" Review Comment: It sounds better -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-32745) Add a flag to skip InputSelectable preValidate step
Chesnay Schepler created FLINK-32745:

Summary: Add a flag to skip InputSelectable preValidate step
Key: FLINK-32745
URL: https://issues.apache.org/jira/browse/FLINK-32745
Project: Flink
Issue Type: Improvement
Components: API / DataStream, Runtime / Configuration
Reporter: Chesnay Schepler
Fix For: 1.19.0

{{StreamingJobGraphGenerator#preValidate}} has a step where it checks that no operator implements {{InputSelectable}} if checkpointing is enabled, because these features aren't compatible.

This step can be extremely expensive when the {{CodeGenOperatorFactory}} is used, because it requires all generated operator classes to actually be compiled (which usually only happens on the task manager). If you know what jobs you're running, this step is pure overhead.

It would be nice to have a flag to skip this validation step.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
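The proposal above can be sketched as a boolean flag that gates the expensive check. This is a minimal standalone illustration, not actual Flink code: the flag name, fields, and `preValidate` signature here are all hypothetical, and the real validation would involve compiling generated operator classes rather than incrementing a counter.

```java
// Hypothetical sketch of gating the InputSelectable pre-validation behind a
// skip flag. All names are illustrative; none of this is real Flink API.
public class PreValidateSketch {

    // In Flink this would come from the job/cluster configuration.
    public static boolean checkpointingEnabled = true;

    // The proposed opt-out flag: when true, the expensive check is skipped.
    public static boolean skipInputSelectableValidation = false;

    // Counts how often the expensive check actually ran (for demonstration).
    public static int validationsRun = 0;

    public static void preValidate() {
        if (checkpointingEnabled && !skipInputSelectableValidation) {
            // Expensive part: in the real code this forces compilation of all
            // generated operator classes to see if any implements
            // InputSelectable, which is incompatible with checkpointing.
            validationsRun++;
        }
    }

    public static void main(String[] args) {
        preValidate(); // flag off: the check runs
        skipInputSelectableValidation = true;
        preValidate(); // flag on: the check is skipped
        System.out.println(validationsRun); // prints 1
    }
}
```

The point of the flag is that users who already know their jobs contain no `InputSelectable` operators can trade the safety check for faster job submission.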
[jira] [Updated] (FLINK-32744) HiveDialectQueryITCase.testWithOverWindow fails with timeout
[ https://issues.apache.org/jira/browse/FLINK-32744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-32744: -- Priority: Critical (was: Major) > HiveDialectQueryITCase.testWithOverWindow fails with timeout > > > Key: FLINK-32744 > URL: https://issues.apache.org/jira/browse/FLINK-32744 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Tests >Affects Versions: 1.17.1 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51922=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=5acec1b4-945b-59ca-34f8-168928ce5199=22287 > {code} > Aug 03 03:36:53 [ERROR] > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testWithOverWindow > Time elapsed: 38.534 s <<< ERROR! > Aug 03 03:36:53 org.apache.flink.table.api.TableException: Failed to execute > sql > Aug 03 03:36:53 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:976) > Aug 03 03:36:53 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1424) > Aug 03 03:36:53 at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765) > Aug 03 03:36:53 at > org.apache.flink.connectors.hive.HiveDialectQueryITCase.testWithOverWindow(HiveDialectQueryITCase.java:690) > Aug 03 03:36:53 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Aug 03 03:36:53 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Aug 03 03:36:53 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Aug 03 03:36:53 at java.lang.reflect.Method.invoke(Method.java:498) > Aug 03 03:36:53 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Aug 03 03:36:53 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Aug 
03 03:36:53 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Aug 03 03:36:53 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Aug 03 03:36:53 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > Aug 03 03:36:53 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > Aug 03 03:36:53 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > Aug 03 03:36:53 at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > Aug 03 03:36:53 at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > Aug 03 03:36:53 at org.junit.rules.RunRules.evaluate(RunRules.java:20) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Aug 03 03:36:53 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > Aug 03 03:36:53 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > Aug 03 03:36:53 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > Aug 03 03:36:53 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > Aug 03 03:36:53 at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > Aug 03 03:36:53 at > 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > Aug 03 03:36:53 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) > Aug 03 03:36:53 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) > Aug 03 03:36:53 at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90) > Aug 03 03:36:53 at >
[jira] [Created] (FLINK-32744) HiveDialectQueryITCase.testWithOverWindow fails with timeout
Matthias Pohl created FLINK-32744: - Summary: HiveDialectQueryITCase.testWithOverWindow fails with timeout Key: FLINK-32744 URL: https://issues.apache.org/jira/browse/FLINK-32744 Project: Flink Issue Type: Bug Components: Connectors / Hive, Tests Affects Versions: 1.17.1 Reporter: Matthias Pohl https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=51922=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=5acec1b4-945b-59ca-34f8-168928ce5199=22287 {code} Aug 03 03:36:53 [ERROR] org.apache.flink.connectors.hive.HiveDialectQueryITCase.testWithOverWindow Time elapsed: 38.534 s <<< ERROR! Aug 03 03:36:53 org.apache.flink.table.api.TableException: Failed to execute sql Aug 03 03:36:53 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:976) Aug 03 03:36:53 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1424) Aug 03 03:36:53 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765) Aug 03 03:36:53 at org.apache.flink.connectors.hive.HiveDialectQueryITCase.testWithOverWindow(HiveDialectQueryITCase.java:690) Aug 03 03:36:53 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Aug 03 03:36:53 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Aug 03 03:36:53 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Aug 03 03:36:53 at java.lang.reflect.Method.invoke(Method.java:498) Aug 03 03:36:53 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) Aug 03 03:36:53 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) Aug 03 03:36:53 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) Aug 03 03:36:53 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) Aug 03 03:36:53 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Aug 03 03:36:53 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) Aug 03 03:36:53 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) Aug 03 03:36:53 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) Aug 03 03:36:53 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) Aug 03 03:36:53 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) Aug 03 03:36:53 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) Aug 03 03:36:53 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) Aug 03 03:36:53 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) Aug 03 03:36:53 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) Aug 03 03:36:53 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) Aug 03 03:36:53 at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) Aug 03 03:36:53 at org.junit.rules.RunRules.evaluate(RunRules.java:20) Aug 03 03:36:53 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Aug 03 03:36:53 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Aug 03 03:36:53 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) Aug 03 03:36:53 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) Aug 03 03:36:53 at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) Aug 03 03:36:53 at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) Aug 03 03:36:53 at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) Aug 03 03:36:53 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) Aug 03 03:36:53 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) Aug 03 03:36:53 
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90) Aug 03 03:36:53 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55) Aug 03 03:36:53 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102) Aug 03 03:36:53 at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54) Aug 03 03:36:53 at