DanielCarter-stack commented on PR #10430:
URL: https://github.com/apache/seatunnel/pull/10430#issuecomment-3832461773
### Issue 1: PeekBlockingQueue.moveToTail can only move the head element
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java:127-143`
```java
public boolean moveToTail(Long jobId) {
    lock.lock();
    try {
        E element = jobIdMap.get(jobId);
        if (element == null) {
            return false;
        }
        E head = queue.peek();
        if (head == null || !head.equals(element)) { // ⚠️ Issue
            return false;
        }
        if (!queue.remove(element)) {
            return false;
        }
        queue.put(element);
        notEmpty.signalAll();
        return true;
    } catch (InterruptedException e) {
        log.error("Move element to tail failed. {}",
                ExceptionUtils.getMessage(e));
        Thread.currentThread().interrupt();
        return false;
    } finally {
        lock.unlock();
    }
}
```
**Related Context**:
- Caller: `PendingJobScheduleContext.moveHeadToTail` (line 31-33)
- Caller: `WaitReschedulePolicy.onResourcesNotEnough` (line 71)
**Problem Description**:
The name `moveToTail` suggests that any element can be moved to the tail, but
the implementation only proceeds when the element is the current head:
```java
E head = queue.peek();
if (head == null || !head.equals(element)) {
    // Not the head of the queue: return false directly
    return false;
}
```
This means the method can only move the head element to the tail. Although
current usage scenarios indeed only require moving the head element, the
mismatch between the method name and implementation can easily cause
misunderstanding.
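
To make the head-only behavior concrete, here is a minimal sketch on a simplified stand-in. `HeadOnlyQueueDemo` is a hypothetical class, not the real `PeekBlockingQueue`; locking and blocking are omitted, and only the head guard from the snippet above is reproduced.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for PeekBlockingQueue; no lock, no blocking.
class HeadOnlyQueueDemo {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Long, String> jobIdMap = new HashMap<>();

    void put(Long jobId, String element) {
        queue.addLast(element);
        jobIdMap.put(jobId, element);
    }

    // Same guard as the reviewed method: only the current head is moved.
    boolean moveToTail(Long jobId) {
        String element = jobIdMap.get(jobId);
        if (element == null) {
            return false;
        }
        String head = queue.peekFirst();
        if (head == null || !head.equals(element)) {
            return false; // non-head element: nothing happens, no error raised
        }
        queue.removeFirst();
        queue.addLast(element);
        return true;
    }

    String order() {
        return String.join(",", queue);
    }

    public static void main(String[] args) {
        HeadOnlyQueueDemo q = new HeadOnlyQueueDemo();
        q.put(1L, "job-1");
        q.put(2L, "job-2");
        q.put(3L, "job-3");
        System.out.println(q.moveToTail(2L) + " " + q.order()); // false job-1,job-2,job-3
        System.out.println(q.moveToTail(1L) + " " + q.order()); // true job-2,job-3,job-1
    }
}
```

A caller that passes a non-head `jobId` and ignores the return value would see no change in ordering and no indication of why.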
**Potential Risks**:
- Risk 1: If other developers later call this method to move a non-head
element, it will fail silently (return false), which may lead to logic errors.
- Risk 2: Reduced readability; maintainers may mistakenly believe any
element can be moved.
**Impact Scope**:
- Direct impact: `PeekBlockingQueue.moveToTail`
- Indirect impact: All callers (currently only
`PendingJobScheduleContext.moveHeadToTail`)
- Impact area: Core scheduling module
**Severity**: MINOR
**Improvement Suggestions**:
Solution 1: Rename the method to clarify its purpose
```java
/**
 * Move the head element to the tail of the queue if it matches the given jobId.
 *
 * @param jobId the job id of the head element
 * @return true if the head element was moved to tail; false otherwise
 */
public boolean moveHeadToTailIfMatch(Long jobId) {
    // ... keep existing implementation
}
```
Solution 2: Implement true "move any element to tail" functionality
```java
/**
 * Move the element with the given jobId to the tail of the queue.
 *
 * @param jobId the job id to move
 * @return true if the element was moved; false if it was not found
 */
public boolean moveToTail(Long jobId) {
    lock.lock();
    try {
        E element = jobIdMap.get(jobId);
        if (element == null) {
            return false;
        }
        // Nothing to move if the queue is empty
        if (queue.isEmpty()) {
            return false;
        }
        // Remove the element (wherever it is)
        if (!queue.remove(element)) {
            return false;
        }
        // Add to tail
        queue.put(element);
        notEmpty.signalAll();
        return true;
    } catch (InterruptedException e) {
        log.error("Move element to tail failed. {}",
                ExceptionUtils.getMessage(e));
        Thread.currentThread().interrupt();
        return false;
    } finally {
        lock.unlock();
    }
}
```
**Rationale**:
- Solution 1 better fits current usage scenarios with minimal risk
- Solution 2 is more general-purpose but may increase complexity
- Recommendation: Prioritize Solution 1; if moving non-head elements is
needed in the future, adopt Solution 2
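
To illustrate the trade-off, the following sketch shows the general "move any element" semantics on a simplified stand-in. `MoveAnyToTailDemo` is a hypothetical class without locking or blocking, and the early return for an element that is already at the tail is an added assumption, not something the existing code does.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not the real PeekBlockingQueue: no lock, no blocking.
class MoveAnyToTailDemo {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Long, String> jobIdMap = new HashMap<>();

    void put(Long jobId, String element) {
        queue.addLast(element);
        jobIdMap.put(jobId, element);
    }

    // Moves the element wherever it currently sits in the queue.
    boolean moveToTail(Long jobId) {
        String element = jobIdMap.get(jobId);
        if (element == null) {
            return false;
        }
        // Assumption for this sketch: an element already at the tail is a no-op.
        if (element.equals(queue.peekLast())) {
            return false;
        }
        if (!queue.remove(element)) {
            return false;
        }
        queue.addLast(element);
        return true;
    }

    String order() {
        return String.join(",", queue);
    }

    public static void main(String[] args) {
        MoveAnyToTailDemo q = new MoveAnyToTailDemo();
        q.put(1L, "job-1");
        q.put(2L, "job-2");
        q.put(3L, "job-3");
        System.out.println(q.moveToTail(2L) + " " + q.order()); // true job-1,job-3,job-2
        System.out.println(q.moveToTail(2L) + " " + q.order()); // false job-1,job-3,job-2
    }
}
```

With these semantics any caller can reorder a job that is not at the head, which is more flexible but also widens the ways scheduling order can change.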
---
### Issue 2: WaitReschedulePolicy's rescheduling condition logic has potential issues
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PendingJobSchedulePolicyFactory.java:54-74`
```java
private static class WaitReschedulePolicy implements PendingJobSchedulePolicy {
    @Override
    public void onResourcesNotEnough(PendingJobScheduleContext context)
            throws InterruptedException {
        WaitRescheduleConfig config =
                context.getEngineConfig()
                        .getScheduleStrategyConfig(
                                ScheduleStrategy.WAIT_RESCHEDULE,
WaitResch