Re: [PR] [Improve][Zeta] Add pending queue rescheduling for WAIT schedule strategy [seatunnel]

2026-02-01 Thread via GitHub


DanielCarter-stack commented on PR #10430:
URL: https://github.com/apache/seatunnel/pull/10430#issuecomment-3832461773

   
   
   ### Issue 1: PeekBlockingQueue.moveToTail can only move the head element
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java:127-143`
   
   ```java
   public boolean moveToTail(Long jobId) {
   lock.lock();
   try {
   E element = jobIdMap.get(jobId);
   if (element == null) {
   return false;
   }
   E head = queue.peek();
   if (head == null || !head.equals(element)) {  // ⚠️ Issue
   return false;
   }
   if (!queue.remove(element)) {
   return false;
   }
   queue.put(element);
   notEmpty.signalAll();
   return true;
   } catch (InterruptedException e) {
   log.error("Move element to tail failed. {}", 
ExceptionUtils.getMessage(e));
   Thread.currentThread().interrupt();
   return false;
   } finally {
   lock.unlock();
   }
   }
   ```
   
   **Related Context**:
   - Caller: `PendingJobScheduleContext.moveHeadToTail` (line 31-33)
   - Caller: `WaitReschedulePolicy.onResourcesNotEnough` (line 71)
   
   **Problem Description**:
   
   The method name is `moveToTail` (move any element to tail), but the actual 
implementation is:
   ```java
   E head = queue.peek();
   if (head == null || !head.equals(element)) {
   return false;  // If it's not the head of the queue, return false 
directly
   }
   ```
   
   This means the method can only move the head element to the tail. Although 
current usage scenarios indeed only require moving the head element, the 
mismatch between the method name and implementation can easily cause 
misunderstanding.
   
   **Potential Risks**:
   - Risk 1: If other developers call this method to move non-head elements in 
the future, it will fail silently (return false), which may lead to logic errors
   - Risk 2: Reduced code readability, maintainers may mistakenly believe any 
element can be moved
   
   **Impact Scope**:
   - Direct impact: `PeekBlockingQueue.moveToTail`
   - Indirect impact: All callers (currently only 
`PendingJobScheduleContext.moveHeadToTail`)
   - Impact area: Core scheduling module
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   Solution 1: Rename the method to clarify its purpose
   ```java
   /**
* Move the head element to the tail of the queue if it matches the given 
jobId.
* @param jobId the job id of the head element
* @return true if the head element was moved to tail; false otherwise
*/
   public boolean moveHeadToTailIfMatch(Long jobId) {
   // ... keep existing implementation
   }
   ```
   
   Solution 2: Implement true "move any element to tail" functionality
   ```java
   /**
* Move the element with the given jobId to the tail of the queue.
* @param jobId the job id to move
* @return true if the element was moved; false if not found or already at 
tail
*/
   public boolean moveToTail(Long jobId) {
   lock.lock();
   try {
   E element = jobIdMap.get(jobId);
   if (element == null) {
   return false;
   }
   
   // Check if already at tail
   if (queue.isEmpty()) {
   return false;
   }
   
   // Remove the element (wherever it is)
   if (!queue.remove(element)) {
   return false;
   }
   
   // Add to tail
   queue.put(element);
   notEmpty.signalAll();
   return true;
   } catch (InterruptedException e) {
   log.error("Move element to tail failed. {}", 
ExceptionUtils.getMessage(e));
   Thread.currentThread().interrupt();
   return false;
   } finally {
   lock.unlock();
   }
   }
   ```
   
   **Rationale**:
   - Solution 1 better fits current usage scenarios with minimal risk
   - Solution 2 is more general-purpose but may increase complexity
   - Recommendation: Prioritize Solution 1; if moving non-head elements is 
needed in the future, adopt Solution 2
   
   ---
   
   ### Issue 2: WaitReschedulePolicy's rescheduling condition logic has 
potential issues
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PendingJobSchedulePolicyFactory.java:54-74`
   
   ```java
   private static class WaitReschedulePolicy implements 
PendingJobSchedulePolicy {
   @Override
   public void onResourcesNotEnough(PendingJobScheduleContext context)
   throws InterruptedException {
   WaitRescheduleConfig config =
   context.getEngineConfig()
   .getScheduleStrategyConfig(
   ScheduleStrategy.WAIT_RESCHEDULE, 
WaitResch

Re: [PR] [Improve][Zeta] Add pending queue rescheduling for WAIT schedule strategy [seatunnel]

2026-02-01 Thread via GitHub


corgy-w commented on PR #10430:
URL: https://github.com/apache/seatunnel/pull/10430#issuecomment-3830922999

   > It might be helpful to make `PENDING_JOB_RESCHEDULE_THRESHOLD` 
configurable, allowing users to adjust it per environment while keeping a 
reasonable default.
   
   @dybyte This is not the final version. I still have some things being 
adjusted and will update it later


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] [Improve][Zeta] Add pending queue rescheduling for WAIT schedule strategy [seatunnel]

2026-02-01 Thread via GitHub


dybyte commented on PR #10430:
URL: https://github.com/apache/seatunnel/pull/10430#issuecomment-3830883892

   It might be helpful to make `PENDING_JOB_RESCHEDULE_THRESHOLD` configurable, 
allowing users to adjust it per environment while keeping a reasonable default.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]