Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-17 Thread via GitHub


masteryhx closed pull request #24651: [FLINK-34987][state] Introduce Internal 
State for Async State API
URL: https://github.com/apache/flink/pull/24651


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-17 Thread via GitHub


masteryhx commented on PR #24651:
URL: https://github.com/apache/flink/pull/24651#issuecomment-2060873955

   Rebased again.





Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-17 Thread via GitHub


masteryhx commented on PR #24651:
URL: https://github.com/apache/flink/pull/24651#issuecomment-2060813581

   @flinkbot run azure





Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-16 Thread via GitHub


masteryhx commented on PR #24651:
URL: https://github.com/apache/flink/pull/24651#issuecomment-2060251285

   Rebased onto master.
   I will merge it once the CI is green.





Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565354167


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKeyedState} is the root of the internal state type hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * The public API state hierarchy is intended to be programmed against by Flink applications. The
+ * internal state hierarchy holds all the auxiliary methods that communicates with {@link
+ * AsyncExecutionController} and not intended to be used by user applications.
+ *
+ * @param <K> The type of key the state is associated to.
+ * @param <V> The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKeyedState<K, V> implements State {

Review Comment:
   Makes sense. Let's consider it later; thanks for the clarification.






Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565352484


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;

Review Comment:
   Thanks for the clarification 👍






Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565315084


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKeyedState} is the root of the internal state type hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * The public API state hierarchy is intended to be programmed against by Flink applications. The
+ * internal state hierarchy holds all the auxiliary methods that communicates with {@link
+ * AsyncExecutionController} and not intended to be used by user applications.
+ *
+ * @param <K> The type of key the state is associated to.
+ * @param <V> The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKeyedState<K, V> implements State {

Review Comment:
   Since we may have different mechanisms for the Window Operator and for state
   with a namespace, I'd prefer to consider it together with them later. WDYT?






Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565310323


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;

Review Comment:
   It's user-defined or SQL-defined and unique, just as before.
   Even if sync state and async state co-exist, they should have different
   `stateId`s.
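
The uniqueness constraint described above can be sketched as follows. This is a minimal illustration under stated assumptions, not Flink code: `StateIdRegistrySketch`, `AccessMode`, `register`, and `isRegistered` are hypothetical names that do not appear in the PR. The sketch only shows that a sync state and an async state declared under the same `stateId` would collide, so each needs its own id.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; none of these names exist in Flink. */
final class StateIdRegistrySketch {

    /** Hypothetical marker for how a state is accessed. */
    enum AccessMode { SYNC, ASYNC }

    // Maps each registered stateId to the access mode that claimed it.
    private final Map<String, AccessMode> registered = new HashMap<>();

    /**
     * Registers a stateId and rejects any id that is already taken,
     * whether the earlier registration was sync or async.
     */
    void register(String stateId, AccessMode mode) {
        AccessMode previous = registered.putIfAbsent(stateId, mode);
        if (previous != null) {
            throw new IllegalStateException(
                    "stateId '" + stateId + "' is already used by a " + previous + " state");
        }
    }

    /** Returns true if some state has already claimed the given id. */
    boolean isRegistered(String stateId) {
        return registered.containsKey(stateId);
    }

    public static void main(String[] args) {
        StateIdRegistrySketch registry = new StateIdRegistrySketch();
        registry.register("count", AccessMode.SYNC);
        try {
            // Reusing the same id for an async state is rejected.
            registry.register("count", AccessMode.ASYNC);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // A distinct id for the async state is accepted.
        registry.register("count-async", AccessMode.ASYNC);
    }
}
```

Whether Flink itself enforces this at registration time is not stated in the thread; the sketch assumes an eager check purely for illustration.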






Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-15 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565293565


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;
+
+/** The serializer for the type. */
+@Nonnull private final TypeSerializer<T> typeSerializer;
+
+/**
+ * The type information describing the value type. Remain this since it could provide more
+ * information which could be used internally in the future.
+ */
+@Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   It seems a bit confusing here, so I have removed it for now.
   Let's add it back if it is needed in the future.






Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-14 Thread via GitHub


fredia commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565164680


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;

Review Comment:
   How is `stateId` generated? If the sync state and async state co-exist, can 
they use the same `stateId`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKeyedState} is the root of the internal state type hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * The public API state hierarchy is intended to be programmed against by Flink applications. The
+ * internal state hierarchy holds all the auxiliary methods that communicates with {@link
+ * AsyncExecutionController} and not intended to be used by user applications.
+ *
+ * @param <K> The type of key the state is associated to.
+ * @param <V> The type of values kept internally in state.
+ */
+@Internal
+public abstract class InternalKeyedState<K, V> implements State {

Review Comment:
   Is the `namespace` required here?






Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-14 Thread via GitHub


Zakelly commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565167042


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;
+
+/** The serializer for the type. */
+@Nonnull private final TypeSerializer<T> typeSerializer;
+
+/**
+ * The type information describing the value type. Remain this since it could provide more
+ * information which could be used internally in the future.
+ */
+@Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   I thought the statement `which could be used internally in the future` meant
   that it would be used to create the serializer lazily, which no longer applies
   to the current implementation?
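
For readers unfamiliar with the pattern mentioned here, lazy serializer creation can be sketched roughly as below. This is a generic illustration under stated assumptions, not the PR's code: `LazySerializerSketch`, `getSerializer`, and `creationCount` are hypothetical names, and a plain `Supplier` stands in for whatever type information would produce the serializer.

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical illustration of lazy creation; not a Flink class. */
final class LazySerializerSketch<S> {

    // Stand-in for the type information that knows how to build a serializer.
    private final Supplier<S> serializerFactory;

    // Built on first access; null until then.
    private S serializer;

    // Counts factory invocations, only to make the laziness observable.
    private int creations;

    LazySerializerSketch(Supplier<S> serializerFactory) {
        this.serializerFactory = Objects.requireNonNull(serializerFactory);
    }

    /** Creates the serializer on the first call and reuses it afterwards. Not thread-safe. */
    S getSerializer() {
        if (serializer == null) {
            serializer = Objects.requireNonNull(serializerFactory.get());
            creations++;
        }
        return serializer;
    }

    int creationCount() {
        return creations;
    }
}
```

With an eager design, which is what the comment suggests the current implementation uses, the descriptor would simply receive the finished serializer in its constructor and the factory field would be unnecessary.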






Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-14 Thread via GitHub


masteryhx commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565160755


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;
+
+/** The serializer for the type. */
+@Nonnull private final TypeSerializer<T> typeSerializer;
+
+/**
+ * The type information describing the value type. Remain this since it could provide more
+ * information which could be used internally in the future.
+ */
+@Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   I kept this because, as its comment says, it could provide or persist more
   information (e.g. the schema) than `TypeSerializer`.
   WDYT?






Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-14 Thread via GitHub


Zakelly commented on code in PR #24651:
URL: https://github.com/apache/flink/pull/24651#discussion_r1565148947


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalKeyedState.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+
+/**
+ * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the

Review Comment:
   nit: replace all `InternalKvState` with `InternalKeyedState`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.v2;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * State in stateful operations internally.
+ *
+ * @param <T> The type of the value of the state object described by this state descriptor.
+ */
+@Internal
+public abstract class StateDescriptor<T> implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+/** An enumeration of the types of supported states. */
+public enum Type {
+VALUE,
+LIST,
+REDUCING,
+FOLDING,
+AGGREGATING,
+MAP
+}
+
+/** ID that uniquely identifies state created from this StateDescriptor. */
+@Nonnull private final String stateId;
+
+/** The serializer for the type. */
+@Nonnull private final TypeSerializer<T> typeSerializer;
+
+/**
+ * The type information describing the value type. Remain this since it could provide more
+ * information which could be used internally in the future.
+ */
+@Nonnull private final TypeInformation<T> typeInfo;

Review Comment:
   IIUC, we don't need this?



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/StateDescriptor.java:
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed un