Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-25 Thread via GitHub


masteryhx closed pull request #24681: [FLINK-35162][state] Implement 
WriteBatchOperation and general MultiGetOperation for ForSt
URL: https://github.com/apache/flink/pull/24681


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-24 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1578764009


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> batchRequest, WriteOptions 
writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture process() throws IOException {

Review Comment:
   I have refined it !



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-24 Thread via GitHub


Zakelly commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1577687629


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> batchRequest, WriteOptions 
writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture process() throws IOException {

Review Comment:
   It seems this `process()` still act as synchronous method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-23 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1576167215


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.util.function.FunctionWithException;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The composite key which contains some context information, such as 
keyGroup, etc.
+ *
+ * @param  The type of the raw key.
+ */
+@ThreadSafe
+public class ContextKey {
+
+private final RecordContext recordContext;
+
+public ContextKey(RecordContext recordContext) {
+this.recordContext = recordContext;
+}
+
+public K getRawKey() {
+return recordContext.getKey();
+}
+
+public int getKeyGroup() {
+return recordContext.getKeyGroup();
+}
+
+/**
+ * Get the serialized key. If the cached serialized key within {@code 
RecordContext#payload} is
+ * null, the provided serialization function will be called, and the 
serialization result will
+ * be cached by {@code RecordContext#payload}.
+ *
+ * @param serializeKeyFunc the provided serialization function for this 
contextKey.
+ * @return the serialized bytes.
+ */
+public byte[] getOrCreateSerializedKey(
+FunctionWithException, byte[], IOException> 
serializeKeyFunc)
+throws IOException {
+if (recordContext.getPayload() != null) {
+return (byte[]) recordContext.getPayload();
+}
+synchronized (recordContext) {
+if (recordContext.getPayload() == null) {
+byte[] serializedKey = serializeKeyFunc.apply(this);
+recordContext.setPayload(serializedKey);

Review Comment:
   No, only the serialized key will be cached in payload.  The `value` cannot 
be reused between different state execution stage, so there is no need to cache 
`value`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-23 Thread via GitHub


fredia commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1575855165


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.util.function.FunctionWithException;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The composite key which contains some context information, such as 
keyGroup, etc.
+ *
+ * @param  The type of the raw key.
+ */
+@ThreadSafe
+public class ContextKey {
+
+private final RecordContext recordContext;
+
+public ContextKey(RecordContext recordContext) {
+this.recordContext = recordContext;
+}
+
+public K getRawKey() {
+return recordContext.getKey();
+}
+
+public int getKeyGroup() {
+return recordContext.getKeyGroup();
+}
+
+/**
+ * Get the serialized key. If the cached serialized key within {@code 
RecordContext#payload} is
+ * null, the provided serialization function will be called, and the 
serialization result will
+ * be cached by {@code RecordContext#payload}.
+ *
+ * @param serializeKeyFunc the provided serialization function for this 
contextKey.
+ * @return the serialized bytes.
+ */
+public byte[] getOrCreateSerializedKey(
+FunctionWithException, byte[], IOException> 
serializeKeyFunc)
+throws IOException {
+if (recordContext.getPayload() != null) {
+return (byte[]) recordContext.getPayload();
+}
+synchronized (recordContext) {
+if (recordContext.getPayload() == null) {
+byte[] serializedKey = serializeKeyFunc.apply(this);
+recordContext.setPayload(serializedKey);

Review Comment:
   Will the `serializedKey` and `value` are both store in `payload`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-23 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1575745949


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import javax.annotation.Nullable;
+
+/**
+ * The composite key which contains some context information.
+ *
+ * @param  The type of the raw key.
+ */
+public class ContextKey {

Review Comment:
   @Zakelly  As we discussed offline, I have attached the keyGroup and payload 
(serialized key) into `RecordContext `, and the `ContextKey` is placed in 
forst-module. Please help to review again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1575586119


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> batchRequest, WriteOptions 
writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture process() throws IOException {
+CompletableFuture result = new CompletableFuture<>();
+try (WriteBatch writeBatch =
+new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+for (Request request : batchRequest) {
+ForStInnerTable table = request.table;
+if (request.value == null) {
+// put(key, null) == delete(key)
+writeBatch.delete(
+table.getColumnFamilyHandle(), 
table.serializeKey(request.key));
+} else {
+writeBatch.put(
+table.getColumnFamilyHandle(),
+table.serializeKey(request.key),
+table.serializeValue(request.value));
+}
+}
+db.write(writeOptions, writeBatch);
+result.complete(null);
+} catch (RocksDBException e) {
+result.completeExceptionally(e);
+throw new IOException("Error while adding data to ForStDB", e);
+}
+return result;
+}
+
+/** The Put access request for ForStDB. */
+static class Request {
+final K key;
+final V value;
+final ForStInnerTable table;
+
+Request(K key, V value, ForStInnerTable table) {
+this.key = key;
+this.value = value;
+this.table = table;
+}
+
+static  Request of(K key, V value, ForStInnerTable 
table) {

Review Comment:
   I have refined it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1575585902


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> batchRequest, WriteOptions 
writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture process() throws IOException {
+CompletableFuture result = new CompletableFuture<>();
+try (WriteBatch writeBatch =
+new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+for (Request request : batchRequest) {
+ForStInnerTable table = request.table;
+if (request.value == null) {
+// put(key, null) == delete(key)
+writeBatch.delete(
+table.getColumnFamilyHandle(), 
table.serializeKey(request.key));
+} else {
+writeBatch.put(
+table.getColumnFamilyHandle(),
+table.serializeKey(request.key),
+table.serializeValue(request.value));
+}
+}
+db.write(writeOptions, writeBatch);
+result.complete(null);
+} catch (RocksDBException e) {
+result.completeExceptionally(e);
+throw new IOException("Error while adding data to ForStDB", e);
+}
+return result;
+}
+
+/** The Put access request for ForStDB. */
+static class Request {

Review Comment:
   They don't have structural  relationship. It's a good idea that using 
`GetRequest` and `PutRequest` to distinguish them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1575583882


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.ContextKey;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The {@link InternalValueState} implement for ForStDB.
+ *
+ * @param  The type of the key.
+ * @param  The type of the value.
+ */
+public class ForStValueState extends InternalValueState
+implements ValueState, ForStInnerTable, V> {
+
+/** The column family which this internal value state belongs to. */
+private final ColumnFamilyHandle columnFamilyHandle;
+
+/** The serialized key builder which should be thread-safe. */
+private final ThreadLocal> 
serializedKeyBuilder;
+
+/** The data outputStream used for value serializer, which should be 
thread-safe. */
+private final ThreadLocal valueSerializerView;
+
+/** The data inputStream used for value deserializer, which should be 
thread-safe. */
+private final ThreadLocal valueDeserializerView;
+
+public ForStValueState(
+StateRequestHandler stateRequestHandler,
+ColumnFamilyHandle columnFamily,
+ValueStateDescriptor valueStateDescriptor,
+ThreadLocal> serializedKeyBuilder,

Review Comment:
   I think we could just pass the `Supplier` to `ForStValueState`, and keep the 
`ThreadLocal` semantics  inside ForStState.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


masteryhx commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1574412285


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.ContextKey;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The {@link InternalValueState} implement for ForStDB.
+ *
+ * @param  The type of the key.
+ * @param  The type of the value.
+ */
+public class ForStValueState extends InternalValueState
+implements ValueState, ForStInnerTable, V> {
+
+/** The column family which this internal value state belongs to. */
+private final ColumnFamilyHandle columnFamilyHandle;
+
+/** The serialized key builder which should be thread-safe. */
+private final ThreadLocal> 
serializedKeyBuilder;
+
+/** The data outputStream used for value serializer, which should be 
thread-safe. */
+private final ThreadLocal valueSerializerView;
+
+/** The data inputStream used for value deserializer, which should be 
thread-safe. */
+private final ThreadLocal valueDeserializerView;
+
+public ForStValueState(
+StateRequestHandler stateRequestHandler,
+ColumnFamilyHandle columnFamily,
+ValueStateDescriptor valueStateDescriptor,
+ThreadLocal> serializedKeyBuilder,

Review Comment:
   `Threadlocal` parameters are a bit confusing for callers.
   And also make life cycles of these variables expose to callers.
   Could we avoid it and just maintain them internally ?



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> batchRequest, WriteOptions 
writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture process() throws IOException {
+CompletableFuture result = new 

Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


Zakelly commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1574275896


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import javax.annotation.Nullable;
+
+/**
+ * The composite key which contains some context information.
+ *
+ * @param  The type of the raw key.
+ */
+public class ContextKey {

Review Comment:
   Well, I didn't mean that it should move to the `flink-runtime`. I was 
thinking providing a payload in `RecordContext` for state backend to assign 
customized data defined by itself. WDYT?
   
   And by the way, the key and keygroup should be provided in `RecordContext` 
so maybe we could save some memory overhead here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


jectpro7 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1574275399


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The general-purpose multiGet operation implementation for ForStDB, which 
simulates multiGet by
+ * calling the Get API multiple times with multiple threads.
+ *
+ * @param  The type of key in get access request.
+ * @param  The type of value in get access request.
+ */
+public class ForStGeneralMultiGetOperation implements 
ForStDBOperation> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final Executor executor;
+
+ForStGeneralMultiGetOperation(RocksDB db, List> 
batchRequest, Executor executor) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.executor = executor;
+}
+
+@Override
+public CompletableFuture> process() throws IOException {
+
+CompletableFuture> future = new CompletableFuture<>();
+@SuppressWarnings("unchecked")
+V[] result = (V[]) new Object[batchRequest.size()];
+Arrays.fill(result, null);
+
+AtomicInteger counter = new AtomicInteger(batchRequest.size());
+for (int i = 0; i < batchRequest.size(); i++) {
+Request request = batchRequest.get(i);
+final int index = i;
+executor.execute(
+() -> {
+try {
+ForStInnerTable table = request.table;
+byte[] key = table.serializeKey(request.key);
+byte[] value = 
db.get(table.getColumnFamilyHandle(), key);

Review Comment:
   oh, got it, good to learn. Thanks for sharing the background.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


jectpro7 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1574272182


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> batchRequest, WriteOptions 
writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture process() throws IOException {
+CompletableFuture result = new CompletableFuture<>();
+try (WriteBatch writeBatch =
+new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+for (Request request : batchRequest) {
+ForStInnerTable table = request.table;
+if (request.value == null) {
+// put(key, null) == delete(key)
+writeBatch.delete(
+table.getColumnFamilyHandle(), 
table.serializeKey(request.key));
+} else {
+writeBatch.put(
+table.getColumnFamilyHandle(),
+table.serializeKey(request.key),
+table.serializeValue(request.value));
+}
+}
+db.write(writeOptions, writeBatch);
+result.complete(null);
+} catch (RocksDBException e) {

Review Comment:
   sounds great



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-21 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1574080412


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> batchRequest, WriteOptions 
writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture process() throws IOException {
+CompletableFuture result = new CompletableFuture<>();
+try (WriteBatch writeBatch =
+new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+for (Request request : batchRequest) {
+ForStInnerTable table = request.table;
+if (request.value == null) {
+// put(key, null) == delete(key)
+writeBatch.delete(
+table.getColumnFamilyHandle(), 
table.serializeKey(request.key));
+} else {
+writeBatch.put(
+table.getColumnFamilyHandle(),
+table.serializeKey(request.key),
+table.serializeValue(request.value));
+}
+}
+db.write(writeOptions, writeBatch);
+result.complete(null);
+} catch (RocksDBException e) {

Review Comment:
   I think it would be better to catch all types of exceptions here, not just 
`RocksDBException `, and complete the result future exceptionally.  WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-21 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1574076315


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The general-purpose multiGet operation implementation for ForStDB, which 
simulates multiGet by
+ * calling the Get API multiple times with multiple threads.
+ *
+ * @param  The type of key in get access request.
+ * @param  The type of value in get access request.
+ */
+public class ForStGeneralMultiGetOperation implements 
ForStDBOperation> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final Executor executor;
+
+ForStGeneralMultiGetOperation(RocksDB db, List> 
batchRequest, Executor executor) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.executor = executor;
+}
+
+@Override
+public CompletableFuture> process() throws IOException {
+
+CompletableFuture> future = new CompletableFuture<>();
+@SuppressWarnings("unchecked")
+V[] result = (V[]) new Object[batchRequest.size()];
+Arrays.fill(result, null);
+
+AtomicInteger counter = new AtomicInteger(batchRequest.size());
+for (int i = 0; i < batchRequest.size(); i++) {
+Request request = batchRequest.get(i);
+final int index = i;
+executor.execute(
+() -> {
+try {
+ForStInnerTable table = request.table;
+byte[] key = table.serializeKey(request.key);
+byte[] value = 
db.get(table.getColumnFamilyHandle(), key);

Review Comment:
   @jectpro7  Thanks four your advice. 
   The `MultiGet` api based on remote filesystem is not currently supported by 
ForStDB, and more importantly, not all file systems will support the `MultiGet` 
api, so the purpose of this PR  is to introduce a general `MultiGet` 
implementation that **works on all file systems**.   
   
   In addition, I will introduce the ForStDB native `MultiGet` api in another 
jira ([FLINK-35163](https://issues.apache.org/jira/browse/FLINK-35163)) to  
optimize remote state access.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-21 Thread via GitHub


jectpro7 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1573855255


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The general-purpose multiGet operation implementation for ForStDB, which 
simulates multiGet by
+ * calling the Get API multiple times with multiple threads.
+ *
+ * @param  The type of key in get access request.
+ * @param  The type of value in get access request.
+ */
+public class ForStGeneralMultiGetOperation implements 
ForStDBOperation> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final Executor executor;
+
+ForStGeneralMultiGetOperation(RocksDB db, List> 
batchRequest, Executor executor) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.executor = executor;
+}
+
+@Override
+public CompletableFuture> process() throws IOException {
+
+CompletableFuture> future = new CompletableFuture<>();
+@SuppressWarnings("unchecked")
+V[] result = (V[]) new Object[batchRequest.size()];
+Arrays.fill(result, null);
+
+AtomicInteger counter = new AtomicInteger(batchRequest.size());
+for (int i = 0; i < batchRequest.size(); i++) {
+Request request = batchRequest.get(i);
+final int index = i;
+executor.execute(
+() -> {
+try {
+ForStInnerTable table = request.table;
+byte[] key = table.serializeKey(request.key);
+byte[] value = 
db.get(table.getColumnFamilyHandle(), key);

Review Comment:
   Hi @ljz2051, it creates many rpc request here, as FLIP-426 mentioned the rpc 
round-trip overhead is the bottleneck. It might be better by using 
`multiGetAsList`



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param  The type of key in put access request.
+ * @param  The type of value in put access request.
+ */
+public class ForStWriteBatchOperation implements ForStDBOperation {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List> 

Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-19 Thread via GitHub


ljz2051 commented on PR #24681:
URL: https://github.com/apache/flink/pull/24681#issuecomment-2066281635

   @Zakelly Thanks for your review! I have addressed all comments your 
mentioned. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-19 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1572160266


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBOperation.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * Data access operation to ForStDB. This interface is used to encapsulate the 
DB access operations
+ * formed after grouping state access. For more information about “Grouping 
state access”, please
+ * refer to FLIP-426.
+ *
+ * @param  The type of output for DB access.
+ */
+@Internal
+public interface ForStDBOperation {
+
+/**
+ * Process the ForStDB access requests.
+ *
+ * @return Processing result.
+ * @throws IOException Thrown if ForStDB access encountered an I/O related 
error.
+ */
+OUT process() throws IOException;

Review Comment:
   Good point. I agree that the future-style interface is more flexible. I have 
refined it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-19 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1572156946


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import javax.annotation.Nullable;
+
+/**
+ * The composite key which contains some context information.
+ *
+ * @param  The type of the raw key.
+ */
+public class ContextKey {

Review Comment:
   Yes,  the ContextKey can be shared across state requests, and ContextKey 
will be attached to the RecordContext in another PR. **So I move the ContextKey 
class to flink-runtime module from forst-module**.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-19 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1572153463


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import javax.annotation.Nullable;
+
+/**
+ * The composite key which contains some context information.
+ *
+ * @param  The type of the raw key.
+ */
+public class ContextKey {
+
+private final K rawKey;
+
+private final int keyGroup;
+
+/**
+ * A record in user layer may access the state multiple times. The {@code 
serializedKey} can be
+ * used to cache the serialized key bytes after its first serialization, 
so that subsequent
+ * state accesses with the same key can avoid being serialized repeatedly.
+ */
+private @Nullable byte[] serializedKey = null;
+
+public ContextKey(K rawKey, int keyGroup) {

Review Comment:
   I have remove the "ContextKey#of" static method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-19 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1572151847


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The {@link InternalValueState} implement for ForStDB.
+ *
+ * @param  The type of the key.
+ * @param  The type of the value.
+ */
+public class ForStValueState extends InternalValueState
+implements ValueState, ForStInnerTable, V> {
+
+/** The column family which this internal value state belongs to. */
+private final ColumnFamilyHandle columnFamilyHandle;
+
+/** The serialized key builder which need be thread-safe. */
+private final ThreadLocal> 
serializedKeyBuilder;
+
+/** The data outputStream used for value serializer, which need be 
thread-safe. */
+private final ThreadLocal valueSerializerView;
+
+/** The data inputStream used for value deserializer, which need be 
thread-safe. */
+private final ThreadLocal valueDeserializerView;
+
+public ForStValueState(
+StateRequestHandler stateRequestHandler,
+ColumnFamilyHandle columnFamily,
+ValueStateDescriptor valueStateDescriptor,
+ThreadLocal> serializedKeyBuilder,
+ThreadLocal valueSerializerView,
+ThreadLocal valueDeserializerView) {
+super(stateRequestHandler, valueStateDescriptor);
+this.columnFamilyHandle = columnFamily;
+this.serializedKeyBuilder = serializedKeyBuilder;
+this.valueSerializerView = valueSerializerView;
+this.valueDeserializerView = valueDeserializerView;
+}
+
+@Override
+public ColumnFamilyHandle getColumnFamilyHandle() {
+return columnFamilyHandle;
+}
+
+@Override
+public byte[] serializeKey(ContextKey contextKey) throws IOException {

Review Comment:
   Yes. I have marked the ContextKey with  @ThreadSafe annotation, and refactor 
the 'read cache or serialize' logic. Please review the 
ContextKey#getOrCreateSerializedKey method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-18 Thread via GitHub


Zakelly commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1571721125


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The {@link InternalValueState} implement for ForStDB.
+ *
+ * @param  The type of the key.
+ * @param  The type of the value.
+ */
+public class ForStValueState extends InternalValueState
+implements ValueState, ForStInnerTable, V> {
+
+/** The column family which this internal value state belongs to. */
+private final ColumnFamilyHandle columnFamilyHandle;
+
+/** The serialized key builder which need be thread-safe. */
+private final ThreadLocal> 
serializedKeyBuilder;
+
+/** The data outputStream used for value serializer, which need be 
thread-safe. */
+private final ThreadLocal valueSerializerView;
+
+/** The data inputStream used for value deserializer, which need be 
thread-safe. */
+private final ThreadLocal valueDeserializerView;
+
+public ForStValueState(
+StateRequestHandler stateRequestHandler,
+ColumnFamilyHandle columnFamily,
+ValueStateDescriptor valueStateDescriptor,
+ThreadLocal> serializedKeyBuilder,
+ThreadLocal valueSerializerView,
+ThreadLocal valueDeserializerView) {
+super(stateRequestHandler, valueStateDescriptor);
+this.columnFamilyHandle = columnFamily;
+this.serializedKeyBuilder = serializedKeyBuilder;
+this.valueSerializerView = valueSerializerView;
+this.valueDeserializerView = valueDeserializerView;
+}
+
+@Override
+public ColumnFamilyHandle getColumnFamilyHandle() {
+return columnFamilyHandle;
+}
+
+@Override
+public byte[] serializeKey(ContextKey contextKey) throws IOException {

Review Comment:
   Should `ContextKey` be thread-safe? It seems the 'read cache or serialize' 
logic better be called only once for each context key.



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMultiGetOperation.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The native multiGet operation implementation for ForStDB.
+ *
+ * @param  The type of key in get access request.
+ * @param  The type of value in get access request.
+ */
+public class ForStNativeMultiGetOperation implements 
ForStDBOperation> {

Review Comment:
   We can introduce this later   and may be excluded from this PR?



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java:
##
@@ -0,0 

Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-18 Thread via GitHub


flinkbot commented on PR #24681:
URL: https://github.com/apache/flink/pull/24681#issuecomment-2063985516

   
   ## CI report:
   
   * b1abbbc14798703d82a9115d42141aa0bb62abbc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-18 Thread via GitHub


ljz2051 commented on PR #24681:
URL: https://github.com/apache/flink/pull/24681#issuecomment-2063978444

   @Zakelly @fredia @masteryhx Could you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-18 Thread via GitHub


ljz2051 opened a new pull request, #24681:
URL: https://github.com/apache/flink/pull/24681

   ## What is the purpose of the change
   
   This pull request introduces the ForStValueState and support 
WriteBatchOperation and general MultiGetOperation for ForSt.
   
   
   ## Brief change log
   
 - Introduces the ForStValueState 
 - Support WriteBatchOperation and general MultiGetOperation for ForSt
   
   ## Verifying this change
   
   This change added tests and can be verified  by  
ForStWriteBatchOperationTest and ForStGeneralMultiGetOperationTest.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper:  no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org