Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-26 Thread via GitHub


fredia merged PR #24667:
URL: https://github.com/apache/flink/pull/24667


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-25 Thread via GitHub


fredia commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2078637067

   @Zakelly Thanks for the review. I addressed the comments and rebased onto master.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-25 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1580464018


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,26 +97,38 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
-
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
-this.stateRequestsBuffer = new StateRequestBuffer<>();
+
 this.inFlightRecordNum = new AtomicInteger(0);
+this.stateRequestsBuffer =
+new StateRequestBuffer<>(
+bufferTimeout,
+(scheduledTriggerSeq) ->
+mailboxExecutor.execute(
+() -> {
+if (stateRequestsBuffer.currentTriggerSeq.get()

Review Comment:
   How about providing a function `boolean stateRequestsBuffer#checkCurrentSeq(long seq)`?
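For illustration, here is a minimal sketch of the suggested helper. It assumes, as in the diff, that the buffer tracks `currentTriggerSeq` as an `AtomicLong` and bumps it on every trigger. `TriggerSeqHolder` is a hypothetical name, and only the sequence bookkeeping is modeled, not the queues:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the sequence bookkeeping inside StateRequestBuffer. */
class TriggerSeqHolder {
    /** Incremented by one every time the active queue is triggered. */
    private final AtomicLong currentTriggerSeq = new AtomicLong(0);

    /** Called whenever a batch of requests is handed to the state executor. */
    void advanceTriggerSeq() {
        currentTriggerSeq.incrementAndGet();
    }

    /**
     * Returns true if no trigger has happened since {@code seq} was captured,
     * i.e. a timeout scheduled for {@code seq} is still relevant.
     */
    boolean checkCurrentSeq(long seq) {
        return currentTriggerSeq.get() == seq;
    }
}
```

With such a helper, the mailbox lambda in `AsyncExecutionController` could ask the buffer whether a given seq is still current instead of reading the buffer's `AtomicLong` fields directly.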



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##
@@ -53,17 +73,74 @@ public class StateRequestBuffer {
 /** The number of state requests in blocking queue. */
 int blockingQueueSize;
 
-public StateRequestBuffer() {
+/** The timeout of {@link #activeQueue} triggering in milliseconds. */
+final long bufferTimeout;
+
+/** The handler to trigger when timeout. */
+final Consumer timeoutHandler;
+
+/** The executor service that schedules and calls the triggers of this task. */
+ScheduledExecutorService scheduledExecutor;
+
+/**
+ * The current scheduled future, when the next scheduling occurs, the previous one that has not
+ * yet been executed will be canceled.
+ */
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+ * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+ */
+AtomicLong scheduledTriggerSeq;
+
+/**
+ * The current trigger sequence number, used to distinguish different triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by 1.
+ */
+AtomicLong currentTriggerSeq;
+
+public StateRequestBuffer(long bufferTimeout, Consumer timeoutHandler) {
 this.activeQueue = new LinkedList<>();
 this.blockingQueue = new HashMap<>();
 this.blockingQueueSize = 0;
+this.bufferTimeout = bufferTimeout;
+this.timeoutHandler = timeoutHandler;
+this.scheduledTriggerSeq = new AtomicLong(-1);
+this.currentTriggerSeq = new AtomicLong(0);
+if (bufferTimeout > 0) {
+this.scheduledExecutor = DELAYER;
+}
+}
+
+void advanceTriggerSeq() {
+currentTriggerSeq.incrementAndGet();
 }
 
 void enqueueToActive(StateRequest request) {
 if (request.getRequestType() == StateRequestType.SYNC_POINT) {
 request.getFuture().complete(null);
 } else {
 activeQueue.add(request);
+if (bufferTimeout > 0 && currentTriggerSeq.get() > scheduledTriggerSeq.get()) {
+if (currentScheduledFuture != null
+&& !currentScheduledFuture.isDone()
+&& !currentScheduledFuture.isCancelled()) {
+currentScheduledFuture.cancel(false);
+}
+scheduledTriggerSeq.set(currentTriggerSeq.get());

Review Comment:
   I was thinking of adding `final long thisScheduledSeq = scheduledTriggerSeq.get();` here, and using the `thisScheduledSeq == currentTriggerSeq.get()` condition at line 136.
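Here is a simplified sketch of the suggestion. It assumes requests are plain strings and the timer is a hand-driven list of callbacks rather than a `ScheduledExecutorService`; `CapturingBuffer`, `trigger()` and `fireTimers()` are hypothetical. The sketch leaves out cancelling the previous future (which the diff does) to show that a stale timer is harmless once the check uses the captured value:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical, simplified buffer: requests are Strings and timers are fired by hand. */
class CapturingBuffer {
    final LinkedList<String> activeQueue = new LinkedList<>();
    final AtomicLong scheduledTriggerSeq = new AtomicLong(-1);
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    final LongConsumer timeoutHandler;
    /** Pending timer callbacks; a real buffer would hand these to a scheduler. */
    final List<Runnable> pendingTimers = new ArrayList<>();

    CapturingBuffer(LongConsumer timeoutHandler) {
        this.timeoutHandler = timeoutHandler;
    }

    void enqueueToActive(String request) {
        activeQueue.add(request);
        if (currentTriggerSeq.get() > scheduledTriggerSeq.get()) {
            scheduledTriggerSeq.set(currentTriggerSeq.get());
            // Capture the seq in a local so the timer compares against the value
            // that was current when it was scheduled, not whatever the field holds later.
            final long thisScheduledSeq = scheduledTriggerSeq.get();
            pendingTimers.add(
                    () -> {
                        if (thisScheduledSeq == currentTriggerSeq.get()) {
                            timeoutHandler.accept(thisScheduledSeq);
                        }
                    });
        }
    }

    /** Simulates a size-based or forced trigger draining the active queue. */
    void trigger() {
        activeQueue.clear();
        currentTriggerSeq.incrementAndGet();
    }

    /** Simulates the timeout elapsing for every pending timer, in scheduling order. */
    void fireTimers() {
        List<Runnable> timers = new ArrayList<>(pendingTimers);
        pendingTimers.clear();
        timers.forEach(Runnable::run);
    }
}
```

In the sketch, a timer scheduled for seq 0 that runs after a trigger has moved the seq to 1 compares 0 with 1 and does nothing. The timer scheduled for seq 1 invokes the handler.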




Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-25 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1579142292


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,26 +97,39 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
-
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
-this.stateRequestsBuffer = new StateRequestBuffer<>();
+
 this.inFlightRecordNum = new AtomicInteger(0);
+this.stateRequestsBuffer =
+new StateRequestBuffer<>(
+bufferTimeout,
+() ->
+mailboxExecutor.execute(
+() -> {
+if (stateRequestsBuffer.currentTriggerSeq.get()
+== stateRequestsBuffer.scheduledTriggerSeq

Review Comment:
   This condition is always true. I suggest passing the `triggered seq` into the handler lambda as a parameter.
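One way to see why passing the seq helps is sketched below with hypothetical names. A handler that re-reads both counters when it runs cannot tell a stale timer from the current one. A handler told which seq it was scheduled for can. The scenario below (a stale timer running after a trigger and a re-schedule) is an assumption for illustration, not a trace of the PR's code:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical sequence state shared by a buffer and its timeout handler. */
class SeqState {
    final AtomicLong scheduledTriggerSeq = new AtomicLong(-1);
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    int firedCount = 0;

    /** Handler without a parameter: it can only compare the fields as they are when it runs. */
    Runnable rereadingHandler() {
        return () -> {
            if (currentTriggerSeq.get() == scheduledTriggerSeq.get()) {
                firedCount++;
            }
        };
    }

    /** Handler that is told which seq its timer was scheduled for. */
    LongConsumer seqAwareHandler() {
        return scheduledSeq -> {
            if (currentTriggerSeq.get() == scheduledSeq) {
                firedCount++;
            }
        };
    }
}
```

For example, a timer is scheduled for seq 0, a trigger advances the seq to 1, and a new timer is scheduled for seq 1. If the first timer then runs, `rereadingHandler()` sees 1 == 1 and fires, while `seqAwareHandler().accept(0)` sees 1 != 0 and skips.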



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##
@@ -53,17 +72,75 @@ public class StateRequestBuffer {
 /** The number of state requests in blocking queue. */
 int blockingQueueSize;
 
-public StateRequestBuffer() {
+/** The timeout of {@link #activeQueue} triggering in milliseconds. */
+final long bufferTimeout;
+
+/** The handler to trigger when {@link #activeQueue} size is 1. */
+final Runnable timeoutHandler;
+
+/** The executor service that schedules and calls the triggers of this task. */
+ScheduledExecutorService scheduledExecutor;
+
+/**
+ * The current scheduled future, when the next scheduling occurs, the previous one that has not
+ * yet been executed will be canceled.
+ */
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+ * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+ */
+AtomicLong scheduledTriggerSeq;
+
+/**
+ * The current trigger sequence number, used to distinguish different triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by one.
+ */
+AtomicLong currentTriggerSeq;
+
+public StateRequestBuffer(long bufferTimeout, Runnable timeoutHandler) {
 this.activeQueue = new LinkedList<>();
 this.blockingQueue = new HashMap<>();
 this.blockingQueueSize = 0;
+this.bufferTimeout = bufferTimeout;
+this.timeoutHandler = timeoutHandler;
+this.scheduledTriggerSeq = new AtomicLong(-1);
+this.currentTriggerSeq = new AtomicLong(0);
+if (bufferTimeout > 0) {
+this.scheduledExecutor = DELAYER;
+}
+}
+
+void advanceTriggerSeq() {
+currentTriggerSeq.incrementAndGet();
 }
 
 void enqueueToActive(StateRequest request) {
 if (request.getRequestType() == StateRequestType.SYNC_POINT) {
 request.getFuture().complete(null);
 } else {
 activeQueue.add(request);
+// if the active queue size is 1, it means that the current request is the oldest one in

Review Comment:
   Nit: remove this?






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-25 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578997828


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -220,12 +283,17 @@  void insertBlockingBuffer(StateRequest request) {
  * @param force whether to trigger requests in force.
  */
 void triggerIfNeeded(boolean force) {
-// TODO: introduce a timeout mechanism for triggering.
 if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+// if the active queue size is 1, it means that the current request is the oldest one in
+// the active queue.
+if (stateRequestsBuffer.activeQueueSize() == 1) {
+scheduleTimeout(currentTriggerSeq.get());
+}

Review Comment:
   Currently, `triggerIfNeeded(false)` is only called in `handleRequest()`. I added a `schedulingCount==0` condition to avoid "trigger multiple times for one seq" in the future.
   
   And I moved `scheduleTimeout` into `StateRequestBuffer`; thanks for the suggestion.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-25 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578997828


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -220,12 +283,17 @@  void insertBlockingBuffer(StateRequest request) {
  * @param force whether to trigger requests in force.
  */
 void triggerIfNeeded(boolean force) {
-// TODO: introduce a timeout mechanism for triggering.
 if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+// if the active queue size is 1, it means that the current request is the oldest one in
+// the active queue.
+if (stateRequestsBuffer.activeQueueSize() == 1) {
+scheduleTimeout(currentTriggerSeq.get());
+}

Review Comment:
   Currently, `triggerIfNeeded(false)` is only called in `handleRequest()`. I added a `schedulingCount==1` condition to avoid "trigger multiple times for one seq" in the future.
   
   And I moved `scheduleTimeout` into `StateRequestBuffer`; thanks for the suggestion.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-24 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578825952


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -220,12 +283,17 @@  void insertBlockingBuffer(StateRequest request) {
  * @param force whether to trigger requests in force.
  */
 void triggerIfNeeded(boolean force) {
-// TODO: introduce a timeout mechanism for triggering.
 if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+// if the active queue size is 1, it means that the current request is the oldest one in
+// the active queue.
+if (stateRequestsBuffer.activeQueueSize() == 1) {
+scheduleTimeout(currentTriggerSeq.get());
+}

Review Comment:
   I mean, `triggerIfNeeded(false)` may be called from somewhere else, right? Even `enqueueToActive` has two entry points. I'd suggest moving the `scheduleTimeout` part into `StateRequestBuffer`, while the handler and the seq-maintenance part stay in `AEC`.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578815645


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -220,12 +283,17 @@  void insertBlockingBuffer(StateRequest request) {
  * @param force whether to trigger requests in force.
  */
 void triggerIfNeeded(boolean force) {
-// TODO: introduce a timeout mechanism for triggering.
 if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+// if the active queue size is 1, it means that the current request is the oldest one in
+// the active queue.
+if (stateRequestsBuffer.activeQueueSize() == 1) {
+scheduleTimeout(currentTriggerSeq.get());
+}

Review Comment:
   No, there will always be a trigger between two occurrences of `stateRequestsBuffer.activeQueueSize() == 1`, so whenever that condition is met twice, the seq numbers must differ.
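The argument can be checked with a small model. It assumes every path that empties the active queue is a trigger that advances the seq, whether size-based or timeout-based. `SizeOneModel` is hypothetical and tracks only counts. If some other path could drain the queue without a trigger, as the concern raised above about other entry points suggests, the invariant would no longer follow:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the "schedule a timeout when the active queue size becomes 1" rule. */
class SizeOneModel {
    final int batchSize;
    int activeQueueSize = 0;
    long currentTriggerSeq = 0;
    /** Seq numbers for which a timeout was scheduled, in order. */
    final List<Long> scheduledSeqs = new ArrayList<>();

    SizeOneModel(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Mirrors handleRequest(): enqueue, then the triggerIfNeeded(false) logic from the diff. */
    void handleRequest() {
        activeQueueSize++;
        if (activeQueueSize < batchSize) {
            if (activeQueueSize == 1) {
                scheduledSeqs.add(currentTriggerSeq);
            }
        } else {
            trigger();
        }
    }

    /** Any trigger (size-based or timeout) empties the queue and advances the seq by one. */
    void trigger() {
        activeQueueSize = 0;
        currentTriggerSeq++;
    }
}
```

Feeding the model seven requests with batch size 3, plus one timeout trigger after the fourth request, records seqs 0, 1 and 2 for the three size-1 events: exactly one per seq.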






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-24 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578769079


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -220,12 +283,17 @@  void insertBlockingBuffer(StateRequest request) {
  * @param force whether to trigger requests in force.
  */
 void triggerIfNeeded(boolean force) {
-// TODO: introduce a timeout mechanism for triggering.
 if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+// if the active queue size is 1, it means that the current request is the oldest one in
+// the active queue.
+if (stateRequestsBuffer.activeQueueSize() == 1) {
+scheduleTimeout(currentTriggerSeq.get());
+}

Review Comment:
   Ah... I misspoke. Is it possible to call `scheduleTimeout` multiple times for one seq?






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-24 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1578767927


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +106,80 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
+/** The executor service that schedules and calls the triggers of this task. */
+ScheduledExecutorService scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current trigger sequence number, used to distinguish different triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by one.
+ */
+AtomicLong currentTriggerSeq;
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.currentTriggerSeq = new AtomicLong(0);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeout > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+// make sure shutdown removes all pending tasks
+((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
-"Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
 batchSize,
+bufferTimeout,
 maxInFlightRecords);
 }
 
+void scheduleTimeout(long triggerSeq) {
+if (bufferTimeout > 0) {
+if (currentScheduledFuture != null
+&& !currentScheduledFuture.isDone()
+&& !currentScheduledFuture.isCancelled()) {
+currentScheduledFuture.cancel(false);
+}
+currentScheduledFuture =
+(ScheduledFuture)
+scheduledExecutor.schedule(
+() -> {
+if (triggerSeq != currentTriggerSeq.get()) {
+// if any new trigger occurs, skip this schedule
+return;
+}
+mailboxExecutor.execute(
+() -> triggerIfNeeded(true), "AEC-timeout");

Review Comment:
   Well, I mean we keep the `if (triggerSeq != currentTriggerSeq.get()) { return; }` check before this and only change the mailbox processing part.
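Here is a sketch of the combined approach, with hypothetical names and a plain queue standing in for the mailbox. On the timer thread, stale timeouts are skipped before any mail is created, so no extra mail is enqueued. Inside the mail, the seq is checked again on the task thread, because a size-based trigger may run between enqueuing the mail and executing it:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the timeout path, checking the seq before and inside the mail. */
class DoubleCheckTimeout {
    /** Advanced by one on every trigger; only modified by mails and triggers on the task thread. */
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    /** Stand-in for the task mailbox: mails run later, in FIFO order, on the task thread. */
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    int forcedTriggers = 0;

    /** Body of the scheduled timer task for {@code triggerSeq}, run on the timer thread. */
    void onTimerFired(long triggerSeq) {
        if (triggerSeq != currentTriggerSeq.get()) {
            return; // a trigger already happened: skip without creating a mail
        }
        mailbox.add(
                () -> {
                    // Re-check on the task thread: a trigger may have run after this
                    // mail was enqueued but before it is processed.
                    if (triggerSeq == currentTriggerSeq.get()) {
                        forceTrigger();
                    }
                });
    }

    /** Stand-in for triggerIfNeeded(true); here it only counts the call and advances the seq. */
    void forceTrigger() {
        forcedTriggers++;
        currentTriggerSeq.incrementAndGet();
    }

    /** Stand-in for a size-based trigger when the active queue reaches the batch size. */
    void sizeTrigger() {
        currentTriggerSeq.incrementAndGet();
    }

    /** Processes all pending mails in order, as the task thread would. */
    void runMails() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }
}
```

In this model, the timer-thread check is only an optimization, because it may read a seq that the task thread is about to advance. The check inside the mail is the one that guarantees a stale timeout never forces a trigger.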






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1577700082


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +106,80 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
+/** The executor service that schedules and calls the triggers of this task. */
+ScheduledExecutorService scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current trigger sequence number, used to distinguish different triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by one.
+ */
+AtomicLong currentTriggerSeq;
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.currentTriggerSeq = new AtomicLong(0);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeout > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+// make sure shutdown removes all pending tasks
+((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
-"Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
 batchSize,
+bufferTimeout,
 maxInFlightRecords);
 }
 
+void scheduleTimeout(long triggerSeq) {
+if (bufferTimeout > 0) {
+if (currentScheduledFuture != null
+&& !currentScheduledFuture.isDone()
+&& !currentScheduledFuture.isCancelled()) {
+currentScheduledFuture.cancel(false);
+}
+currentScheduledFuture =
+(ScheduledFuture)
+scheduledExecutor.schedule(
+() -> {
+if (triggerSeq != currentTriggerSeq.get()) {
+// if any new trigger occurs, skip this schedule
+return;
+}
+mailboxExecutor.execute(
+() -> triggerIfNeeded(true), "AEC-timeout");

Review Comment:
   This approach may create an extra mail and put it in the mailbox. I lean toward skipping it before anything is submitted to the mailbox.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1577695901


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -220,12 +283,17 @@  void insertBlockingBuffer(StateRequest request) {
  * @param force whether to trigger requests in force.
  */
 void triggerIfNeeded(boolean force) {
-// TODO: introduce a timeout mechanism for triggering.
 if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+// if the active queue size is 1, it means that the current request is the oldest one in
+// the active queue.
+if (stateRequestsBuffer.activeQueueSize() == 1) {
+scheduleTimeout(currentTriggerSeq.get());
+}

Review Comment:
   No, because the seq number will increase by 1 per trigger.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-24 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1577589826


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +106,80 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
+/** The executor service that schedules and calls the triggers of this task. */
+ScheduledExecutorService scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current trigger sequence number, used to distinguish different triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by one.
+ */
+AtomicLong currentTriggerSeq;
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.currentTriggerSeq = new AtomicLong(0);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeout > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+// make sure shutdown removes all pending tasks
+((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
-"Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
 batchSize,
+bufferTimeout,
 maxInFlightRecords);
 }
 
+void scheduleTimeout(long triggerSeq) {
+if (bufferTimeout > 0) {
+if (currentScheduledFuture != null
+&& !currentScheduledFuture.isDone()
+&& !currentScheduledFuture.isCancelled()) {
+currentScheduledFuture.cancel(false);
+}
+currentScheduledFuture =
+(ScheduledFuture)
+scheduledExecutor.schedule(
+() -> {
+if (triggerSeq != currentTriggerSeq.get()) {
+// if any new trigger occurs, skip this schedule
+return;
+}
+mailboxExecutor.execute(
+() -> triggerIfNeeded(true), "AEC-timeout");

Review Comment:
   How about
   ```
   mailboxExecutor.execute(
           () -> { if (triggerSeq == currentTriggerSeq.get()) { triggerIfNeeded(true); } },
           "AEC-timeout");
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -220,12 +283,17 @@  void insertBlockingBuffer(StateRequest request) {
  * @param force whether to trigger requests in force.
  */
 void triggerIfNeeded(boolean force) {
-// TODO: introduce a timeout mechanism for triggering.
 if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+// if the active queue size is 1, it means that the current request is the oldest one in
+// the active queue.
+if (stateRequestsBuffer.activeQueueSize() == 1) {
+scheduleTimeout(currentTriggerSeq.get());
+}

Review Comment:
   Is it possible to trigger multiple times for one seq?




Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-23 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575787928


##
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##
@@ -346,6 +337,63 @@ public void testSyncPoint() {
 recordContext2.release();
 }
 
+@Test
+void testBufferTimeout() throws InterruptedException {
+batchSize = 5;
+timeout = 1000;
+setup();
+Runnable userCode = () -> valueState.asyncValue();
+
+//  basic timeout ---
+for (int i = 0; i < batchSize - 1; i++) {
+String record = String.format("key%d-r%d", i, i);
+String key = String.format("key%d", batchSize + i);
+RecordContext recordContext = aec.buildContext(record, key);
+aec.setCurrentContext(recordContext);
+userCode.run();
+}
+assertThat(aec.timeoutFlag.get()).isFalse();
+assertThat(aec.currentScheduledFuture.isDone()).isFalse();
+assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
+assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
+assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+Thread.sleep(timeout + 100);

Review Comment:
   Thanks for the suggestion. I removed `Thread.sleep` from `testBufferTimeout()` and `testBufferTimeoutSkip()` and used `ManuallyTriggeredScheduledExecutorService` as the `ScheduledExecutor`.
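Flink's `ManuallyTriggeredScheduledExecutorService` is not reproduced here. To illustrate the idea behind it, `ManualTimer` below is a hypothetical, much smaller stand-in. Tasks run only when the test advances virtual time, so assertions no longer depend on `Thread.sleep(timeout + 100)` or wall-clock scheduling:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical manual timer for tests: nothing runs until the test advances virtual time. */
class ManualTimer {
    private static final class Task {
        final long dueAt;
        final Runnable action;
        boolean cancelled;

        Task(long dueAt, Runnable action) {
            this.dueAt = dueAt;
            this.action = action;
        }
    }

    private long now = 0;
    private final List<Task> tasks = new ArrayList<>();

    /** Registers {@code action} to run {@code delayMs} after the current virtual time; returns a cancel handle. */
    Runnable schedule(Runnable action, long delayMs) {
        Task task = new Task(now + delayMs, action);
        tasks.add(task);
        return () -> task.cancelled = true;
    }

    /** Advances virtual time and runs, in due order, every non-cancelled task that became due. */
    void advance(long ms) {
        now += ms;
        tasks.sort((a, b) -> Long.compare(a.dueAt, b.dueAt));
        List<Task> due = new ArrayList<>();
        for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
            Task task = it.next();
            if (task.dueAt <= now) {
                due.add(task);
                it.remove();
            }
        }
        // Tasks scheduled by these actions are only considered on the next advance().
        for (Task task : due) {
            if (!task.cancelled) {
                task.action.run();
            }
        }
    }

    /** Number of tasks that have not yet become due. */
    int pendingCount() {
        return tasks.size();
    }
}
```

A test would schedule the timeout, call `advance(timeout - 1)` to assert that nothing fired, then call `advance(1)` to assert that it fired exactly once.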






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-23 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575735682


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
 }
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeOut,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeOut = bufferTimeOut;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.timeoutFlag = new AtomicBoolean(false);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeOut > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1,
+new ThreadFactory() {
+@Override
+public Thread newThread(Runnable r) {
+return new Thread(r, "AEC-scheduler");
+}
+});
+this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+// make sure shutdown removes all pending tasks
+this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
 batchSize,
 maxInFlightRecords);
 }
 
+void scheduleTimeout() {
+if (bufferTimeOut > 0) {
+if (currentScheduledFuture != null
+&& !currentScheduledFuture.isDone()
+&& !currentScheduledFuture.isCancelled()) {
+currentScheduledFuture.cancel(false);
+}
+currentScheduledFuture =
+(ScheduledFuture)
+scheduledExecutor.schedule(
+() -> {
+timeoutFlag.set(true);
+mailboxExecutor.execute(
+() -> triggerIfNeeded(false), "AEC-timeout");

Review Comment:
   Added; `testBufferTimeoutSkip` was introduced to test it.
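Context for the `scheduleTimeout()` hunk above: each call cancels the previous pending timeout with `cancel(false)`, and the executor is configured with `setRemoveOnCancelPolicy(true)`. The standalone sketch below uses only `java.util.concurrent`. It is not the AEC code, and `CancelPolicyDemo` is a hypothetical name. It shows what that policy changes: with the policy enabled, a cancelled timeout leaves the work queue immediately. Without it, the cancelled task stays queued until its delay elapses.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CancelPolicyDemo {
    /** Schedules a far-future task, cancels it, and returns the work-queue size afterwards. */
    static int queueSizeAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        // Same shutdown policies as in the hunk: pending delayed tasks are dropped on shutdown.
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        try {
            ScheduledFuture<?> timeout = executor.schedule(() -> {}, 1, TimeUnit.HOURS);
            // cancel(false): do not interrupt the task if it happens to be running already.
            timeout.cancel(false);
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Because timeouts are rescheduled on every new buffer, leaving cancelled tasks in the queue would let them pile up until their delays expire.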






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575597004


##
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##
@@ -346,6 +337,63 @@ public void testSyncPoint() {
 recordContext2.release();
 }
 
+@Test
+void testBufferTimeout() throws InterruptedException {
+batchSize = 5;
+timeout = 1000;
+setup();
+Runnable userCode = () -> valueState.asyncValue();
+
+//  basic timeout ---
+for (int i = 0; i < batchSize - 1; i++) {
+String record = String.format("key%d-r%d", i, i);
+String key = String.format("key%d", batchSize + i);
+RecordContext recordContext = aec.buildContext(record, key);
+aec.setCurrentContext(recordContext);
+userCode.run();
+}
+assertThat(aec.timeoutFlag.get()).isFalse();
+assertThat(aec.currentScheduledFuture.isDone()).isFalse();
+assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
+assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
+assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+Thread.sleep(timeout + 100);

Review Comment:
   It might be better to avoid using `Thread.sleep` in test cases, as it can
increase CI duration and make tests flaky. How about introducing a
`TestScheduledThreadPoolExecutor` so we can control when each step is
triggered? An example of this is
`org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor`.
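The idea behind a manually triggered executor can be sketched in a few lines. Tasks wait in a queue ordered by a virtual deadline, and the test advances the clock explicitly instead of sleeping. `VirtualTimeScheduler` below is a hypothetical illustration. It does not reproduce the API of `ManuallyTriggeredScheduledExecutor`.

```java
import java.util.PriorityQueue;

/** Hypothetical virtual-clock scheduler for tests; nothing runs until advance() is called. */
class VirtualTimeScheduler {
    private static final class Entry {
        final long deadline;
        final long seq; // tie-breaker: equal deadlines run in scheduling order
        final Runnable task;

        Entry(long deadline, long seq, Runnable task) {
            this.deadline = deadline;
            this.seq = seq;
            this.task = task;
        }
    }

    private final PriorityQueue<Entry> queue =
            new PriorityQueue<>((a, b) -> a.deadline != b.deadline
                    ? Long.compare(a.deadline, b.deadline)
                    : Long.compare(a.seq, b.seq));
    private long now = 0;
    private long nextSeq = 0;

    void schedule(Runnable task, long delayMs) {
        queue.add(new Entry(now + delayMs, nextSeq++, task));
    }

    /** Moves the virtual clock forward and runs every task whose deadline has passed. */
    int advance(long ms) {
        now += ms;
        int ran = 0;
        while (!queue.isEmpty() && queue.peek().deadline <= now) {
            queue.poll().task.run();
            ran++;
        }
        return ran;
    }

    int pendingCount() {
        return queue.size();
    }
}
```

A test can then assert the state just before and just after the timeout boundary deterministically, for example by calling `advance(999)` and then `advance(1)` for a 1000 ms timeout.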






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575609479


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   Would it be better to reuse existing utility methods like 
`FutureUtils.delay()`? This way AEC won't need to maintain such resources.
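As a JDK-only analogy for scheduling a timeout without owning a thread pool, `CompletableFuture.delayedExecutor` (Java 9+) can be combined with `runAsync`. This sketch is an assumption for illustration only. It is not how `FutureUtils.delay()` is implemented, and `DelayedTimeoutSketch` is a hypothetical name. The delayed task runs on a shared JDK pool, so in the AEC the actual trigger would still have to be handed back to the mailbox thread, as the hunk does with `mailboxExecutor.execute(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class DelayedTimeoutSketch {
    /**
     * Runs onTimeout after roughly delayMs on the JDK's shared async pool; the controller
     * itself creates and shuts down no threads. The returned future completes after
     * onTimeout has run.
     */
    static CompletableFuture<Void> scheduleTimeout(Runnable onTimeout, long delayMs) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS);
        return CompletableFuture.runAsync(onTimeout, delayed);
    }
}
```

The trade-off is that the timer threads are shared JVM-wide and outside the operator's lifecycle, which is exactly why no `close()` bookkeeping would be needed for them.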






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575604054


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
 }
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeOut,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeOut = bufferTimeOut;

Review Comment:
   The "O" in `bufferTimeOut` is upper-case while the "o" in `timeoutFlag` is
lower-case. It might be better to unify them under the same convention.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575603596


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
 }
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeOut,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeOut = bufferTimeOut;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.timeoutFlag = new AtomicBoolean(false);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeOut > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1,
+new ThreadFactory() {
+@Override
+public Thread newThread(Runnable r) {
+return new Thread(r, "AEC-scheduler");
+}
+});
+this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+// make sure shutdown removes all pending tasks
+this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
 batchSize,
 maxInFlightRecords);
 }
 
+void scheduleTimeout() {
+if (bufferTimeOut > 0) {
+if (currentScheduledFuture != null
+&& !currentScheduledFuture.isDone()
+&& !currentScheduledFuture.isCancelled()) {
+currentScheduledFuture.cancel(false);
+}
+currentScheduledFuture =
+(ScheduledFuture)
+scheduledExecutor.schedule(
+() -> {
+timeoutFlag.set(true);
+mailboxExecutor.execute(
+() -> triggerIfNeeded(false), "AEC-timeout");

Review Comment:
   It might be simpler to remove `timeoutFlag` and invoke 
`triggerIfNeeded(true)` directly.
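A minimal sketch of that simplification, using a hypothetical `ForceTriggerBuffer` rather than the real AEC. The timeout path calls `triggerIfNeeded(true)` directly, so no flag is written by the scheduler thread and read later by the mailbox thread. An empty queue is never fired, which covers the case where the batch was already flushed by reaching `batchSize`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the active queue plus its trigger logic. */
class ForceTriggerBuffer {
    private final int batchSize;
    private final List<String> activeQueue = new ArrayList<>();
    final List<List<String>> firedBatches = new ArrayList<>();

    ForceTriggerBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    void enqueue(String request) {
        activeQueue.add(request);
        triggerIfNeeded(false);
    }

    /**
     * Fires the active queue when it has reached batchSize, or unconditionally when force
     * is true (the timeout path). An empty queue is never fired.
     */
    void triggerIfNeeded(boolean force) {
        if (activeQueue.isEmpty() || (!force && activeQueue.size() < batchSize)) {
            return;
        }
        firedBatches.add(new ArrayList<>(activeQueue));
        activeQueue.clear();
    }

    /** What the timeout task would post to the mailbox under this design. */
    void onTimeout() {
        triggerIfNeeded(true);
    }
}
```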



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -51,15 +56,24 @@ public class AsyncExecutionController {
 
 private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
 
-public static final int DEFAULT_BATCH_SIZE = 1000;
-public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+private static final int DEFAULT_BATCH_SIZE = 1000;
+
+private static final int DEFAULT_BUFFER_TIMEOUT = 1000;

Review Comment:
   How about removing the constants and using
`ASYNC_STATE_BUFFER_SIZE.defaultValue()` from `ExecutionOptions`? This would
help avoid maintaining the same value in two places.
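A sketch of the single-source-of-truth idea, assuming a hypothetical `Option<T>` class as a simplified stand-in for Flink's `ConfigOption`. Only the `defaultValue()` accessor mirrors the comment. The keys and defaults are the ones from the `ExecutionOptions` hunk. The controller's no-argument constructor reads them instead of keeping private copies.

```java
/** Hypothetical, simplified stand-in for a typed config option; not Flink's ConfigOption. */
final class Option<T> {
    private final String key;
    private final T defaultValue;

    Option(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() {
        return key;
    }

    T defaultValue() {
        return defaultValue;
    }
}

/** Option definitions: the only place where the default numbers appear. */
final class AsyncStateOptions {
    static final Option<Integer> BUFFER_SIZE =
            new Option<>("execution.async-state.buffer-size", 1000);
    static final Option<Integer> IN_FLIGHT_RECORDS_LIMIT =
            new Option<>("execution.async-state.in-flight-records-limit", 6000);

    private AsyncStateOptions() {}
}

/** Hypothetical controller whose defaults come from the options, not private constants. */
final class DefaultsFromOptions {
    final int batchSize;
    final int maxInFlightRecords;

    DefaultsFromOptions() {
        this(AsyncStateOptions.BUFFER_SIZE.defaultValue(),
                AsyncStateOptions.IN_FLIGHT_RECORDS_LIMIT.defaultValue());
    }

    DefaultsFromOptions(int batchSize, int maxInFlightRecords) {
        this.batchSize = batchSize;
        this.maxInFlightRecords = maxInFlightRecords;
    }
}
```

Changing a default in the option definition then automatically changes the controller's fallback as well.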



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final 

Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575573086


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   I added a `close()` method that does two things:
   1. drains all in-flight records
   2. shuts down `scheduledExecutor`
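The two steps can be sketched as an `AutoCloseable` under stated assumptions. `CloseableController` and the `drainOnce` callback are hypothetical. The in-flight count is an `AtomicInteger` as in the hunk, and `drainOnce` must eventually lower that count, otherwise the loop spins. In the real controller, draining would run on the mailbox thread; this sketch does not model the mailbox.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical closeable controller; names are illustrative only. */
class CloseableController implements AutoCloseable {
    final AtomicInteger inFlightRecordNum = new AtomicInteger();
    final ScheduledThreadPoolExecutor scheduledExecutor; // null when the timeout is disabled
    private final Runnable drainOnce;

    CloseableController(long bufferTimeoutMs, Runnable drainOnce) {
        this.drainOnce = drainOnce;
        this.scheduledExecutor = bufferTimeoutMs > 0 ? new ScheduledThreadPoolExecutor(1) : null;
    }

    @Override
    public void close() {
        // 1. Drain: keep processing until no record is in flight.
        while (inFlightRecordNum.get() > 0) {
            drainOnce.run();
        }
        // 2. Release the timer thread; pending timeouts are discarded.
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdownNow();
            try {
                scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            }
        }
    }
}
```

Draining before shutting down the scheduler means no buffered request is silently dropped along with its pending timeout.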






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1574559188


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
 }
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeOut,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeOut = bufferTimeOut;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.timeoutFlag = new AtomicBoolean(false);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeOut > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1,
+new ThreadFactory() {
+@Override
+public Thread newThread(Runnable r) {
+return new Thread(r, "AEC-scheduler");
+}
+});
+this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+// make sure shutdown removes all pending tasks
+this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
 batchSize,
 maxInFlightRecords);
 }
 
+void scheduleTimeout() {
+if (bufferTimeOut > 0) {
+if (currentScheduledFuture != null
+&& !currentScheduledFuture.isDone()
+&& !currentScheduledFuture.isCancelled()) {
+currentScheduledFuture.cancel(false);
+}
+currentScheduledFuture =
+(ScheduledFuture)
+scheduledExecutor.schedule(
+() -> {
+timeoutFlag.set(true);
+mailboxExecutor.execute(
+() -> triggerIfNeeded(false), "AEC-timeout");

Review Comment:
   I'd suggest a `sequence number` for each buffer to make sure the timeout
triggers the intended one.
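One possible shape of the sequence-number idea, with hypothetical names. Every time the active buffer fires, its sequence number is incremented. A timeout captures the sequence number it was armed for and does nothing if that buffer has already fired. Scheduled timeouts are collected in a plain list here, so the sketch stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer whose timeouts only fire the buffer they were armed for. */
class SequencedBuffer {
    private final int batchSize;
    private int size = 0;
    private long currentSeq = 0; // identifies the currently active buffer
    int fireCount = 0;
    final List<Runnable> scheduledTimeouts = new ArrayList<>(); // stand-in for a real scheduler

    SequencedBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    void enqueue() {
        if (size == 0) {
            // First request of a new buffer: arm a timeout bound to this buffer's number.
            final long seqAtSchedule = currentSeq;
            scheduledTimeouts.add(() -> onTimeout(seqAtSchedule));
        }
        size++;
        if (size >= batchSize) {
            fire();
        }
    }

    private void onTimeout(long seq) {
        // Stale timeout: the buffer it was armed for has already been fired.
        if (seq != currentSeq || size == 0) {
            return;
        }
        fire();
    }

    private void fire() {
        fireCount++;
        size = 0;
        currentSeq++;
    }
}
```

Compared with cancelling futures, this check stays correct even if a stale timeout was already queued in the mailbox before the cancellation happened.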



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+  

Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


masteryhx commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1574501889


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   Should AEC become `Closeable`, since we maintain some resources like this
internally?



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+ * perform actively when the next state request arrives even if the activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
 }
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeOut,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeOut = bufferTimeOut;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.timeoutFlag = new AtomicBoolean(false);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeOut > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1,
+new ThreadFactory() {
+@Override
+public Thread newThread(Runnable r) {
+return new Thread(r, "AEC-scheduler");
+}
+});
+this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+// make sure shutdown removes all pending tasks
+this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",

Review Comment:
   nit: Also add `bufferTimeOut` here.
   BTW, should `bufferTimeOut` be renamed to `bufferTimeout`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if 
yes, a trigger will
+ * perform actively when the next state request arrives even if the 
activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this 
task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {

Review Comment:
   It seems this constructor is not used, so it and the default values in this class could be removed.




Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1574293924


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -50,15 +50,24 @@ public class AsyncExecutionController {
 
 private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
 
-public static final int DEFAULT_BATCH_SIZE = 1000;
-public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+private static final int DEFAULT_BATCH_SIZE = 1000;
+
+private static final int DEFAULT_BUFFER_TIMEOUT = 1000;
+private static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
 
 /**
  * The batch size. When the number of state requests in the active buffer exceeds the batch
  * size, a batched state execution would be triggered.
  */
 private final int batchSize;
 
+/**
+ * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the
+ * activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a
+ * trigger will perform actively.
+ */
+private final int bufferTimeOut;

Review Comment:
   I added an implementation in the second commit.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-19 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1572344539


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,62 @@ public class ExecutionOptions {
 + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
 + SORT_INPUTS.key()
 + " to be enabled.");
+
+// - Async State Execution --
+
+/**
+ * The max limit of in-flight records number in async state execution, 'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the newly records entering
+ * will be blocked until the in-flight records number drops below the limit.
+ */
+@Experimental
+@Documentation.ExcludeFromDocumentation(
+"This is an experimental option, internal use only for now.")
+public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+ConfigOptions.key("execution.async-state.in-flight-records-limit")
+.intType()
+.defaultValue(6000)
+.withDescription(
+"The max limit of in-flight records number in async state execution, 'in-flight' refers"
++ " to the records that have entered the operator but have not yet been processed and"
++ " emitted to the downstream. If the in-flight records number exceeds the limit,"
++ " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+/**
+ * The size of buffer under async state execution. Async state execution provides a buffer
+ * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+ * higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to
+ * control the frequency of triggering.
+ */
+@Experimental
+@Documentation.ExcludeFromDocumentation(
+"This is an experimental option, internal use only for now.")
+public static final ConfigOption ASYNC_STATE_BUFFER_SIZE =
+ConfigOptions.key("execution.async-state.buffer-size")
+.intType()
+.defaultValue(1000)
+.withDescription(
+"The size of buffer under async state execution. Async state execution provides a buffer mechanism to reduce state access."
++ " When the number of state requests in the active buffer exceeds the batch size,"
++ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
++ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+/**
+ * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+ * #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform
+ * actively.
+ */
+@Experimental
+@Documentation.ExcludeFromDocumentation(
+"This is an experimental option, internal use only for now.")
+public static final ConfigOption ASYNC_STATE_BUFFER_TIMEOUT =

Review Comment:
   It might be better to make time configurations `long` instead of `int`.
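A quick illustration of why `long` is the safer type for a millisecond timeout. An `int` holds at most `Integer.MAX_VALUE` ms, roughly 24.8 days, while `java.time.Duration#toMillis()` and the JDK scheduling APIs already work with `long`. `TimeoutConfigSketch` is a hypothetical helper, not Flink code. It also keeps the hunk's convention that a non-positive timeout disables the timer.

```java
import java.time.Duration;

final class TimeoutConfigSketch {
    private TimeoutConfigSketch() {}

    /** Converts a configured duration to milliseconds; a long cannot overflow for day-scale values. */
    static long toTimeoutMillis(Duration configured) {
        return configured.toMillis();
    }

    /** Mirrors the `bufferTimeOut > 0` check in the hunk: zero or negative disables the timer. */
    static boolean timerEnabled(long timeoutMillis) {
        return timeoutMillis > 0;
    }

    /** The largest timeout an int millisecond field could hold, expressed as a Duration. */
    static Duration maxIntMillis() {
        return Duration.ofMillis(Integer.MAX_VALUE);
    }
}
```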



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -50,15 +50,24 @@ public class AsyncExecutionController {
 
 private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
 
-public static final int DEFAULT_BATCH_SIZE = 1000;
-public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+private static final int DEFAULT_BATCH_SIZE = 1000;
+
+private static final int DEFAULT_BUFFER_TIMEOUT = 1000;
+private static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
 
 /**
  * The batch size. When the number of state requests in the active buffer exceeds the batch
  * size, a batched state execution would be triggered.
  */
 private final int batchSize;
 
+/**
+ * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the
+ * activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a
+ * trigger will perform actively.
+ */
+private final int bufferTimeOut;

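The back-pressure rule described by `execution.async-state.in-flight-records-limit` (new records are blocked when the in-flight count would exceed the limit) can be illustrated with a `java.util.concurrent.Semaphore`. This is a hypothetical sketch, not the AEC implementation. The hunks track the count with an `AtomicInteger`, and in Flink's mailbox model, waiting would typically mean processing other mails rather than parking a thread.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical gate: a record holds one permit from entering the operator until it is emitted. */
class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int inFlightRecordsLimit) {
        this.permits = new Semaphore(inFlightRecordsLimit);
    }

    /** Called when a record enters; blocks while the limit is reached. */
    void onRecordEnter() throws InterruptedException {
        permits.acquire();
    }

    /** Non-blocking variant, handy in tests: false means the record would have to wait. */
    boolean tryEnter() {
        return permits.tryAcquire();
    }

    /** Called once the record has been fully processed and emitted downstream. */
    void onRecordFinished() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }
}
```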

Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-19 Thread via GitHub


fredia commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2066106851

   After an offline discussion: since FLIP-425 is only used for stateful operators,
`async-mode` was changed back to `async-state`.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-18 Thread via GitHub


Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1571771428


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
 + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
 + SORT_INPUTS.key()
 + " to be enabled.");
+
+/**
+ * A flag to enable or disable async mode related components when tasks initialize. As long as
+ * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+ * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+ * state APIs, the state access is always executed synchronously, enable this option would bring
+ * some overhead.
+ *
+ * Note: This is an experimental feature(FLIP-425) under evaluation.
+ */
+@Experimental
+public static final ConfigOption ASYNC_STATE_ENABLED =
+ConfigOptions.key("execution.async-mode.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"A flag to enable or disable async mode related components when tasks initialize."
++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
++ " Otherwise, the state access of Async state APIs will be executed synchronously."
++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
++ " Note: This is an experimental feature under evaluation.");
+
+/**
+ * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the newly records entering
+ * will be blocked until the in-flight records number drops below the limit.
+ */
+@Experimental
+public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+.intType()
+.defaultValue(6000)
+.withDescription(
+"The max limit of in-flight records number in async execution mode, 'in-flight' refers"
++ " to the records that have entered the operator but have not yet been processed and"
++ " emitted to the downstream. If the in-flight records number exceeds the limit,"
++ " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+/**
+ * The size of buffer under async execution mode. Async execution mode provides a buffer
+ * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+ * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+ * the frequency of triggering.
+ */
+@Experimental
+public static final ConfigOption ASYNC_BUFFER_SIZE =
+ConfigOptions.key("execution.async-mode.buffer-size")
+.intType()
+.defaultValue(1000)
+.withDescription(
+"The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
++ " When the number of state requests in the active buffer exceeds the batch size,"
++ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
++ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+/**
+ * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+ * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+ */
+@Experimental
+public static final ConfigOption ASYNC_BUFFER_TIMEOUT =
+ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Sorry guys, I'm afraid I prefer the original proposal of `async-state` in the
FLIP, since this is only for the stateful operator and even the problem of
`record-order` is caused by state

Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569864750


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+     */
+    @Experimental
+    public static final ConfigOption ASYNC_BUFFER_TIMEOUT =
+            ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Good point, changed it to `execution.async-mode.state-buffer-timeout`.




Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569863724


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
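@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // ------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // ------------------------------------------------------------------------
+
+    @Internal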

Review Comment:
   Ah, I misread the comment and changed them from `@Experimental` to 
`@Internal`; I have corrected them now.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,19 +103,26 @@ public class AsyncExecutionController {
     final AtomicInteger inFlightRecordNum;
 
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment:
   Given that #24657 has been merged, it might be better to verify that the 
introduced configurations can pass the configured values into AEC through 
operator setups now.
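One way to act on this suggestion is a test asserting that configured values, rather than the constructor defaults, reach the controller. The sketch below is hypothetical: it uses a plain `Map<String, String>` instead of Flink's `Configuration`, `AsyncSettings` merely stands in for the AEC constructor arguments, the keys are the ones discussed in this thread (the merged names may differ), and the buffer-timeout default is a placeholder because its actual value is not shown here.

```java
import java.util.Map;

// Hypothetical stand-in for the values passed to the AsyncExecutionController constructor.
final class AsyncSettings {
    // Buffer size and in-flight defaults match the option defaults in the diff above.
    static final int DEFAULT_BATCH_SIZE = 1000;
    static final long DEFAULT_BUFFER_TIMEOUT = 1000L; // placeholder, not the PR's value
    static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;

    final int batchSize;
    final long bufferTimeout;
    final int maxInFlightRecords;

    AsyncSettings(int batchSize, long bufferTimeout, int maxInFlightRecords) {
        this.batchSize = batchSize;
        this.bufferTimeout = bufferTimeout;
        this.maxInFlightRecords = maxInFlightRecords;
    }

    /** Reads each value from a flat key/value configuration, falling back to its default. */
    static AsyncSettings fromConfig(Map<String, String> conf) {
        int batchSize = Integer.parseInt(conf.getOrDefault(
                "execution.async-mode.buffer-size", String.valueOf(DEFAULT_BATCH_SIZE)));
        long bufferTimeout = Long.parseLong(conf.getOrDefault(
                "execution.async-mode.state-buffer-timeout", String.valueOf(DEFAULT_BUFFER_TIMEOUT)));
        int maxInFlight = Integer.parseInt(conf.getOrDefault(
                "execution.async-mode.in-flight-records-limit",
                String.valueOf(DEFAULT_MAX_IN_FLIGHT_RECORD_NUM)));
        return new AsyncSettings(batchSize, bufferTimeout, maxInFlight);
    }
}
```

The check that matters is the second case in a test: a value set in the configuration must replace the default, while unset keys keep theirs.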






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,19 +103,26 @@ public class AsyncExecutionController {
     final AtomicInteger inFlightRecordNum;
 
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment:
   Given that #24657 has been merged, it might be better to verify that the 
introduced configurations can pass the configured values into AEC through 
operator setups now.



##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+  

Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


fredia commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2060999083

   @yunfengzhou-hub @Zakelly Thanks for the detailed review. I have rebased 
this PR and addressed some comments; PTAL if you are free.





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1568659638


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+     */
+    @Experimental
+    public static final ConfigOption ASYNC_BUFFER_TIMEOUT =
+            ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Unified those options under `execution.async-mode`, because we also provide a 
`Record-order` mode that preserves the order of records that do not access state.




Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1568656329


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"

Review Comment:
   Thanks for your insight. After discussing offline, we think it would be 
better to expose this on the DataStream API in some way, so that users can enable 
async execution in a more fine-grained manner.
   So this option has been removed.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-16 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1566744878


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
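@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // ------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // ------------------------------------------------------------------------
+
+    @Internal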

Review Comment:
   It might be better to mark them as `@Experimental` instead of `@Internal`.






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-15 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1566742123


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"

Review Comment:
   It might be better to remove this configuration for now and add it in the future 
when such use cases are found.
   
   - Whether to enable async state access can be inferred automatically by the 
Flink infrastructure, depending on the location of state backends and the state 
API used in operators.
   - It is better not to expose implementation details, like the sync/async 
modes mentioned here, to end users. As long as the order of same-key records 
and the order of async state callbacks are guaranteed, that is enough for users.
   - In particular, the implementation of the sync mode might be altered in the 
future to improve performance in situations where the sync state API is used with a 
remote state backend. In that case, the statement "the state access is 
always executed synchronously" might cause deprecation issues.
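The ordering guarantee mentioned in the second point (same-key records handled in arrival order, while different keys may interleave) can be illustrated with a minimal, hypothetical sketch. It does not reproduce the API of the `KeyAccountingUnit` referenced in the diff; it only shows the idea of allowing at most one in-flight record per key and queuing later same-key records in FIFO order, assuming single-threaded access.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch (not Flink code): at most one record per key is in flight;
// later records with the same key wait in FIFO order, other keys proceed freely.
class PerKeyOrdering<K, R> {
    private final Map<K, R> inFlightByKey = new HashMap<>();
    private final Map<K, Queue<R>> waiting = new HashMap<>();

    /** Returns true if the record may start now; otherwise it is queued behind its key. */
    boolean tryStart(K key, R record) {
        if (!inFlightByKey.containsKey(key)) {
            inFlightByKey.put(key, record);
            return true;
        }
        waiting.computeIfAbsent(key, k -> new ArrayDeque<>()).add(record);
        return false;
    }

    /** Releases the key of a finished record and returns the next same-key record, or null. */
    R finish(K key) {
        inFlightByKey.remove(key);
        Queue<R> queue = waiting.get(key);
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        R next = queue.poll();
        if (queue.isEmpty()) {
            waiting.remove(key); // keep the map from growing with drained keys
        }
        inFlightByKey.put(key, next); // the dequeued record now holds the key
        return next;
    }
}
```

Because the guarantee is expressed per key, users only observe the ordering contract, and whether state access underneath runs synchronously or asynchronously remains an implementation detail.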






Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-15 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1566744878


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
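@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // ------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // ------------------------------------------------------------------------
+
+    @Internal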

Review Comment:
   It might be better to mark them as `@Experimental` instead of `@Internal`.
   
   - Internal: Annotation to mark methods within **stable**, public APIs as an 
internal developer API.
   - Experimental: Classes with this annotation are neither battle-tested **nor 
stable**, and may be changed or removed in future versions.



##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"

Review Comment:
   It might be better to remove this configuration for now and add it in the future 
when such use cases are found.
   
   - Whether to enable async state access can be inferred automatically by the 
Flink infrastructure, depending on the location of state backends and the state 
API used in operators.
   - It is better not to expose implementation details, like the sync/async 
modes mentioned here, to end users. As long as the order of same-key records 
and the order of async state callbacks are guaranteed, that is enough for users.
   - In particular, the implementation of the sync mode might be altered in the 
future to improve performance in situations where the sync state API is used with a 
remote state backend. In that case, the statement "the state access is 
always executed synchronously" should cause deprecation issues.



##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option 

Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-15 Thread via GitHub


fredia commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2058174500

   @Zakelly @ljz2051 would you please take a look?





Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-15 Thread via GitHub


flinkbot commented on PR #24667:
URL: https://github.com/apache/flink/pull/24667#issuecomment-2056402383

   
   ## CI report:
   
   * aea56668e33e7062e6b18dac4086a2f05dc36fc1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-15 Thread via GitHub


fredia opened a new pull request, #24667:
URL: https://github.com/apache/flink/pull/24667

   
   
   ## What is the purpose of the change
   
   As part of the async execution model of disaggregated state management, this 
PR introduces async execution configurations.
   
   
   ## Brief change log
   
   - Add async execution configurations in `ExecutionOptions`
   - Add related getter/setter in `ExecutionConfig`
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - StreamExecutionEnvironmentTest#testAsyncExecutionConfiguration
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs/ JavaDocs)
   

