Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-25 Thread via GitHub


masteryhx closed pull request #24682: [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend
URL: https://github.com/apache/flink/pull/24682


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-25 Thread via GitHub


masteryhx commented on PR #24682:
URL: https://github.com/apache/flink/pull/24682#issuecomment-2076795175

   @flinkbot run azure





Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-25 Thread via GitHub


masteryhx commented on PR #24682:
URL: https://github.com/apache/flink/pull/24682#issuecomment-2076794847

   @flinkbot run azure





Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-24 Thread via GitHub


masteryhx commented on code in PR #24682:
URL: https://github.com/apache/flink/pull/24682#discussion_r1578773304


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Disposable;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * A KeyedStateBackend that stores its state in {@code ForSt}. This state backend can store very
+ * large state that exceeds memory even disk to remote storage. TODO: Support to implement the new
+ * interface of KeyedStateBackend
+ */
+public class ForStKeyedStateBackend implements Disposable {

Review Comment:
   This will be resolved in [FLINK-35048](https://issues.apache.org/jira/browse/FLINK-35048), as the TODO describes.






Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-24 Thread via GitHub


masteryhx commented on code in PR #24682:
URL: https://github.com/apache/flink/pull/24682#discussion_r1578772481


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Filter;
+import org.rocksdb.FlinkEnv;
+import org.rocksdb.IndexType;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.Statistics;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * The container for ForSt resources, including option factory and shared resource among instances.
+ *
+ * This should be the only entrance for ForStStateBackend to get ForSt options, and should be
+ * properly (and necessarily) closed to prevent resource leak.
+ */
+public final class ForStResourceContainer implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class);
+
+private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG";
+
+// the filename length limit is 255 on most operating systems
+private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length();
+
+private static final String DB_DIR_STRING = "db";
+
+@Nullable private final URI remoteBasePath;
+
+@Nullable private final URI remoteForStPath;
+
+@Nullable private final File localBasePath;

Review Comment:
   `localBasePath` is the base path of the keyed state backend, while `localForStPath` is the path of the ForSt DB instance.
   This is also consistent with the design of the RocksDB state backend.
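
To make the distinction above concrete, here is a minimal sketch. It assumes the DB path is derived from the backend's base path by appending the `db` directory name (`DB_DIR_STRING` in the quoted diff); the class and method names are hypothetical and not taken from the PR.

```java
import java.io.File;

/** Hypothetical sketch, not the PR's code: relates the two local paths. */
final class ForStLocalPathsSketch {
    // Mirrors DB_DIR_STRING = "db" from the quoted diff.
    static final String DB_DIR_STRING = "db";

    /** Assumption: the ForSt DB lives in a "db" subdirectory of the backend's base path. */
    static File localForStPath(File localBasePath) {
        return new File(localBasePath, DB_DIR_STRING);
    }

    public static void main(String[] args) {
        // Example base path; any directory owned by the keyed state backend would do.
        File localBasePath = new File("/tmp/flink-io/job_x_op_y");
        System.out.println("backend base path: " + localBasePath);
        System.out.println("ForSt DB path:     " + localForStPath(localBasePath));
    }
}
```

Under this assumption the backend owns everything beneath the base path, while ForSt itself only ever sees the nested DB directory.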






Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-24 Thread via GitHub


masteryhx commented on code in PR #24682:
URL: https://github.com/apache/flink/pull/24682#discussion_r1578770992


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStRestoreOperation.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.restore;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.RestoreOperation;
+
+/** Interface for ForSt restore. */
+@Internal
+public interface ForStRestoreOperation extends RestoreOperation, AutoCloseable {

Review Comment:
   It is introduced only to initialize the `ForStKeyedStateBackend`.
   I'd prefer to keep it as `NonRestoreOperation`, which is consistent with other state backends.
   We can extend it when we implement the real restore operation.
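
A rough sketch of what such a no-op restore could look like follows. `RestoreOperationSketch` is a local stand-in for Flink's `RestoreOperation` so the example compiles on its own; the result type and all class names are assumptions for illustration, not the PR's actual classes.

```java
/** Stand-in for Flink's RestoreOperation interface, redeclared so the sketch compiles alone. */
interface RestoreOperationSketch<R> {
    R restore() throws Exception;
}

/** Hypothetical result type; the PR's actual result class is not shown in this thread. */
final class RestoreResultSketch {
    final String dbPath;
    final boolean restoredFromCheckpoint;

    RestoreResultSketch(String dbPath, boolean restoredFromCheckpoint) {
        this.dbPath = dbPath;
        this.restoredFromCheckpoint = restoredFromCheckpoint;
    }
}

/** A restore operation that restores nothing and only reports a fresh, empty instance. */
final class NoneRestoreOperationSketch
        implements RestoreOperationSketch<RestoreResultSketch>, AutoCloseable {
    private final String dbPath;
    private boolean closed;

    NoneRestoreOperationSketch(String dbPath) {
        this.dbPath = dbPath;
    }

    @Override
    public RestoreResultSketch restore() {
        // No checkpoint data is read; the backend starts from an empty store.
        return new RestoreResultSketch(dbPath, false);
    }

    @Override
    public void close() {
        // Nothing was opened during restore, so only the flag changes.
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) {
        try (NoneRestoreOperationSketch op = new NoneRestoreOperationSketch("/tmp/forst/db")) {
            RestoreResultSketch result = op.restore();
            System.out.println(result.dbPath + " restored=" + result.restoredFromCheckpoint);
        }
    }
}
```

Keeping the interface in place means a real restore path can later be added as another implementation without changing the code that builds the backend.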






Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24682:
URL: https://github.com/apache/flink/pull/24682#discussion_r1577715584


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Filter;
+import org.rocksdb.FlinkEnv;
+import org.rocksdb.IndexType;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.Statistics;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * The container for ForSt resources, including option factory and shared resource among instances.
+ *
+ * This should be the only entrance for ForStStateBackend to get ForSt options, and should be
+ * properly (and necessarily) closed to prevent resource leak.
+ */
+public final class ForStResourceContainer implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class);
+
+private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG";
+
+// the filename length limit is 255 on most operating systems
+private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length();
+
+private static final String DB_DIR_STRING = "db";
+
+@Nullable private final URI remoteBasePath;
+
+@Nullable private final URI remoteForStPath;
+
+@Nullable private final File localBasePath;

Review Comment:
   What is the difference between `localBasePath` and `localForStPath`?



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResourcesFactory.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.runtime.memory.SharedResources;
+import org.apache.flink.util.function.LongFunctionWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.state.forst.ForStOptions.FIX_PER_TM_MEMORY_SIZE;
+
+/**
+ * A factory of {@link ForStSharedResources}. Encapsulates memory share scope (e.g. TM, Slot) and
+ * lifecycle (managed/unmanaged).
+ */
+enum F

Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24682:
URL: https://github.com/apache/flink/pull/24682#discussion_r1577638164


##
flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMultiClassLoaderTest.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.RocksDB;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+
+import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * This test validates that the ForSt JNI library loading works properly in the presence of the
+ * ForSt code being loaded dynamically via reflection. That can happen when ForSt is in the user
+ * code JAR, or in certain test setups.
+ */
+public class ForStMultiClassLoaderTest {
+
+@Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
+@Test
+public void testTwoSeparateClassLoaders() throws Exception {
+// collect the libraries / class folders with RocksDB related code: the state backend and
+// RocksDB itself

Review Comment:
   How about adding a TODO for a test that loads RocksDB and ForSt at the same time?
   



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStRestoreOperation.java:
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.restore;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.RestoreOperation;
+
+/** Interface for ForSt restore. */
+@Internal
+public interface ForStRestoreOperation extends RestoreOperation, AutoCloseable {

Review Comment:
   Does the restore-related component need to be introduced in the first PR?






Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-22 Thread via GitHub


masteryhx commented on code in PR #24682:
URL: https://github.com/apache/flink/pull/24682#discussion_r1574482995


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/** Utils for ForSt Operations. */
+public class ForStOperationUtils {
+
+/**
+ * The name of the merge operator in ForSt. Do not change except you know exactly what you do.
+ */
+public static final String MERGE_OPERATOR_NAME = "stringappendtest";

Review Comment:
   It is introduced for the same reason as in FLINK-7220: so that users cannot override the merge operator or forget to set it.
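
The idea can be sketched as follows: the backend applies the fixed operator name after any user-supplied options, so the user's value never wins. `ColumnOptionsSketch` is a hypothetical stand-in for the real column family options type (the RocksDB Java API exposes a `setMergeOperatorName` setter on `ColumnFamilyOptions`); the helper name is likewise an assumption.

```java
/** Stand-in for column family options; the real type would come from the ForSt/RocksDB JNI API. */
final class ColumnOptionsSketch {
    private String mergeOperatorName; // null means "not set by anyone"

    ColumnOptionsSketch setMergeOperatorName(String name) {
        this.mergeOperatorName = name;
        return this;
    }

    String mergeOperatorName() {
        return mergeOperatorName;
    }
}

final class MergeOperatorSketch {
    // Same constant value as in the quoted diff.
    static final String MERGE_OPERATOR_NAME = "stringappendtest";

    /** Applies the fixed operator last, so user settings cannot override or omit it. */
    static ColumnOptionsSketch withFixedMergeOperator(ColumnOptionsSketch userOptions) {
        ColumnOptionsSketch options =
                userOptions != null ? userOptions : new ColumnOptionsSketch();
        return options.setMergeOperatorName(MERGE_OPERATOR_NAME);
    }

    public static void main(String[] args) {
        ColumnOptionsSketch custom = new ColumnOptionsSketch().setMergeOperatorName("custom");
        System.out.println(withFixedMergeOperator(custom).mergeOperatorName());
        System.out.println(withFixedMergeOperator(null).mergeOperatorName());
    }
}
```

Both calls in `main` end up with the fixed name, whether the user set a different operator or none at all.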






Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-22 Thread via GitHub


masteryhx commented on code in PR #24682:
URL: https://github.com/apache/flink/pull/24682#discussion_r1574455229


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Filter;
+import org.rocksdb.FlinkEnv;
+import org.rocksdb.IndexType;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.Statistics;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * The container for ForSt resources, including option factory and shared resource among instances.
+ *
+ * This should be the only entrance for ForStStateBackend to get ForSt options, and should be
+ * properly (and necessarily) closed to prevent resource leak.
+ */
+public final class ForStResourceContainer implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class);
+
+private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG";
+
+// the filename length limit is 255 on most operating systems
+private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length();
+
+private static final String DB_DIR_STRING = "db";
+
+@Nullable private final URI remoteBasePath;
+
+@Nullable private final URI remoteForStPath;
+
+@Nullable private final File localBasePath;
+
+@Nullable private final File localForStPath;
+
+/** The configurations from file. */
+private final ReadableConfig configuration;
+
+/** The options factory to create the ForSt options. */
+@Nullable private final ForStOptionsFactory optionsFactory;
+
+/**
+ * The shared resource among ForSt instances. This resource is not part of the 'handlesToClose',
+ * because the handles to close are closed quietly, whereas for this one, we want exceptions to
+ * be reported.
+ */
+@Nullable private final OpaqueMemoryResource sharedResources;
+
+private final boolean enableStatistics;
+
+/** The handles to be closed when the container is closed. */
+private final ArrayList handlesToClose;
+
+@Nullable private Path relocatedDbLogBaseDir;
+
+@VisibleForTesting
+public ForStResourceContainer() {
+this(new Configuration(), null, null, null, null, false);
+}
+
+@VisibleForTesting
+public ForStResourceContainer(@Nullable ForStOptionsFactory optionsFactory) {
+this(new Configuration(), optionsFactory, null, null, null, false);
+}
+
+@VisibleForTesting
+public ForStResourceContainer(
+@Nullable ForStOptionsFactory optionsFactory,
+@Nullable OpaqueMemoryResource sharedResources) {
+this(new Configuration(), optionsFactory, sharedResources, null, null, false);
+}
+
+public ForStResourceContainer(
+ReadableConfig configuration,
+@Nullable ForStOptionsFactory optionsFactory,
+@Nullable OpaqueMemoryResource sharedResources,
+@Nullable File localBasePath,
+@Nullable URI remoteBasePath,
+boolean enableStatistics) {
+
+thi

Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-22 Thread via GitHub


Zakelly commented on code in PR #24682:
URL: https://github.com/apache/flink/pull/24682#discussion_r1574434930


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/** Utils for ForSt Operations. */
+public class ForStOperationUtils {
+
+/**
+ * The name of the merge operator in ForSt. Do not change except you know exactly what you do.
+ */
+public static final String MERGE_OPERATOR_NAME = "stringappendtest";

Review Comment:
   Is this still needed?



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Filter;
+import org.rocksdb.FlinkEnv;
+import org.rocksdb.IndexType;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.Statistics;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * The container for ForSt resources, including option factory and shared resource among instances.
+ *
+ * This should be the only entrance for ForStStateBackend to get ForSt options, and should be
+ * properly (and necessarily) closed to prevent resource leak.
+ */
+public final class ForStResourceContainer implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class);
+
+private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG";
+
+// the filename length limit is 255 on most operating systems
+private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_S

Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-18 Thread via GitHub


flinkbot commented on PR #24682:
URL: https://github.com/apache/flink/pull/24682#issuecomment-2065635116

   
   ## CI report:
   
   * a5be0a188101bcaf6fb76c8d5144ad002a79a192 UNKNOWN
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]

2024-04-18 Thread via GitHub


masteryhx opened a new pull request, #24682:
URL: https://github.com/apache/flink/pull/24682

   
   
   ## What is the purpose of the change
   
   A ForStStateBackend is introduced to leverage ForSt as the state store for Flink.
   
   This ticket includes:
   
   - Life cycle of ForSt, including initialization and closing
   - Basic options, resource control, and metrics, like RocksDBStateBackend
   
   It doesn't include the implementation of the new AsyncKeyedStateBackend and Async State API, which will be resolved in other PRs.
   
   ## Brief change log
   
   - Support ForSt FlinkEnv
   - Introduce ForStStateBackend and ForStKeyedStateBackend
   - Support restoring and building ForSt
   - Introduce ResourceContainer for ForStStateBackend
   - Introduce Metrics for ForStStateBackend
   - Introduce ForSt related options
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added several tests about configs, metrics, memory control, and ForSt loading
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): (yes / **no**)
   - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
   - The serializers: (yes / **no** / don't know)
   - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
   - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
   - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
   - Does this pull request introduce a new feature? (**yes** / no)
   - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   

