Re: [PR] [FLINK-34227][runtime] Fixes concurrency issue in JobMaster shutdown [flink]

2024-03-14 Thread via GitHub


XComp commented on code in PR #24489:
URL: https://github.com/apache/flink/pull/24489#discussion_r1524972650


##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##
@@ -2097,6 +2098,87 @@ public void testResourceRequirementsAreRequestedFromTheScheduler() throws Except
         }
     }
 
+    @Test
+    public void testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls()
+            throws Exception {
+
+        // the ResourceManager should count the connect attempts
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createAndRegisterTestingResourceManagerGateway();
+        final AtomicInteger connectCount = new AtomicInteger();
+        final OneShotLatch firstRegistrationLatch = new OneShotLatch();
+        resourceManagerGateway.setRegisterJobManagerFunction(
+                (jobMasterId, resourceID, s, jobID) -> {
+                    connectCount.incrementAndGet();
+                    firstRegistrationLatch.trigger();
+
+                    return CompletableFuture.completedFuture(
+                            resourceManagerGateway.getJobMasterRegistrationSuccess());
+                });
+
+        // the scheduler close logic should be handled outside the main thread
+        final OneShotLatch schedulerCloseLatch = new OneShotLatch();
+        final CompletableFuture schedulerCloseFuture = new CompletableFuture<>();
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    schedulerCloseLatch.trigger();
+                                    return schedulerCloseFuture;
+                                })
+                        .build();
+
+        final OneShotLatch closeSlotPoolLatch = new OneShotLatch();
+        final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withHighAvailabilityServices(haServices)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        TestingSlotPoolServiceBuilder.newBuilder()
+                                                .setCloseRunnable(closeSlotPoolLatch::awaitQuietly),
+                                        new TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster();
+        jobMaster.start();
+
+        notifyResourceManagerLeaderListeners(resourceManagerGateway);
+        firstRegistrationLatch.await();
+
+        final CompletableFuture jobMasterCloseFuture = jobMaster.closeAsync();
+
+        // force the scheduler closing to happen outside the main thread

Review Comment:
   You mean that we fix the issue in the scheduler implementations? :thinking: 
The intention here is to check that the `JobMaster` shutdown is independent 
of any scheduler `closeAsync` future.
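
To illustrate the property described in this comment, here is a minimal, self-contained sketch. It is not Flink code and not the fix from the PR: `ShutdownSketch` and `MiniMaster` are hypothetical names, and a plain `CompletableFuture<Void>` stands in for the scheduler's close future. The sketch only shows the shape of the check: reconnects are rejected as soon as `closeAsync()` is called, even while the scheduler cleanup is still pending.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownSketch {

    /** Hypothetical stand-in for a component that registers with a remote manager. */
    static final class MiniMaster {
        private final CompletableFuture<Void> schedulerCloseFuture;
        private final AtomicBoolean acceptingConnections = new AtomicBoolean(true);
        private final AtomicInteger connectCount = new AtomicInteger();

        MiniMaster(CompletableFuture<Void> schedulerCloseFuture) {
            this.schedulerCloseFuture = schedulerCloseFuture;
        }

        /** Simulates a (re)connect attempt and reports whether it was accepted. */
        boolean connect() {
            if (!acceptingConnections.get()) {
                return false;
            }
            connectCount.incrementAndGet();
            return true;
        }

        /**
         * Rejects further connects immediately, then returns a future that
         * completes only once the (possibly stalled) scheduler cleanup is done.
         */
        CompletableFuture<Void> closeAsync() {
            acceptingConnections.set(false);
            return schedulerCloseFuture.thenRun(() -> {});
        }

        int connectCount() {
            return connectCount.get();
        }
    }

    public static void main(String[] args) {
        // The scheduler cleanup is left pending to simulate a stall.
        CompletableFuture<Void> schedulerClose = new CompletableFuture<>();
        MiniMaster master = new MiniMaster(schedulerClose);

        master.connect();
        CompletableFuture<Void> closeFuture = master.closeAsync();

        // A reconnect attempt while cleanup is stalled must be rejected.
        boolean reconnected = master.connect();
        System.out.println("reconnected=" + reconnected
                + ", connects=" + master.connectCount()
                + ", closed=" + closeFuture.isDone());

        // Only now does the scheduler cleanup finish.
        schedulerClose.complete(null);
        System.out.println("closed=" + closeFuture.isDone());
    }
}
```

Leaving the stand-in future uncompleted until the end of the run is what keeps the check independent of how quickly the scheduler cleanup finishes.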



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34227][runtime] Fixes concurrency issue in JobMaster shutdown [flink]

2024-03-14 Thread via GitHub


XComp commented on code in PR #24489:
URL: https://github.com/apache/flink/pull/24489#discussion_r1524962085


##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##
@@ -2097,6 +2098,87 @@ public void testResourceRequirementsAreRequestedFromTheScheduler() throws Except
         }
     }
 
+    @Test
+    public void testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls()
+            throws Exception {
+
+        // the ResourceManager should count the connect attempts
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createAndRegisterTestingResourceManagerGateway();
+        final AtomicInteger connectCount = new AtomicInteger();
+        final OneShotLatch firstRegistrationLatch = new OneShotLatch();
+        resourceManagerGateway.setRegisterJobManagerFunction(
+                (jobMasterId, resourceID, s, jobID) -> {
+                    connectCount.incrementAndGet();
+                    firstRegistrationLatch.trigger();
+
+                    return CompletableFuture.completedFuture(
+                            resourceManagerGateway.getJobMasterRegistrationSuccess());
+                });
+
+        // the scheduler close logic should be handled outside the main thread

Review Comment:
   You're right - the comment is not really helpful because the intention of 
the future is documented further down in the test method where the 
`schedulerCloseFuture` is actually completed.
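
For readers without the Flink test utilities at hand, the counting-and-latch setup in the quoted hunk can be sketched with JDK classes only. This is an assumption-laden illustration rather than the test's actual helpers: `CountingRegistrationSketch` and `CountingGateway` are hypothetical names, and `CountDownLatch` stands in for Flink's `OneShotLatch`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CountingRegistrationSketch {

    /** Hypothetical gateway that counts registrations and signals the first one. */
    static final class CountingGateway {
        private final AtomicInteger connectCount = new AtomicInteger();
        private final CountDownLatch firstRegistration = new CountDownLatch(1);

        /** Records the attempt, releases any waiter, and returns a dummy response. */
        String register(String jobId) {
            connectCount.incrementAndGet();
            firstRegistration.countDown();
            return "registered:" + jobId;
        }

        /** Blocks until the first registration arrives or the timeout elapses. */
        boolean awaitFirstRegistration(long timeout, TimeUnit unit)
                throws InterruptedException {
            return firstRegistration.await(timeout, unit);
        }

        /** Non-blocking check of whether a registration has been seen. */
        boolean firstRegistrationSeen() {
            return firstRegistration.getCount() == 0;
        }

        int connectCount() {
            return connectCount.get();
        }
    }

    public static void main(String[] args) throws Exception {
        CountingGateway gateway = new CountingGateway();

        // Register from another thread, as an RPC endpoint would.
        Thread registrar = new Thread(() -> gateway.register("job-1"));
        registrar.start();

        boolean seen = gateway.awaitFirstRegistration(5, TimeUnit.SECONDS);
        registrar.join();

        System.out.println("seen=" + seen + ", connects=" + gateway.connectCount());
    }
}
```

A test built on this pattern would wait for the first registration, trigger shutdown, and later assert that the counter is still 1.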






Re: [PR] [FLINK-34227][runtime] Fixes concurrency issue in JobMaster shutdown [flink]

2024-03-14 Thread via GitHub


zentol commented on code in PR #24489:
URL: https://github.com/apache/flink/pull/24489#discussion_r1524543831


##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##
@@ -2097,6 +2098,87 @@ public void testResourceRequirementsAreRequestedFromTheScheduler() throws Except
         }
     }
 
+    @Test
+    public void testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls()
+            throws Exception {
+
+        // the ResourceManager should count the connect attempts
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createAndRegisterTestingResourceManagerGateway();
+        final AtomicInteger connectCount = new AtomicInteger();
+        final OneShotLatch firstRegistrationLatch = new OneShotLatch();
+        resourceManagerGateway.setRegisterJobManagerFunction(
+                (jobMasterId, resourceID, s, jobID) -> {
+                    connectCount.incrementAndGet();
+                    firstRegistrationLatch.trigger();
+
+                    return CompletableFuture.completedFuture(
+                            resourceManagerGateway.getJobMasterRegistrationSuccess());
+                });
+
+        // the scheduler close logic should be handled outside the main thread

Review Comment:
   I don't know how this comment relates to the code at hand.



##
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##
@@ -2097,6 +2098,87 @@ public void testResourceRequirementsAreRequestedFromTheScheduler() throws Except
         }
     }
 
+    @Test
+    public void testJobMasterDoesNotReconnectToResourceManagerEvenIfCleanupStalls()
+            throws Exception {
+
+        // the ResourceManager should count the connect attempts
+        final TestingResourceManagerGateway resourceManagerGateway =
+                createAndRegisterTestingResourceManagerGateway();
+        final AtomicInteger connectCount = new AtomicInteger();
+        final OneShotLatch firstRegistrationLatch = new OneShotLatch();
+        resourceManagerGateway.setRegisterJobManagerFunction(
+                (jobMasterId, resourceID, s, jobID) -> {
+                    connectCount.incrementAndGet();
+                    firstRegistrationLatch.trigger();
+
+                    return CompletableFuture.completedFuture(
+                            resourceManagerGateway.getJobMasterRegistrationSuccess());
+                });
+
+        // the scheduler close logic should be handled outside the main thread
+        final OneShotLatch schedulerCloseLatch = new OneShotLatch();
+        final CompletableFuture schedulerCloseFuture = new CompletableFuture<>();
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    schedulerCloseLatch.trigger();
+                                    return schedulerCloseFuture;
+                                })
+                        .build();
+
+        final OneShotLatch closeSlotPoolLatch = new OneShotLatch();
+        final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withHighAvailabilityServices(haServices)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        TestingSlotPoolServiceBuilder.newBuilder()
+                                                .setCloseRunnable(closeSlotPoolLatch::awaitQuietly),
+                                        new TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster();
+        jobMaster.start();
+
+        notifyResourceManagerLeaderListeners(resourceManagerGateway);
+        firstRegistrationLatch.await();
+
+        final CompletableFuture jobMasterCloseFuture = jobMaster.closeAsync();
+
+        // force the scheduler closing to happen outside the main thread

Review Comment:
   Why not ensure this can't happen at all?






Re: [PR] [FLINK-34227][runtime] Fixes concurrency issue in JobMaster shutdown [flink]

2024-03-13 Thread via GitHub


flinkbot commented on PR #24489:
URL: https://github.com/apache/flink/pull/24489#issuecomment-1994454761

   
   ## CI report:
   
   * 2cc6490c77495bfb3e46c30e1ff10409abacc770 UNKNOWN
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-34227][runtime] Fixes concurrency issue in JobMaster shutdown [flink]

2024-03-13 Thread via GitHub


XComp opened a new pull request, #24489:
URL: https://github.com/apache/flink/pull/24489

   ## What is the purpose of the change
   
   tba
   
   ## Brief change log
   
   tba
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable

