Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-05-13 Thread via GitHub


Zakelly merged PR #24690:
URL: https://github.com/apache/flink/pull/24690


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-05-13 Thread via GitHub


Zakelly commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2107381572

   > @Zakelly Thanks for the PR. I have a question about the overall design. It
seems that an iterator can actually be split into several executions. If other
UPDATEs are inserted in the middle, will they affect the final result of the
iterator?
   
   Ideally, any UPDATE that happens in the middle of iterating won't affect the
result of the iteration. This should be ensured by the `StateBackend`, which is
beyond the scope of this PR. The information a `StateBackend` needs to maintain
the iterator is relayed back via `nextPayloadForContinuousLoading()`.
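   The split described above (the backend keeps the iteration consistent, the
iterator only relays a payload between requests) can be illustrated with a toy
sketch. It assumes a backend that pins a copy of the data on the first
iterating request and serves every follow-up request from that copy, using a
payload of (snapshot id, offset). `ContinuationPayload`, `Batch` and
`SnapshotBackend` are hypothetical names, not Flink classes, and real backends
may isolate iteration from updates in entirely different ways.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical payload relayed back to the backend: which snapshot to read and where to resume. */
final class ContinuationPayload {
    final long snapshotId;
    final int offset;

    ContinuationPayload(long snapshotId, int offset) {
        this.snapshotId = snapshotId;
        this.offset = offset;
    }
}

/** One loaded batch plus the payload for the next request; next == null means iteration is done. */
final class Batch {
    final List<Map.Entry<String, Integer>> elements;
    final ContinuationPayload next;

    Batch(List<Map.Entry<String, Integer>> elements, ContinuationPayload next) {
        this.elements = elements;
        this.next = next;
    }
}

/** Toy backend: pins a copy of the data on the first request, so later updates stay invisible. */
final class SnapshotBackend {
    private final TreeMap<String, Integer> live = new TreeMap<>();
    private final Map<Long, List<Map.Entry<String, Integer>>> snapshots = new HashMap<>();
    private final int batchSize;
    private long nextSnapshotId = 0;

    SnapshotBackend(int batchSize) {
        this.batchSize = batchSize;
    }

    void put(String key, int value) {
        live.put(key, value);
    }

    /** First iterating request: copy the current entries into a detached snapshot. */
    Batch startIteration() {
        List<Map.Entry<String, Integer>> copy = new ArrayList<>();
        for (Map.Entry<String, Integer> e : live.entrySet()) {
            copy.add(Map.entry(e.getKey(), e.getValue())); // immutable, not backed by the map
        }
        long id = nextSnapshotId++;
        snapshots.put(id, copy);
        return continueIteration(new ContinuationPayload(id, 0));
    }

    /** Follow-up request: resume from the relayed payload, reading only the pinned snapshot. */
    Batch continueIteration(ContinuationPayload payload) {
        List<Map.Entry<String, Integer>> snap = snapshots.get(payload.snapshotId);
        int end = Math.min(payload.offset + batchSize, snap.size());
        List<Map.Entry<String, Integer>> batch =
                new ArrayList<>(snap.subList(payload.offset, end));
        if (end == snap.size()) {
            snapshots.remove(payload.snapshotId); // iteration finished: release the snapshot
            return new Batch(batch, null);
        }
        return new Batch(batch, new ContinuationPayload(payload.snapshotId, end));
    }
}
```

   A caller would call `startIteration()`, consume `batch.elements`, and keep
calling `continueIteration(batch.next)` until `next` is `null`; a `put` issued
between two requests does not change what the iteration returns. Copying is
the simplest way to get that guarantee, but it costs memory proportional to the
state size.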


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-05-12 Thread via GitHub


masteryhx commented on code in PR #24690:
URL: https://github.com/apache/flink/pull/24690#discussion_r1597852929


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.core.state.StateFutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A {@link StateIterator} implementation to facilitate async data load of iterator. Each state
+ * backend could override this class to maintain more variables in need. Any subclass should
+ * implement two methods, {@link #hasNext()} and {@link #nextPayloadForContinuousLoading()}. The
+ * philosophy behind this class is to carry some already loaded elements and provide iterating right
+ * on the task thread, and load following ones if needed (determined by {@link #hasNext()}) by
+ * creating **ANOTHER** iterating request. Thus, later it returns another iterator instance, and we
+ * continue to apply the user iteration on that instance. The whole elements will be iterated by
+ * recursive call of {@code #onNext()}.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class AbstractStateIterator implements StateIterator {
+
+    /** The state this iterator iterates on. */
+    final State originalState;
+
+    /** The request type that create this iterator. */
+    final StateRequestType requestType;
+
+    /** The controller that can receive further requests. */
+    final AsyncExecutionController aec;

Review Comment:
   After a rebase, this field could be a `StateRequestHandler` instead.
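
   The iteration scheme in the class javadoc above (iterate the already loaded
elements, then, if `hasNext()` says more remain, issue another request whose
result is a new iterator instance and recurse into `onNext()`) can be sketched
as follows. This is a simplified model, not the PR's code:
`java.util.concurrent.CompletableFuture` stands in for Flink's `StateFuture`,
and `SketchIterator`, `BatchLoader` and `InMemoryLoader` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical loader that serves one batch per request, starting at a cursor. */
interface BatchLoader<T> {
    CompletableFuture<SketchIterator<T>> load(int cursor);
}

/** Simplified stand-in for the iterator described above; not Flink's AbstractStateIterator. */
final class SketchIterator<T> {
    private final List<T> loaded; // elements already loaded by this request
    private final int nextCursor; // where the next request resumes; -1 if nothing is left
    private final BatchLoader<T> loader;

    SketchIterator(List<T> loaded, int nextCursor, BatchLoader<T> loader) {
        this.loaded = loaded;
        this.nextCursor = nextCursor;
        this.loader = loader;
    }

    /** Whether another request is needed to fetch the following elements. */
    boolean hasNext() {
        return nextCursor >= 0;
    }

    /** Applies the action to every element, issuing ANOTHER request per remaining batch. */
    CompletableFuture<Void> onNext(Consumer<T> action) {
        loaded.forEach(action); // iterate what is already loaded, on the calling thread
        if (!hasNext()) {
            return CompletableFuture.completedFuture(null);
        }
        // The next request yields a new iterator instance; continue the user iteration on it.
        return loader.load(nextCursor).thenCompose(next -> next.onNext(action));
    }
}

/** Toy loader over an in-memory list, returning at most batchSize elements per request. */
final class InMemoryLoader implements BatchLoader<Integer> {
    private final List<Integer> data;
    private final int batchSize;

    InMemoryLoader(List<Integer> data, int batchSize) {
        this.data = data;
        this.batchSize = batchSize;
    }

    @Override
    public CompletableFuture<SketchIterator<Integer>> load(int cursor) {
        int end = Math.min(cursor + batchSize, data.size());
        int next = end < data.size() ? end : -1;
        SketchIterator<Integer> it =
                new SketchIterator<>(new ArrayList<>(data.subList(cursor, end)), next, this);
        return CompletableFuture.completedFuture(it);
    }
}
```

   Because each `load` call returns a fresh iterator, no single instance has to
track the global position; the cursor handed to the next request plays roughly
the role of the payload from `nextPayloadForContinuousLoading()`. In this toy,
the recursion runs on whichever thread completes the load future; bringing
callbacks back to the task thread is left out.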



Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-23 Thread via GitHub


jectpro7 commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2073002740

   I see, the major part of it has been done. And in the long term, I believe it
is better than using Reactor, as it is more controllable and easier to extend
for Flink's own needs.


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-23 Thread via GitHub


Zakelly commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2072141564

   > @Zakelly yes, it is Project Reactor. I totally understand your concerns; we
don't need too much heavy and fancy stuff, only
[reactor-core](https://github.com/reactor/reactor-core). It can help manage
threads and context, chain callback operations, etc. Reactor is proven to be
highly efficient, so if we could build on top of it, it would save tons of
effort.
   
   Actually, the real effort lies in `StateFutureImpl` and `AbstractStateIterator`,
which have already been implemented. The other parts, controlling threads,
scheduling and checkpointing, should be implemented by ourselves to ensure
correct behavior with the other existing components. The key point is that
we're not building from scratch, and we should leverage Flink's existing thread
model.


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-23 Thread via GitHub


jectpro7 commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2071773582

   @Zakelly yes, it is Project Reactor. I totally understand your concerns; we
don't need too much heavy and fancy stuff, only
[reactor-core](https://github.com/reactor/reactor-core). It can help manage
threads and context, chain callback operations, etc. Reactor is proven to be
highly efficient, so if we could build on top of it, it would save tons of
effort.

Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-22 Thread via GitHub


Zakelly commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2071298219

   > Hi @Zakelly, the code LGTM. I have one question: why didn't you leverage
the capability of Reactor for this kind of async callback? It seems very
similar to me. Is there anything I missed? Just curious.
   
   Do you mean Project Reactor?
   The main reason is that we want to carefully customize and control the
processing of async requests, to make sure it interacts correctly with Flink's
components (back-pressure, mailbox, watermark and checkpoint). So what we
really need is only the basic functionality of futures, on top of which we
built the whole async processing.
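
   As a rough illustration of why plain futures plus Flink's own threading can
be enough, here is a minimal sketch assuming a single task thread that drains
a mailbox: the future may be completed from an I/O thread, but its callbacks
are always enqueued into the mailbox and executed by the task thread.
`Mailbox` and `MailboxFuture` are hypothetical toy classes, not Flink's
`MailboxExecutor` or `StateFutureImpl`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical mailbox: any thread may enqueue, only the task thread runs the mails. */
final class Mailbox {
    private final ConcurrentLinkedQueue<Runnable> mails = new ConcurrentLinkedQueue<>();

    void put(Runnable mail) {
        mails.add(mail);
    }

    /** Runs all pending mails; meant to be called from the task thread. Returns how many ran. */
    int runAll() {
        int count = 0;
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
            count++;
        }
        return count;
    }
}

/** Minimal future: completion may happen on any thread, callbacks always go via the mailbox. */
final class MailboxFuture<T> {
    private final Mailbox mailbox;
    private final List<Consumer<T>> callbacks = new ArrayList<>();
    private boolean done;
    private T value;

    MailboxFuture(Mailbox mailbox) {
        this.mailbox = mailbox;
    }

    synchronized void thenAccept(Consumer<T> callback) {
        if (done) {
            T v = value;
            mailbox.put(() -> callback.accept(v)); // already completed: schedule right away
        } else {
            callbacks.add(callback); // not completed yet: remember for later
        }
    }

    /** Called e.g. by an I/O thread; never runs user callbacks on that thread. */
    synchronized void complete(T result) {
        if (done) {
            return;
        }
        done = true;
        value = result;
        for (Consumer<T> callback : callbacks) {
            mailbox.put(() -> callback.accept(result));
        }
        callbacks.clear();
    }
}
```

   Keeping callback execution on the task thread means user code that touches
state stays single-threaded. This toy omits back-pressure, watermarks and
checkpoint handling entirely.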


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-22 Thread via GitHub


jectpro7 commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2070594485

   Hi @Zakelly, the code LGTM. I have one question: why didn't you leverage the
capability of Reactor for this kind of async callback? It seems very similar
to me. Is there anything I missed?


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-21 Thread via GitHub


Zakelly commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2068416578

   Rebased onto master to resolve conflicts.


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-19 Thread via GitHub


flinkbot commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2066354887

   
   ## CI report:
   
   * 5fa3b48c1caed9d9e4ff9b31c1b8640318c18555 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:

   - `@flinkbot run azure`: re-run the last Azure build

[PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-19 Thread via GitHub


Zakelly opened a new pull request, #24690:
URL: https://github.com/apache/flink/pull/24690

   ## What is the purpose of the change
   
   This PR ships the basic logic of the state iterator, leaving only the
necessary parts abstract for subclasses to implement. Most of the code is
future-related.
   
   ## Brief change log
   
- Introduce `AbstractStateIterator` and corresponding tests.
   
   
   ## Verifying this change
   
- Run newly added `AbstractStateIteratorTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   

