Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia merged PR #24667: URL: https://github.com/apache/flink/pull/24667 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on PR #24667: URL: https://github.com/apache/flink/pull/24667#issuecomment-2078637067

@Zakelly Thanks for the review. I addressed the comments and rebased onto master.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1580464018

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -94,26 +97,38 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
-
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
-        this.stateRequestsBuffer = new StateRequestBuffer<>();
+
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.stateRequestsBuffer =
+                new StateRequestBuffer<>(
+                        bufferTimeout,
+                        (scheduledTriggerSeq) ->
+                                mailboxExecutor.execute(
+                                        () -> {
+                                            if (stateRequestsBuffer.currentTriggerSeq.get()
```

Review Comment: How about providing a function `boolean stateRequestsBuffer#checkCurrentSeq(long seq)`?

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java: ##
```diff
@@ -53,17 +73,74 @@ public class StateRequestBuffer {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when timeout. */
+    final Consumer timeoutHandler;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    /**
+     * The current scheduled future, when the next scheduling occurs, the previous one that has not
+     * yet been executed will be canceled.
+     */
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+     * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+     */
+    AtomicLong scheduledTriggerSeq;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by 1.
+     */
+    AtomicLong currentTriggerSeq;
+
+    public StateRequestBuffer(long bufferTimeout, Consumer timeoutHandler) {
         this.activeQueue = new LinkedList<>();
         this.blockingQueue = new HashMap<>();
         this.blockingQueueSize = 0;
+        this.bufferTimeout = bufferTimeout;
+        this.timeoutHandler = timeoutHandler;
+        this.scheduledTriggerSeq = new AtomicLong(-1);
+        this.currentTriggerSeq = new AtomicLong(0);
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor = DELAYER;
+        }
+    }
+
+    void advanceTriggerSeq() {
+        currentTriggerSeq.incrementAndGet();
     }
 
     void enqueueToActive(StateRequest request) {
         if (request.getRequestType() == StateRequestType.SYNC_POINT) {
             request.getFuture().complete(null);
         } else {
             activeQueue.add(request);
+            if (bufferTimeout > 0 && currentTriggerSeq.get() > scheduledTriggerSeq.get()) {
+                if (currentScheduledFuture != null
+                        && !currentScheduledFuture.isDone()
+                        && !currentScheduledFuture.isCancelled()) {
+                    currentScheduledFuture.cancel(false);
+                }
+                scheduledTriggerSeq.set(currentTriggerSeq.get());
```

Review Comment: I was thinking of adding `final long thisScheduledSeq = scheduledTriggerSeq.get();` here, and using the condition `thisScheduledSeq == currentTriggerSeq.get()` at line 136.

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java: ##
```diff
@@ -53,17 +73,74 @@ public class StateRequestBuffer {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when timeout. */
+    final Consumer
```
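A minimal sketch of the two suggestions above, assuming a heavily simplified buffer: `checkCurrentSeq(long seq)` is the proposed helper, and the timer compares against `thisScheduledSeq`, the value captured when the timeout was armed, instead of re-reading `scheduledTriggerSeq` when it fires. The class `SeqGuardedBuffer`, the `pendingTimers` list (a deterministic stand-in for the `ScheduledExecutorService`), and `fireTimer` are hypothetical; requests are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical, simplified model of the timeout bookkeeping discussed for StateRequestBuffer. */
class SeqGuardedBuffer {
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    final AtomicLong scheduledTriggerSeq = new AtomicLong(-1);
    final List<String> activeQueue = new ArrayList<>();
    // Stand-in for the scheduler: each entry is the seq a timer was armed for, so the
    // sketch stays deterministic and the caller decides when a timer "fires".
    final List<Long> pendingTimers = new ArrayList<>();
    final LongConsumer timeoutHandler;

    SeqGuardedBuffer(LongConsumer timeoutHandler) {
        this.timeoutHandler = timeoutHandler;
    }

    /** True if no trigger has happened since {@code seq} was captured. */
    boolean checkCurrentSeq(long seq) {
        return currentTriggerSeq.get() == seq;
    }

    /** Called whenever a batch of requests is triggered. */
    void advanceTriggerSeq() {
        currentTriggerSeq.incrementAndGet();
    }

    void enqueueToActive(String request) {
        activeQueue.add(request);
        // Arm at most one timer per trigger seq.
        if (currentTriggerSeq.get() > scheduledTriggerSeq.get()) {
            scheduledTriggerSeq.set(currentTriggerSeq.get());
            final long thisScheduledSeq = scheduledTriggerSeq.get();
            pendingTimers.add(thisScheduledSeq);
        }
    }

    /** What the scheduled task would run once its delay has elapsed. */
    void fireTimer(long thisScheduledSeq) {
        if (checkCurrentSeq(thisScheduledSeq)) {
            timeoutHandler.accept(thisScheduledSeq);
        }
    }
}
```

Because each timer carries its own seq, a trigger that happens between arming and firing turns the stale timer into a no-op without any extra shared state.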
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1579142292

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -94,26 +97,39 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
-
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
-        this.stateRequestsBuffer = new StateRequestBuffer<>();
+
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.stateRequestsBuffer =
+                new StateRequestBuffer<>(
+                        bufferTimeout,
+                        () ->
+                                mailboxExecutor.execute(
+                                        () -> {
+                                            if (stateRequestsBuffer.currentTriggerSeq.get()
+                                                    == stateRequestsBuffer.scheduledTriggerSeq
```

Review Comment: This condition is always true. I suggest passing the `triggered seq` into the handler lambda as a parameter.

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java: ##
```diff
@@ -53,17 +72,75 @@ public class StateRequestBuffer {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when {@link #activeQueue} size is 1. */
+    final Runnable timeoutHandler;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    /**
+     * The current scheduled future, when the next scheduling occurs, the previous one that has not
+     * yet been executed will be canceled.
+     */
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+     * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+     */
+    AtomicLong scheduledTriggerSeq;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
+
+    public StateRequestBuffer(long bufferTimeout, Runnable timeoutHandler) {
         this.activeQueue = new LinkedList<>();
         this.blockingQueue = new HashMap<>();
         this.blockingQueueSize = 0;
+        this.bufferTimeout = bufferTimeout;
+        this.timeoutHandler = timeoutHandler;
+        this.scheduledTriggerSeq = new AtomicLong(-1);
+        this.currentTriggerSeq = new AtomicLong(0);
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor = DELAYER;
+        }
+    }
+
+    void advanceTriggerSeq() {
+        currentTriggerSeq.incrementAndGet();
     }
 
     void enqueueToActive(StateRequest request) {
         if (request.getRequestType() == StateRequestType.SYNC_POINT) {
             request.getFuture().complete(null);
         } else {
             activeQueue.add(request);
+            // if the active queue size is 1, it means that the current request is the oldest one in
```

Review Comment: Nit: remove this?
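To see why passing the triggered seq into the handler matters, here is a hedged, single-threaded sketch; the class `TimeoutCheckComparison` and its methods are hypothetical. It models a stale timer that fires after a batch trigger and a re-schedule. A check that only compares the two shared fields at run time cannot tell which timer is running, while a check against the seq captured when the timer was armed can. In the real code the previous future is cancelled on re-schedule, but `cancel(false)` does not stop a task that has already started, and a mail it has already submitted still runs.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model comparing two ways a timeout handler can decide whether it is stale. */
class TimeoutCheckComparison {
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    final AtomicLong scheduledTriggerSeq = new AtomicLong(-1);

    /** Arms a timer for the current seq and returns the seq it was armed for. */
    long schedule() {
        long seq = currentTriggerSeq.get();
        scheduledTriggerSeq.set(seq);
        return seq;
    }

    /** Called on every batch trigger. */
    void advanceTriggerSeq() {
        currentTriggerSeq.incrementAndGet();
    }

    /** Reads shared state when the timer runs: cannot tell which timer is firing. */
    boolean fieldCheck() {
        return currentTriggerSeq.get() == scheduledTriggerSeq.get();
    }

    /** Compares against the seq handed to the handler when the timer was armed. */
    boolean capturedCheck(long triggeredSeq) {
        return currentTriggerSeq.get() == triggeredSeq;
    }
}
```

With the seq passed as a parameter, the handler naturally becomes a consumer of that seq, which is the shape of the `Consumer timeoutHandler` in the later revision quoted above.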
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1578997828

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -220,12 +283,17 @@ void insertBlockingBuffer(StateRequest request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }
```

Review Comment: Currently, `triggerIfNeeded(false)` is only called in `handleRequest()`. I added a `schedulingCount==0` condition to avoid "trigger multiple times for one seq" in the future. I also moved `scheduleTimeout` into `StateRequestBuffer`; thanks for the suggestion.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1578997828

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -220,12 +283,17 @@ void insertBlockingBuffer(StateRequest request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }
```

Review Comment: Currently, `triggerIfNeeded(false)` is only called in `handleRequest()`. I added a `schedulingCount==1` condition to avoid "trigger multiple times for one seq" in the future. I also moved `scheduleTimeout` into `StateRequestBuffer`; thanks for the suggestion.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1578825952

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -220,12 +283,17 @@ void insertBlockingBuffer(StateRequest request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }
```

Review Comment: I mean, `triggerIfNeeded(false)` may be called from somewhere else, right? And even `enqueueToActive` has two entry points. I'd suggest moving the `scheduleTimeout` part into `StateRequestBuffer`, but the handler and the seq maintenance should stay in `AEC`.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1578815645

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -220,12 +283,17 @@ void insertBlockingBuffer(StateRequest request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }
```

Review Comment: No, there will definitely be a trigger between two occurrences of `stateRequestsBuffer.activeQueueSize() == 1`, so each time that condition is met, the seq number is different.
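The argument above can be checked with a small model, assuming the active queue only shrinks when a trigger drains it. The hypothetical `BatchingQueue` ignores the blocking queue and the second `enqueueToActive` entry point raised earlier in the thread. Each time the queue goes from empty to one request, a timeout is armed for the current seq. Because the queue can only become empty again through a trigger, which advances the seq, no seq is ever armed twice.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: a batch is triggered when full, or forced by a timeout. */
class BatchingQueue {
    final int batchSize;
    final List<String> active = new ArrayList<>();
    long currentTriggerSeq = 0;
    // Seqs for which a timeout would have been armed, in order.
    final List<Long> scheduledFor = new ArrayList<>();

    BatchingQueue(int batchSize) {
        this.batchSize = batchSize;
    }

    void add(String request) {
        active.add(request);
        if (active.size() >= batchSize) {
            trigger();
        } else if (active.size() == 1) {
            // First request of a new batch: arm the timeout for this seq.
            scheduledFor.add(currentTriggerSeq);
        }
    }

    /** Drains the active queue and advances the seq, as a batch or timeout trigger would. */
    void trigger() {
        active.clear();
        currentTriggerSeq++;
    }
}
```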
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1578769079

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -220,12 +283,17 @@ void insertBlockingBuffer(StateRequest request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }
```

Review Comment: Ah... I misspoke... Is it possible to call `scheduleTimeout` multiple times for one seq?
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1578767927

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -94,29 +106,80 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // - initialize buffer timeout ---
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        if (triggerSeq != currentTriggerSeq.get()) {
+                                            // if any new trigger occurs, skip this schedule
+                                            return;
+                                        }
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(true), "AEC-timeout");
```

Review Comment: Well, I mean we keep the `if (triggerSeq != currentTriggerSeq.get()) { return; }` check before this, and only change the mailbox processing part.
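Keeping the check before submission and also re-checking inside the mail can be modeled as follows. This is a hedged, single-threaded sketch: `DoubleCheckedTimeout`, `forceTrigger` (standing in for `triggerIfNeeded(true)`), `batchTrigger`, and the `ArrayDeque` mailbox are hypothetical. The first check avoids creating a mail for a timer that is already stale; the second covers a batch trigger that runs while the mail is waiting in the mailbox. In the real code the two checks run on different threads, which is why the seq is an `AtomicLong`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the timeout path: a timer callback that may submit a mail. */
class DoubleCheckedTimeout {
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    int mailsSubmitted = 0;
    int forcedTriggers = 0;

    /** Body of the scheduled timer task armed for {@code triggerSeq}. */
    void onTimerFired(long triggerSeq) {
        // Check 1 (timer thread): if a trigger already happened, skip without creating a mail.
        if (triggerSeq != currentTriggerSeq.get()) {
            return;
        }
        mailsSubmitted++;
        mailbox.add(
                () -> {
                    // Check 2 (task thread): a trigger may have run while this mail was queued.
                    if (triggerSeq == currentTriggerSeq.get()) {
                        forceTrigger();
                    }
                });
    }

    /** Stands in for triggerIfNeeded(true): flushes the buffer and advances the seq. */
    void forceTrigger() {
        forcedTriggers++;
        currentTriggerSeq.incrementAndGet();
    }

    /** Stands in for a regular trigger when the buffer reaches batchSize. */
    void batchTrigger() {
        currentTriggerSeq.incrementAndGet();
    }

    /** Runs queued mails in order, as the task thread would. */
    void runMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }
}
```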
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1577700082

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -94,29 +106,80 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // - initialize buffer timeout ---
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        if (triggerSeq != currentTriggerSeq.get()) {
+                                            // if any new trigger occurs, skip this schedule
+                                            return;
+                                        }
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(true), "AEC-timeout");
```

Review Comment: That approach may create an extra mail and put it in the mailbox. I lean toward skipping it directly, before it is submitted to the mailbox.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1577695901

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -220,12 +283,17 @@ void insertBlockingBuffer(StateRequest request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }
```

Review Comment: No, because the seq number increases by 1 on every trigger.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1577589826

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -94,29 +106,80 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // - initialize buffer timeout ---
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        if (triggerSeq != currentTriggerSeq.get()) {
+                                            // if any new trigger occurs, skip this schedule
+                                            return;
+                                        }
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(true), "AEC-timeout");
```

Review Comment: How about
```java
mailboxExecutor.execute(
        () -> {
            // Re-check inside the mail: a trigger may have happened while it was queued.
            if (triggerSeq == currentTriggerSeq.get()) {
                triggerIfNeeded(true);
            }
        },
        "AEC-timeout");
```

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -220,12 +283,17 @@ void insertBlockingBuffer(StateRequest request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }
```

Review Comment: Is it possible to trigger multiple times for one seq?
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1575787928

## flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java: ##
```diff
@@ -346,6 +337,63 @@ public void testSyncPoint() {
         recordContext2.release();
     }
 
+    @Test
+    void testBufferTimeout() throws InterruptedException {
+        batchSize = 5;
+        timeout = 1000;
+        setup();
+        Runnable userCode = () -> valueState.asyncValue();
+
+        // basic timeout ---
+        for (int i = 0; i < batchSize - 1; i++) {
+            String record = String.format("key%d-r%d", i, i);
+            String key = String.format("key%d", batchSize + i);
+            RecordContext recordContext = aec.buildContext(record, key);
+            aec.setCurrentContext(recordContext);
+            userCode.run();
+        }
+        assertThat(aec.timeoutFlag.get()).isFalse();
+        assertThat(aec.currentScheduledFuture.isDone()).isFalse();
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+        Thread.sleep(timeout + 100);
```

Review Comment: Thanks for the suggestion. I removed `Thread.sleep` from `testBufferTimeout()` and `testBufferTimeoutSkip()`, and used `ManuallyTriggeredScheduledExecutorService` as the `ScheduledExecutor`.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1575735682

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
```diff
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.timeoutFlag = new AtomicBoolean(false);
+
+        // - initialize buffer timeout ---
+        this.currentScheduledFuture = null;
+        if (bufferTimeOut > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1,
+                            new ThreadFactory() {
+                                @Override
+                                public Thread newThread(Runnable r) {
+                                    return new Thread(r, "AEC-scheduler");
+                                }
+                            });
+            this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+            // make sure shutdown removes all pending tasks
+            this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
                 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
                 batchSize,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout() {
+        if (bufferTimeOut > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        timeoutFlag.set(true);
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(false), "AEC-timeout");
```

Review Comment: Added. `testBufferTimeoutSkip` is introduced to test it.
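The shutdown policies set in the constructor above can be checked in isolation with plain `java.util.concurrent`. In this minimal sketch, the class `TimeoutSchedulerDemo` and the thread name are hypothetical. With `setExecuteExistingDelayedTasksAfterShutdownPolicy(false)`, `shutdown()` cancels delayed tasks that have not run yet instead of letting them fire later. Per the JDK documentation, that policy defaults to `true`, whereas `setContinueExistingPeriodicTasksAfterShutdownPolicy` already defaults to `false`, so the second call mainly documents intent.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that builds a timeout scheduler configured like the one in the diff. */
class TimeoutSchedulerDemo {
    static ScheduledThreadPoolExecutor newTimeoutScheduler() {
        ScheduledThreadPoolExecutor executor =
                new ScheduledThreadPoolExecutor(
                        1,
                        r -> {
                            Thread t = new Thread(r, "timeout-scheduler-demo");
                            t.setDaemon(true); // do not keep the JVM alive for pending timeouts
                            return t;
                        });
        // Remove cancelled timeouts from the work queue right away.
        executor.setRemoveOnCancelPolicy(true);
        // On shutdown, drop pending periodic and delayed tasks instead of running them.
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        return executor;
    }

    /** Arms a no-op timeout far in the future, then shuts the scheduler down. */
    static ScheduledFuture<?> scheduleThenShutdown(ScheduledThreadPoolExecutor executor) {
        ScheduledFuture<?> pending = executor.schedule(() -> {}, 1, TimeUnit.HOURS);
        executor.shutdown();
        return pending;
    }
}
```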
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1575597004 ## flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java: ## @@ -346,6 +337,63 @@ public void testSyncPoint() { recordContext2.release(); } +@Test +void testBufferTimeout() throws InterruptedException { +batchSize = 5; +timeout = 1000; +setup(); +Runnable userCode = () -> valueState.asyncValue(); + +// basic timeout --- +for (int i = 0; i < batchSize - 1; i++) { +String record = String.format("key%d-r%d", i, i); +String key = String.format("key%d", batchSize + i); +RecordContext recordContext = aec.buildContext(record, key); +aec.setCurrentContext(recordContext); +userCode.run(); +} +assertThat(aec.timeoutFlag.get()).isFalse(); +assertThat(aec.currentScheduledFuture.isDone()).isFalse(); +assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1); + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1); +assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0); + +Thread.sleep(timeout + 100); Review Comment: It might be better to avoid using `Thread.sleep` in test cases, as it can lengthen CI runs and make the tests flaky. How about introducing a `TestScheduledThreadPoolExecutor` so we can control when each step is triggered? An example of this is `org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor`.
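A minimal sketch of the deterministic alternative to `Thread.sleep` suggested above. It assumes a hypothetical `ManualClockScheduler` that runs a task only when the test advances a fake clock. This is not the API of Flink's `ManuallyTriggeredScheduledExecutor`; it only illustrates the idea.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical test scheduler: nothing runs until the test advances the fake clock,
// so timeout tests need no real waiting.
class ManualClockScheduler {
    private static final class Entry {
        final long dueMs;
        final Runnable task;
        boolean cancelled;

        Entry(long dueMs, Runnable task) {
            this.dueMs = dueMs;
            this.task = task;
        }
    }

    private final List<Entry> pending = new ArrayList<>();
    private long nowMs = 0;

    /** Registers a task due at now + delayMs; the returned Runnable cancels it. */
    Runnable schedule(Runnable task, long delayMs) {
        Entry e = new Entry(nowMs + delayMs, task);
        pending.add(e);
        return () -> e.cancelled = true;
    }

    /** Advances the fake clock and runs every non-cancelled task that became due, in due order. */
    void advance(long deltaMs) {
        nowMs += deltaMs;
        List<Entry> due = new ArrayList<>();
        for (Entry e : pending) {
            if (e.dueMs <= nowMs) {
                due.add(e);
            }
        }
        pending.removeAll(due);
        due.sort(Comparator.comparingLong(e -> e.dueMs));
        for (Entry e : due) {
            if (!e.cancelled) {
                e.task.run();
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A test built on this style can assert the state just before and just after the deadline without depending on wall-clock timing.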
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1575609479 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task. */ +final ScheduledThreadPoolExecutor scheduledExecutor; Review Comment: Would it be better to reuse existing utility methods like `FutureUtils.delay()`? This way AEC won't need to maintain such resources.
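A JDK-only sketch of scheduling the timeout without the controller owning a thread pool. It substitutes `CompletableFuture.delayedExecutor` (Java 9+) for the `FutureUtils.delay()` suggested above; the sketch does not show or rely on Flink's method. It also assumes the mailbox can be exposed as a plain `java.util.concurrent.Executor`, which may require an adapter for Flink's `MailboxExecutor`. `SharedDelayTimeout` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Sketch: the delay is handled by a JDK-managed scheduler; once it elapses,
// the callback is handed to the given executor (here standing in for the mailbox).
class SharedDelayTimeout {
    static CompletableFuture<Void> scheduleTimeout(long timeoutMs, Executor mailbox, Runnable onTimeout) {
        Executor delayed = CompletableFuture.delayedExecutor(timeoutMs, TimeUnit.MILLISECONDS, mailbox);
        // Cancelling the returned future before the delay elapses makes the callback a no-op.
        return CompletableFuture.runAsync(onTimeout, delayed);
    }
}
```

The trade-off: the controller no longer needs to create or shut down its own `ScheduledThreadPoolExecutor`, but it gives up control over the scheduling thread.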
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1575604054 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task. */ +final ScheduledThreadPoolExecutor scheduledExecutor; + +ScheduledFuture currentScheduledFuture; + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +this( +mailboxExecutor, +stateExecutor, +DEFAULT_BATCH_SIZE, +DEFAULT_BUFFER_TIMEOUT, +DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); } public AsyncExecutionController( MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int batchSize, +long bufferTimeOut, int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); this.stateExecutor = stateExecutor; this.batchSize = batchSize; +this.bufferTimeOut = bufferTimeOut; Review Comment: The "O" in `bufferTimeOut` is upper-case, while the "o" in `timeoutFlag` is lower-case. It might be better to unify them under the same convention.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1575603596 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task. */ +final ScheduledThreadPoolExecutor scheduledExecutor; + +ScheduledFuture currentScheduledFuture; + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +this( +mailboxExecutor, +stateExecutor, +DEFAULT_BATCH_SIZE, +DEFAULT_BUFFER_TIMEOUT, +DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); } public AsyncExecutionController( MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int batchSize, +long bufferTimeOut, int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); this.stateExecutor = stateExecutor; this.batchSize = batchSize; +this.bufferTimeOut = bufferTimeOut; this.maxInFlightRecordNum = maxInFlightRecords; this.stateRequestsBuffer = new StateRequestBuffer<>(); this.inFlightRecordNum = new AtomicInteger(0); +this.timeoutFlag = new AtomicBoolean(false); + +// - initialize buffer timeout --- +this.currentScheduledFuture = null; +if (bufferTimeOut > 0) { +this.scheduledExecutor = +new ScheduledThreadPoolExecutor( +1, +new ThreadFactory() { +@Override +public Thread newThread(Runnable r) { +return new Thread(r, 
"AEC-scheduler"); +} +}); +this.scheduledExecutor.setRemoveOnCancelPolicy(true); + +// make sure shutdown removes all pending tasks + this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); +} else { +this.scheduledExecutor = null; +} + LOG.info( "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}", batchSize, maxInFlightRecords); } +void scheduleTimeout() { +if (bufferTimeOut > 0) { +if (currentScheduledFuture != null +&& !currentScheduledFuture.isDone() +&& !currentScheduledFuture.isCancelled()) { +currentScheduledFuture.cancel(false); +} +currentScheduledFuture = +(ScheduledFuture) +scheduledExecutor.schedule( +() -> { +timeoutFlag.set(true); +mailboxExecutor.execute( +() -> triggerIfNeeded(false), "AEC-timeout"); Review Comment: It might be simpler to remove `timeoutFlag` and invoke `triggerIfNeeded(true)` directly. ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -51,15 +56,24 @@ public class AsyncExecutionController { private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); -public static final int DEFAULT_BATCH_SIZE = 1000; -public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; +private static final int DEFAULT_BATCH_SIZE = 1000; + +private static final int DEFAULT_BUFFER_TIMEOUT = 1000; Review Comment: How about remove the constants and use `ASYNC_STATE_BUFFER_SIZE.defaultValue()` from the `ExecutionConfig`? This can help avoid maintaining the same value in two places. ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1575573086 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task. */ +final ScheduledThreadPoolExecutor scheduledExecutor; Review Comment: I added a `close()` method that does two things: 1. drains all in-flight records; 2. shuts down `scheduledExecutor`.
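A hypothetical sketch of the two-step `close()` described above: first drain in-flight records, then shut down the scheduler. Draining is modeled as repeatedly running a step supplied by the caller; in the real controller it would presumably mean processing mailbox work until no records remain in flight. `ClosableController` and `drainStep` are illustrative names.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical controller that owns a scheduler and releases it on close().
class ClosableController implements AutoCloseable {
    final AtomicInteger inFlightRecordNum = new AtomicInteger();
    final ScheduledThreadPoolExecutor scheduledExecutor; // null when bufferTimeout <= 0
    private final Runnable drainStep; // stand-in for "process one unit of pending work"

    ClosableController(long bufferTimeout, Runnable drainStep) {
        this.drainStep = drainStep;
        this.scheduledExecutor = bufferTimeout > 0 ? new ScheduledThreadPoolExecutor(1) : null;
    }

    @Override
    public void close() {
        // 1. drain all in-flight records
        while (inFlightRecordNum.get() > 0) {
            drainStep.run();
        }
        // 2. shut down the scheduler, discarding any pending timeouts
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdownNow();
        }
    }
}
```

Implementing `AutoCloseable`, as suggested later in the thread, lets callers manage the controller with try-with-resources.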
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1574559188 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task. */ +final ScheduledThreadPoolExecutor scheduledExecutor; + +ScheduledFuture currentScheduledFuture; + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +this( +mailboxExecutor, +stateExecutor, +DEFAULT_BATCH_SIZE, +DEFAULT_BUFFER_TIMEOUT, +DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); } public AsyncExecutionController( MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int batchSize, +long bufferTimeOut, int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); this.stateExecutor = stateExecutor; this.batchSize = batchSize; +this.bufferTimeOut = bufferTimeOut; this.maxInFlightRecordNum = maxInFlightRecords; this.stateRequestsBuffer = new StateRequestBuffer<>(); this.inFlightRecordNum = new AtomicInteger(0); +this.timeoutFlag = new AtomicBoolean(false); + +// - initialize buffer timeout --- +this.currentScheduledFuture = null; +if (bufferTimeOut > 0) { +this.scheduledExecutor = +new ScheduledThreadPoolExecutor( +1, +new ThreadFactory() { +@Override +public Thread newThread(Runnable r) { +return new Thread(r, "AEC-scheduler"); +} 
+}); +this.scheduledExecutor.setRemoveOnCancelPolicy(true); + +// make sure shutdown removes all pending tasks + this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); +} else { +this.scheduledExecutor = null; +} + LOG.info( "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}", batchSize, maxInFlightRecords); } +void scheduleTimeout() { +if (bufferTimeOut > 0) { +if (currentScheduledFuture != null +&& !currentScheduledFuture.isDone() +&& !currentScheduledFuture.isCancelled()) { +currentScheduledFuture.cancel(false); +} +currentScheduledFuture = +(ScheduledFuture) +scheduledExecutor.schedule( +() -> { +timeoutFlag.set(true); +mailboxExecutor.execute( +() -> triggerIfNeeded(false), "AEC-timeout"); Review Comment: I'd suggest a `sequence number` for each buffer to make sure the timeout triggers the intended one. ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task. */ +final ScheduledThreadPoolExecutor scheduledExecutor; + +ScheduledFuture currentScheduledFuture; + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +
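A hypothetical, single-threaded sketch of the per-buffer sequence-number idea above. Every flush increments the sequence, and a timeout flushes only if the buffer it was scheduled for is still the current one. `checkCurrentSeq` mirrors the `boolean checkCurrentSeq(long seq)` helper suggested elsewhere in this review. Everything else is illustrative, and timeouts are assumed to be delivered through the mailbox thread, so no locking is shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer: a stale timeout (scheduled for an already-flushed batch)
// must not flush the next, still-filling batch.
class SeqBuffer<T> {
    private final int batchSize;
    private final List<T> active = new ArrayList<>();
    private final List<List<T>> flushed = new ArrayList<>();
    private long currentSeq = 0;

    SeqBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Adds a request; returns the sequence a timeout for this buffer should capture. */
    long add(T request) {
        active.add(request);
        long seq = currentSeq;
        if (active.size() >= batchSize) {
            flush(); // size-triggered flush makes any timeout holding 'seq' stale
        }
        return seq;
    }

    boolean checkCurrentSeq(long seq) {
        return seq == currentSeq;
    }

    /** Called when a timeout scheduled for {@code seq} fires. */
    void onTimeout(long seq) {
        if (checkCurrentSeq(seq) && !active.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flushed.add(new ArrayList<>(active));
        active.clear();
        currentSeq++;
    }

    List<List<T>> flushed() {
        return flushed;
    }

    int activeSize() {
        return active.size();
    }
}
```

The timer does not need to be cancelled when a size-triggered flush happens. A late timeout simply fails the sequence check and does nothing.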
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
masteryhx commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1574501889 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task. */ +final ScheduledThreadPoolExecutor scheduledExecutor; Review Comment: Should AEC become `Closeable`, since we maintain some resources like this internally? ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task.
*/ +final ScheduledThreadPoolExecutor scheduledExecutor; + +ScheduledFuture currentScheduledFuture; + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +this( +mailboxExecutor, +stateExecutor, +DEFAULT_BATCH_SIZE, +DEFAULT_BUFFER_TIMEOUT, +DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); } public AsyncExecutionController( MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int batchSize, +long bufferTimeOut, int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); this.stateExecutor = stateExecutor; this.batchSize = batchSize; +this.bufferTimeOut = bufferTimeOut; this.maxInFlightRecordNum = maxInFlightRecords; this.stateRequestsBuffer = new StateRequestBuffer<>(); this.inFlightRecordNum = new AtomicInteger(0); +this.timeoutFlag = new AtomicBoolean(false); + +// - initialize buffer timeout --- +this.currentScheduledFuture = null; +if (bufferTimeOut > 0) { +this.scheduledExecutor = +new ScheduledThreadPoolExecutor( +1, +new ThreadFactory() { +@Override +public Thread newThread(Runnable r) { +return new Thread(r, "AEC-scheduler"); +} +}); +this.scheduledExecutor.setRemoveOnCancelPolicy(true); + +// make sure shutdown removes all pending tasks + this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); +} else { +this.scheduledExecutor = null; +} + LOG.info( "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}", Review Comment: nit: Also add `bufferTimeOut` here. BTW, `bufferTimeOut` -> `bufferTimeout`?
## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,29 +108,91 @@ public class AsyncExecutionController { */ final AtomicInteger inFlightRecordNum; +/** + * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will + * perform actively when the next state request arrives even if the activeQueue has not reached + * the {@link #batchSize}. + */ +final AtomicBoolean timeoutFlag; + +/** The executor service that schedules and calls the triggers of this task. */ +final ScheduledThreadPoolExecutor scheduledExecutor; + +ScheduledFuture currentScheduledFuture; + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { Review Comment: It seems this constructor is not used, and the default values in this class could be removed.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1574293924 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -50,15 +50,24 @@ public class AsyncExecutionController { private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); -public static final int DEFAULT_BATCH_SIZE = 1000; -public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; +private static final int DEFAULT_BATCH_SIZE = 1000; + +private static final int DEFAULT_BUFFER_TIMEOUT = 1000; +private static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; /** * The batch size. When the number of state requests in the active buffer exceeds the batch * size, a batched state execution would be triggered. */ private final int batchSize; +/** + * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the + * activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a + * trigger will perform actively. + */ +private final int bufferTimeOut; Review Comment: I added an implementation in the second commit.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1572344539 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,62 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +// - Async State Execution -- + +/** + * The max limit of in-flight records number in async state execution, 'in-flight' refers to the + * records that have entered the operator but have not yet been processed and emitted to the + * downstream. If the in-flight records number exceeds the limit, the newly records entering + * will be blocked until the in-flight records number drops below the limit. + */ +@Experimental +@Documentation.ExcludeFromDocumentation( +"This is an experimental option, internal use only for now.") +public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT = +ConfigOptions.key("execution.async-state.in-flight-records-limit") +.intType() +.defaultValue(6000) +.withDescription( +"The max limit of in-flight records number in async state execution, 'in-flight' refers" ++ " to the records that have entered the operator but have not yet been processed and" ++ " emitted to the downstream. If the in-flight records number exceeds the limit," ++ " the newly records entering will be blocked until the in-flight records number drops below the limit."); + +/** + * The size of buffer under async state execution. Async state execution provides a buffer + * mechanism to reduce state access. When the number of state requests in the buffer exceeds the + * batch size, a batched state execution would be triggered. Larger batch sizes will bring + * higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to + * control the frequency of triggering. 
+ */ +@Experimental +@Documentation.ExcludeFromDocumentation( +"This is an experimental option, internal use only for now.") +public static final ConfigOption ASYNC_STATE_BUFFER_SIZE = +ConfigOptions.key("execution.async-state.buffer-size") +.intType() +.defaultValue(1000) +.withDescription( +"The size of buffer under async state execution. Async state execution provides a buffer mechanism to reduce state access." ++ " When the number of state requests in the active buffer exceeds the batch size," ++ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency," ++ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering."); + +/** + * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link + * #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform + * actively. + */ +@Experimental +@Documentation.ExcludeFromDocumentation( +"This is an experimental option, internal use only for now.") +public static final ConfigOption ASYNC_STATE_BUFFER_TIMEOUT = Review Comment: It might be better to make time configurations as `long` instead of as `int`. ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -50,15 +50,24 @@ public class AsyncExecutionController { private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); -public static final int DEFAULT_BATCH_SIZE = 1000; -public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; +private static final int DEFAULT_BATCH_SIZE = 1000; + +private static final int DEFAULT_BUFFER_TIMEOUT = 1000; +private static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; /** * The batch size. When the number of state requests in the active buffer exceeds the batch * size, a batched state execution would be triggered. 
*/ private final int batchSize; +/** + * The timeout of {@link StateRequestBuffer#activeQueue} triggering in milliseconds. If the + * activeQueue has not reached the {@link #batchSize} within 'buffer-timeout' milliseconds, a + * trigger will perform actively. + */ +private final int bufferTimeOut;
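A small illustration of the reviewer's point about `long` versus `int` for millisecond timeouts. `Integer.MAX_VALUE` milliseconds is only about 24.8 days, and `int` arithmetic on timeouts wraps around without any error. `TimeoutWidth` is a hypothetical helper used only to demonstrate this.

```java
import java.time.Duration;

// Demonstrates the range limit of int milliseconds versus long milliseconds.
class TimeoutWidth {
    /** Whole days representable by an int millisecond value. */
    static long intMaxAsDays() {
        return Duration.ofMillis(Integer.MAX_VALUE).toDays();
    }

    /** int addition silently wraps on overflow. */
    static int addIntMillis(int a, int b) {
        return a + b;
    }

    /** long addition with an explicit overflow check. */
    static long addLongMillis(long a, long b) {
        return Math.addExact(a, b);
    }
}
```

A `long` (or a `java.time.Duration`) removes the ceiling and leaves room for adding deadlines to timestamps.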
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on PR #24667: URL: https://github.com/apache/flink/pull/24667#issuecomment-2066106851 After an offline discussion: FLIP-425 applies only to stateful operators, so `async-mode` was changed back to `async-state`.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1571771428 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize." ++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." ++ " Otherwise, the state access of Async state APIs will be executed synchronously." ++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" ++ " Note: This is an experimental feature under evaluation."); + +/** + * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the + * records that have entered the operator but have not yet been processed and emitted to the + * downstream. If the in-flight records number exceeds the limit, the newly records entering + * will be blocked until the in-flight records number drops below the limit. 
+ */ +@Experimental +public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT = +ConfigOptions.key("execution.async-mode.in-flight-records-limit") +.intType() +.defaultValue(6000) +.withDescription( +"The max limit of in-flight records number in async execution mode, 'in-flight' refers" ++ " to the records that have entered the operator but have not yet been processed and" ++ " emitted to the downstream. If the in-flight records number exceeds the limit," ++ " the newly records entering will be blocked until the in-flight records number drops below the limit."); + +/** + * The size of buffer under async execution mode. Async execution mode provides a buffer + * mechanism to reduce state access. When the number of state requests in the buffer exceeds the + * batch size, a batched state execution would be triggered. Larger batch sizes will bring + * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control + * the frequency of triggering. + */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_SIZE = +ConfigOptions.key("execution.async-mode.buffer-size") +.intType() +.defaultValue(1000) +.withDescription( +"The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access." ++ " When the number of state requests in the active buffer exceeds the batch size," ++ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency," ++ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering."); + +/** + * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link + * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively. 
+ */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_TIMEOUT = +ConfigOptions.key("execution.async-state.buffer-timeout") Review Comment: Sorry, guys, I'm afraid I prefer the original `async-state` proposal in the FLIP, since this is only for stateful operators and even the `record-order` problem is caused by state
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1569864750 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize." ++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." ++ " Otherwise, the state access of Async state APIs will be executed synchronously." ++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" ++ " Note: This is an experimental feature under evaluation."); + +/** + * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the + * records that have entered the operator but have not yet been processed and emitted to the + * downstream. If the in-flight records number exceeds the limit, the newly records entering + * will be blocked until the in-flight records number drops below the limit. 
+ */ +@Experimental +public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT = +ConfigOptions.key("execution.async-mode.in-flight-records-limit") +.intType() +.defaultValue(6000) +.withDescription( +"The max limit of in-flight records number in async execution mode, 'in-flight' refers" ++ " to the records that have entered the operator but have not yet been processed and" ++ " emitted to the downstream. If the in-flight records number exceeds the limit," ++ " the newly records entering will be blocked until the in-flight records number drops below the limit."); + +/** + * The size of buffer under async execution mode. Async execution mode provides a buffer + * mechanism to reduce state access. When the number of state requests in the buffer exceeds the + * batch size, a batched state execution would be triggered. Larger batch sizes will bring + * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control + * the frequency of triggering. + */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_SIZE = +ConfigOptions.key("execution.async-mode.buffer-size") +.intType() +.defaultValue(1000) +.withDescription( +"The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access." ++ " When the number of state requests in the active buffer exceeds the batch size," ++ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency," ++ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering."); + +/** + * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link + * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively. 
+ */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_TIMEOUT = +ConfigOptions.key("execution.async-state.buffer-timeout") Review Comment: Good point, changed it to `execution.async-mode.state-buffer-timeout`.
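The quoted Javadoc describes how `execution.async-mode.buffer-size` and the buffer timeout interact. A batch fires as soon as the buffer fills up, and otherwise whatever is buffered fires once the timeout elapses. The code below is a hypothetical, single-threaded sketch of that behaviour. `BufferTriggerSketch`, its methods, and the logical millisecond clock passed to `enqueue`/`tick` are illustrative assumptions; this is not Flink's `StateRequestBuffer`. The option text says "exceeds the batch size", but the sketch fires when the size is *reached* (`>=`), so treat the exact boundary as an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of size-or-timeout triggering; NOT Flink's StateRequestBuffer.
// Time is a logical millisecond clock supplied by the caller instead of a scheduled executor.
class BufferTriggerSketch {
    private final int bufferSize;        // stands in for execution.async-mode.buffer-size
    private final long bufferTimeoutMs;  // stands in for execution.async-mode.state-buffer-timeout
    private final List<String> active = new ArrayList<>();
    private final List<List<String>> fired = new ArrayList<>();
    private long oldestEnqueueMs = -1;   // enqueue time of the oldest buffered request

    BufferTriggerSketch(int bufferSize, long bufferTimeoutMs) {
        if (bufferSize <= 0 || bufferTimeoutMs < 0) {
            throw new IllegalArgumentException("bufferSize must be > 0 and timeout >= 0");
        }
        this.bufferSize = bufferSize;
        this.bufferTimeoutMs = bufferTimeoutMs;
    }

    /** Buffers a request and fires a full batch as soon as the size threshold is reached. */
    void enqueue(String request, long nowMs) {
        if (active.isEmpty()) {
            oldestEnqueueMs = nowMs;
        }
        active.add(request);
        if (active.size() >= bufferSize) {
            fire();
        }
    }

    /** Periodic check that flushes a partial batch once the oldest request has waited long enough. */
    void tick(long nowMs) {
        if (!active.isEmpty() && nowMs - oldestEnqueueMs >= bufferTimeoutMs) {
            fire();
        }
    }

    private void fire() {
        fired.add(new ArrayList<>(active)); // hand the batch to an (imaginary) state executor
        active.clear();
        oldestEnqueueMs = -1;
    }

    List<List<String>> firedBatches() {
        return Collections.unmodifiableList(fired);
    }

    int pending() {
        return active.size();
    }

    public static void main(String[] args) {
        BufferTriggerSketch buffer = new BufferTriggerSketch(3, 10);
        buffer.enqueue("r1", 0);
        buffer.enqueue("r2", 1);
        buffer.enqueue("r3", 2);  // size reached: batch [r1, r2, r3] fires
        buffer.enqueue("r4", 5);
        buffer.tick(12);          // r4 has waited 7 ms < 10 ms: nothing fires
        buffer.tick(15);          // r4 has waited 10 ms: partial batch [r4] fires
        System.out.println(buffer.firedBatches()); // [[r1, r2, r3], [r4]]
    }
}
```

A larger buffer size means fewer, bigger batches but a longer wait per request. The timeout puts an upper bound on that wait when traffic is too sparse to fill the buffer.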
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1569863724 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) { configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression); } +// +// Asynchronous execution configurations +// + +@Internal Review Comment: Ah, I misread the comment and changed them from `@Experimental` to `@Internal`; I have corrected them now.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -94,19 +103,26 @@ public class AsyncExecutionController { final AtomicInteger inFlightRecordNum; public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +this( +mailboxExecutor, +stateExecutor, +DEFAULT_BATCH_SIZE, +DEFAULT_BUFFER_TIMEOUT, +DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); Review Comment: Given that #24657 has been merged, it might be better to verify that the introduced configurations can pass the configured values into AEC through operator setups now.
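One way to check that configured values actually reach the controller is to resolve the three options into the batch-size, buffer-timeout and in-flight-limit arguments, then assert on the result. The hypothetical sketch below does this against a plain `Map<String, String>` rather than Flink's `Configuration`; `AecParams` and `fromConfig` are illustrative names, not Flink classes. The keys and the defaults of 1000 and 6000 come from the options quoted in this thread, with the timeout key as renamed to `execution.async-mode.state-buffer-timeout`. The timeout's default is not shown here, so the caller supplies an explicit fallback.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the batch size, buffer timeout and in-flight limit
// that AsyncExecutionController receives; not a Flink class.
final class AecParams {
    static final String BUFFER_SIZE_KEY = "execution.async-mode.buffer-size";
    static final String IN_FLIGHT_LIMIT_KEY = "execution.async-mode.in-flight-records-limit";
    static final String BUFFER_TIMEOUT_KEY = "execution.async-mode.state-buffer-timeout";

    final int batchSize;
    final long bufferTimeoutMs;
    final int maxInFlightRecords;

    private AecParams(int batchSize, long bufferTimeoutMs, int maxInFlightRecords) {
        this.batchSize = batchSize;
        this.bufferTimeoutMs = bufferTimeoutMs;
        this.maxInFlightRecords = maxInFlightRecords;
    }

    /** Reads the options, falling back to the quoted defaults (1000 and 6000) or the given timeout fallback. */
    static AecParams fromConfig(Map<String, String> conf, long timeoutFallbackMs) {
        int batchSize = Integer.parseInt(conf.getOrDefault(BUFFER_SIZE_KEY, "1000"));
        int limit = Integer.parseInt(conf.getOrDefault(IN_FLIGHT_LIMIT_KEY, "6000"));
        String rawTimeout = conf.get(BUFFER_TIMEOUT_KEY);
        long timeout = rawTimeout == null ? timeoutFallbackMs : Long.parseLong(rawTimeout);
        if (batchSize <= 0 || limit <= 0 || timeout < 0) {
            throw new IllegalArgumentException("sizes must be positive and timeout non-negative");
        }
        return new AecParams(batchSize, timeout, limit);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(BUFFER_SIZE_KEY, "500");     // user override; other keys left unset
        AecParams p = fromConfig(conf, 100L); // 100 ms is an illustrative fallback only
        System.out.println(p.batchSize + " " + p.bufferTimeoutMs + " " + p.maxInFlightRecords); // 500 100 6000
    }
}
```

A real test would go through the operator setup path the reviewer mentions. The point of the sketch is only that each value should be asserted individually, so that a swapped or dropped argument is caught.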
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on PR #24667: URL: https://github.com/apache/flink/pull/24667#issuecomment-2060999083 @yunfengzhou-hub @Zakelly Thanks for the detailed review. I have rebased this PR and addressed some of the comments; PTAL if you are free.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1568659638 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize." ++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." ++ " Otherwise, the state access of Async state APIs will be executed synchronously." ++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" ++ " Note: This is an experimental feature under evaluation."); + +/** + * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the + * records that have entered the operator but have not yet been processed and emitted to the + * downstream. If the in-flight records number exceeds the limit, the newly records entering + * will be blocked until the in-flight records number drops below the limit. 
+ */ +@Experimental +public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT = +ConfigOptions.key("execution.async-mode.in-flight-records-limit") +.intType() +.defaultValue(6000) +.withDescription( +"The max limit of in-flight records number in async execution mode, 'in-flight' refers" ++ " to the records that have entered the operator but have not yet been processed and" ++ " emitted to the downstream. If the in-flight records number exceeds the limit," ++ " the newly records entering will be blocked until the in-flight records number drops below the limit."); + +/** + * The size of buffer under async execution mode. Async execution mode provides a buffer + * mechanism to reduce state access. When the number of state requests in the buffer exceeds the + * batch size, a batched state execution would be triggered. Larger batch sizes will bring + * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control + * the frequency of triggering. + */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_SIZE = +ConfigOptions.key("execution.async-mode.buffer-size") +.intType() +.defaultValue(1000) +.withDescription( +"The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access." ++ " When the number of state requests in the active buffer exceeds the batch size," ++ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency," ++ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering."); + +/** + * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link + * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively. 
+ */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_TIMEOUT = +ConfigOptions.key("execution.async-state.buffer-timeout") Review Comment: Unified those options under `execution.async-mode`, because we also provide a `Record-order` mode to preserve the order of records that do not access state.
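The quoted Javadoc for `execution.async-mode.in-flight-records-limit` says that newly arriving records are held back once the limit is hit and proceed when the in-flight count drops. This behaves like a counting gate. Below is a minimal sketch that assumes a `java.util.concurrent.Semaphore` as that gate; `InFlightGate` is a hypothetical name. The controller diffed in this thread keeps an `AtomicInteger inFlightRecordNum`, and how it actually waits is not shown here. The sketch also admits at most `limit` records, whereas the option text speaks of "exceeding" the limit, so the exact boundary is an assumption.

```java
import java.util.concurrent.Semaphore;

// Hypothetical gate modelling execution.async-mode.in-flight-records-limit;
// not how AsyncExecutionController is implemented.
final class InFlightGate {
    private final Semaphore slots;

    InFlightGate(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.slots = new Semaphore(limit);
    }

    /** Blocks the calling thread until a slot is free, i.e. until the record is admitted. */
    void enter() throws InterruptedException {
        slots.acquire();
    }

    /** Non-blocking admission attempt; false means the record has to wait. */
    boolean tryEnter() {
        return slots.tryAcquire();
    }

    /** Call only after a successful enter, once the record has been processed and emitted downstream. */
    void leave() {
        slots.release();
    }

    public static void main(String[] args) {
        InFlightGate gate = new InFlightGate(2);
        System.out.println(gate.tryEnter()); // true
        System.out.println(gate.tryEnter()); // true
        System.out.println(gate.tryEnter()); // false: limit of 2 reached
        gate.leave();                        // one record finishes
        System.out.println(gate.tryEnter()); // true
    }
}
```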
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1568656329 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize." ++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." ++ " Otherwise, the state access of Async state APIs will be executed synchronously." ++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" Review Comment: Thanks for your insight. After discussing offline, we think it would be better to expose it on the DataStream API in some way, so that users can enable async execution in a more fine-grained manner. So this option has been removed for now.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1566744878 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) { configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression); } +// +// Asynchronous execution configurations +// + +@Internal Review Comment: It might be better to mark them as `@Experimental` instead of `@Internal`.
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1566742123 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize." ++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." ++ " Otherwise, the state access of Async state APIs will be executed synchronously." ++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" Review Comment: It might be better to remove this configuration for now and add it in future when such use cases are found. - Whether to enable async state access can be inferred automatically by the Flink infrastructure, depending on the location of state backends and the state API used in operators. - It is better not to expose implementation details, like the sync/async modes mentioned here, to end users. 
So long as the order of same-key records and the order of async state callbacks are guaranteed, that is enough for users. - In particular, the implementation of the sync mode might be altered in the future to improve performance in situations where the sync state API is used together with a remote state backend. In that case the statement "the state access is always executed synchronously" might cause deprecation issues.
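The ordering guarantee the reviewer refers to is that records sharing a key are processed in arrival order, while records with different keys proceed independently. The hypothetical sketch below illustrates this with a per-key FIFO. `PerKeyOrdering` and its methods are illustrative names and do not reflect how the runtime's key accounting is implemented. The sketch also does not model the ordering of async state callbacks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical per-key ordering model; not Flink's key accounting implementation.
final class PerKeyOrdering {
    private final Map<String, Queue<String>> waiting = new HashMap<>(); // key -> records queued behind it
    private final Set<String> busyKeys = new HashSet<>();               // keys with a record in flight
    final List<String> started = new ArrayList<>();                     // records in the order they started

    /** Starts the record at once unless a record with the same key is still in flight. */
    void submit(String key, String record) {
        if (busyKeys.contains(key)) {
            waiting.computeIfAbsent(key, k -> new ArrayDeque<>()).add(record);
        } else {
            start(key, record);
        }
    }

    /** Marks the key's in-flight record as done and starts the next queued record for that key, FIFO. */
    void complete(String key) {
        if (!busyKeys.remove(key)) {
            throw new IllegalStateException("no record in flight for key " + key);
        }
        Queue<String> queue = waiting.get(key);
        if (queue != null) {               // queues are removed once empty, so poll() is non-null here
            start(key, queue.poll());
            if (queue.isEmpty()) {
                waiting.remove(key);
            }
        }
    }

    private void start(String key, String record) {
        busyKeys.add(key);
        started.add(record);
    }

    public static void main(String[] args) {
        PerKeyOrdering o = new PerKeyOrdering();
        o.submit("a", "a1");
        o.submit("a", "a2"); // waits behind a1
        o.submit("b", "b1"); // different key, starts immediately
        o.complete("a");     // a2 may now start
        System.out.println(o.started); // [a1, b1, a2]
    }
}
```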
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
yunfengzhou-hub commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1566744878 ## flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java: ## @@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) { configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression); } +// +// Asynchronous execution configurations +// + +@Internal Review Comment: It might be better to mark them as `@Experimental` instead of `@Internal`. - Internal: Annotation to mark methods within **stable**, public APIs as an internal developer API. - Experimental: Classes with this annotation are neither battle-tested **nor stable**, and may be changed or removed in future versions. ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize." ++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." ++ " Otherwise, the state access of Async state APIs will be executed synchronously." 
++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" Review Comment: It might be better to remove this configuration for now and add it in future when such use cases are found. - Whether to enable async state access can be inferred automatically by the Flink infrastructure, depending on the location of state backends and the state API used in operators. - It is better not to expose implementation details, like the sync/async modes mentioned here, to end users. So long as the order of same-key records and the order of async state callbacks are guaranteed, that is enough for users. - Especially, the implementation in the sync mode might be altered in future, in order to improve performance in situations where sync state API + remote state backend is used. In this case the statement "the state access is always executed synchronously" might cause deprecation issues. ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize."
++ " As long as this option
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia commented on PR #24667: URL: https://github.com/apache/flink/pull/24667#issuecomment-2058174500 @Zakelly @ljz2051 would you please take a look?
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
flinkbot commented on PR #24667: URL: https://github.com/apache/flink/pull/24667#issuecomment-2056402383 ## CI report: * aea56668e33e7062e6b18dac4086a2f05dc36fc1 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
fredia opened a new pull request, #24667: URL: https://github.com/apache/flink/pull/24667 ## What is the purpose of the change As part of the async execution model of disaggregated state management, this PR introduces async execution configurations. ## Brief change log - Add async execution configurations in `ExecutionOptions` - Add related getter/setter in `ExecutionConfig` ## Verifying this change This change added tests and can be verified as follows: - StreamExecutionEnvironmentTest#testAsyncExecutionConfiguration ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs/ JavaDocs)