Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-26 Thread via GitHub


fredia merged PR #24513:
URL: https://github.com/apache/flink/pull/24513


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-26 Thread via GitHub


fredia commented on PR #24513:
URL: https://github.com/apache/flink/pull/24513#issuecomment-2019523721

   @Zakelly Thanks for the review and suggestions. I have addressed the 
comments and squashed the commits; please take a look when you're free, thanks.





Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-26 Thread via GitHub


Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1538661895


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java:
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link OperatorStreamStateHandle} that works for file merging checkpoints.
+ *
+ * Operator states are stored in `taskownd/` dir when file merging is enabled. When an operator
+ * state dir is not referenced by any checkpoint, {@link SharedStateRegistry} will discard it. The
+ * shared subtask dir of fire merging is also tracked by {@link
+ * FileMergingOperatorStreamStateHandle}.
+ *
+ * The shared subtask dir of file merging is created when task initialization, which will be
+ * discarded when no checkpoint refer to it.
+ */
+public class FileMergingOperatorStreamStateHandle extends OperatorStreamStateHandle
+implements CompositeStateHandle {
+
+private static final long serialVersionUID = 1L;
+private static final Logger LOG =
+LoggerFactory.getLogger(FileMergingOperatorStreamStateHandle.class);
+
+/** The directory handle of file merging under 'taskowed/', one for each job. */
+private final DirectoryStreamStateHandle taskownedDirHandle;
+
+/**
+ * The directory handle of file merging under 'shared/', one for each subtask.
+ *
+ * @see {@link org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager} for
+ * the layout of file merging checkpoint directory.
+ */
+private final DirectoryStreamStateHandle sharedDirHandle;
+
+private transient SharedStateRegistry sharedStateRegistry;
+
+public FileMergingOperatorStreamStateHandle(
+DirectoryStreamStateHandle taskownedDirHandle,

Review Comment:
   How about naming it `taskOwnedDirHandle`? I noticed that variable names 
elsewhere mostly use `taskOwned` with a capital 'O'.






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-25 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1538563101


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * An empty {@link FileMergingOperatorStreamStateHandle} that is only used as a placeholder to
+ * prevent file merging directory from being deleted.
+ */
+public class EmptyFileMergingOperatorStreamStateHandle

Review Comment:
   `OperatorStateHandle` requires implementing 
`getStateNameToPartitionOffsets()` and `getDelegateStateHandle()`. If 
`EmptyFileMergingOperatorStreamStateHandle` inherits directly from 
`OperatorStateHandle`, we would need to re-implement the methods that already 
exist in `FileMergingOperatorStreamStateHandle`. I lean towards keeping the 
current implementation.
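The trade-off described here can be sketched with simplified, hypothetical stand-in types (these are not Flink's real `OperatorStateHandle` or `FileMergingOperatorStreamStateHandle`, whose constructors and fields differ): when the empty placeholder extends the file-merging handle, it inherits both required methods and only has to fix its constructor arguments.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the two methods OperatorStateHandle requires.
interface SimpleOperatorStateHandle {
    Map<String, long[]> getStateNameToPartitionOffsets();

    String getDelegateStateHandle();
}

// Hypothetical stand-in for FileMergingOperatorStreamStateHandle: it tracks the
// file merging directories and implements the interface methods exactly once.
class SimpleFileMergingHandle implements SimpleOperatorStateHandle {
    private final String taskOwnedDir;
    private final String sharedDir;
    private final Map<String, long[]> offsets;
    private final String delegate;

    SimpleFileMergingHandle(
            String taskOwnedDir, String sharedDir, Map<String, long[]> offsets, String delegate) {
        this.taskOwnedDir = taskOwnedDir;
        this.sharedDir = sharedDir;
        this.offsets = offsets;
        this.delegate = delegate;
    }

    @Override
    public Map<String, long[]> getStateNameToPartitionOffsets() {
        return offsets;
    }

    @Override
    public String getDelegateStateHandle() {
        return delegate;
    }

    String getTaskOwnedDir() {
        return taskOwnedDir;
    }

    String getSharedDir() {
        return sharedDir;
    }
}

// The placeholder carries no operator state; it exists only to keep referencing
// both directories. Both interface methods are inherited, not rewritten.
class SimpleEmptyFileMergingHandle extends SimpleFileMergingHandle {
    SimpleEmptyFileMergingHandle(String taskOwnedDir, String sharedDir) {
        super(taskOwnedDir, sharedDir, Collections.emptyMap(), "<empty>");
    }
}
```

Building the placeholder directly on the interface instead would mean writing both overrides a second time in the empty class; that duplication is what gets weighed against inheriting delegating behavior the placeholder never uses.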






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-25 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1538560216


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java:
##
@@ -204,7 +217,17 @@ public SnapshotResultSupplier asyncSnapshot(
 if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
 StreamStateHandle stateHandle = localOut.closeAndGetHandle();
 if (stateHandle != null) {

Review Comment:
   `SnapshotResult.of(null)` will be returned.
   
   Theoretically, this situation cannot happen, because 
`registeredOperatorStatesDeepCopies` and `registeredBroadcastStatesDeepCopies` 
are checked at the beginning. At this line, at least one of `OperatorStates` or 
`BroadcastStates` is non-empty, so some content must have been written to 
the output.
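The invariant relied on here can be sketched as follows, using a hypothetical simplification (plain maps stand in for the registered state copies and a `String` for the state handle; this is not the real `DefaultOperatorStateBackendSnapshotStrategy`): the emptiness check up front guarantees that the write step produces a handle, so the `null` branch is tolerated but unreachable in practice.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical simplification of the snapshot control flow discussed above.
final class SnapshotFlowSketch {
    private SnapshotFlowSketch() {}

    static Optional<String> snapshot(
            Map<String, String> operatorStates, Map<String, String> broadcastStates) {
        // Early exit: with nothing registered, no output is written at all.
        if (operatorStates.isEmpty() && broadcastStates.isEmpty()) {
            return Optional.empty();
        }
        // A null handle is tolerated here (analogous to SnapshotResult.of(null)),
        // but the check above makes that branch unreachable in practice.
        return Optional.ofNullable(writeAndGetHandle(operatorStates, broadcastStates));
    }

    // Stand-in for writing the states and calling closeAndGetHandle(): it only
    // returns null when nothing was written.
    private static String writeAndGetHandle(
            Map<String, String> operatorStates, Map<String, String> broadcastStates) {
        int written = operatorStates.size() + broadcastStates.size();
        return written == 0 ? null : "handle(" + written + " states)";
    }
}
```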
   






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-25 Thread via GitHub


Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1538512841


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java:
##
@@ -204,7 +217,17 @@ public SnapshotResultSupplier asyncSnapshot(
 if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
 StreamStateHandle stateHandle = localOut.closeAndGetHandle();
 if (stateHandle != null) {

Review Comment:
   What if `stateHandle == null`?



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * An empty {@link FileMergingOperatorStreamStateHandle} that is only used as a placeholder to
+ * prevent file merging directory from being deleted.
+ */
+public class EmptyFileMergingOperatorStreamStateHandle

Review Comment:
   I'm not sure whether `EmptyFileMergingOperatorStreamStateHandle` should extend 
`FileMergingOperatorStreamStateHandle`, since it does not need the 
delegating functionality of `OperatorStreamStateHandle`. Maybe building it 
directly on top of `OperatorStateHandle` is enough?






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-25 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1537383062


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -278,7 +300,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
 returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);
 
 return new SegmentFileStateHandle(
-physicalFile.getFilePath(), startPos, stateSize, scope);
+getManagedDirStateHandle(subtaskKey, scope),

Review Comment:
   Good catch. I moved the subtask dir of shared state to 
`FileMergingOperatorStreamStateHandle`, so the directory is registered even if 
there is no state, and it can be deleted by the JM in claim mode.
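A toy model shows why registering the directory even for a checkpoint without state matters. It assumes each directory key maps to the set of retained checkpoints that reference it; this is a hypothetical simplification, not Flink's `SharedStateRegistry` API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (not Flink's SharedStateRegistry API): each directory key maps to the
// ids of the retained checkpoints that still reference it.
final class DirectoryRefRegistry {
    private final Map<String, Set<Long>> references = new HashMap<>();
    private final List<String> discarded = new ArrayList<>();

    // A checkpoint reports that it references the directory.
    void register(String dirKey, long checkpointId) {
        references.computeIfAbsent(dirKey, k -> new HashSet<>()).add(checkpointId);
    }

    // A checkpoint is subsumed: drop its references and discard every directory
    // that no retained checkpoint refers to any more.
    void subsume(long checkpointId) {
        Iterator<Map.Entry<String, Set<Long>>> it = references.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<Long>> entry = it.next();
            entry.getValue().remove(checkpointId);
            if (entry.getValue().isEmpty()) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
    }

    List<String> discarded() {
        return discarded;
    }
}
```

If an empty checkpoint 2 does not register `shared/subtask-0`, subsuming checkpoint 1 leaves the directory unreferenced and it is discarded while the job may still write to it. If checkpoint 2 registers it anyway, the directory survives until checkpoint 2 itself is subsumed.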






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-24 Thread via GitHub


Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1536996692


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -278,7 +300,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
 returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);
 
 return new SegmentFileStateHandle(
-physicalFile.getFilePath(), startPos, stateSize, scope);
+getManagedDirStateHandle(subtaskKey, scope),

Review Comment:
   Is it possible that some checkpoint happens to contain no state, and the 
directory is then deleted by the JM?






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-24 Thread via GitHub


Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1536996545


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##
@@ -278,7 +300,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
 returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);
 
 return new SegmentFileStateHandle(
-physicalFile.getFilePath(), startPos, stateSize, scope);
+getManagedDirStateHandle(subtaskKey, scope),

Review Comment:
   Is it possible that some checkpoint happens to contain no state, and the 
directory is then deleted by the JM?






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-22 Thread via GitHub


fredia commented on PR #24513:
URL: https://github.com/apache/flink/pull/24513#issuecomment-2014755359

   @Zakelly @masteryhx Thanks for the detailed review. I updated the PR 
according to your suggestions. Please take a look again.
   
   > I'm wondering whether it is possible to add DirectoryStreamStateHandle at some 
point before reporting to JM, instead of providing it within each 
SegmentFileStateHandle?
   
   Yes, I've stored the `DirectoryStreamStateHandle` of each subtask in 
`FileMergingSnapshotManager`. Now they are only created once, when the job 
starts.
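A minimal sketch of the approach described here, assuming a per-subtask cache keyed by a plain string and a `<root>/shared/<key>` layout. All names are hypothetical; the real `FileMergingSnapshotManager` has its own subtask key type and directory layout.

```java
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: build each subtask's shared-directory handle once and reuse
// it, rather than constructing one per SegmentFileStateHandle.
final class ManagedDirCache {
    // Hypothetical stand-in for DirectoryStreamStateHandle.
    static final class DirHandle {
        final Path directory;

        DirHandle(Path directory) {
            this.directory = directory;
        }
    }

    private final Path checkpointRoot;
    private final Map<String, DirHandle> sharedDirHandles = new ConcurrentHashMap<>();
    // Counts how many handles were actually constructed.
    final AtomicInteger creations = new AtomicInteger();

    ManagedDirCache(Path checkpointRoot) {
        this.checkpointRoot = checkpointRoot;
    }

    // subtaskKey is a plain String here; the "<root>/shared/<key>" layout is assumed.
    DirHandle sharedDirHandle(String subtaskKey) {
        return sharedDirHandles.computeIfAbsent(
                subtaskKey,
                key -> {
                    creations.incrementAndGet();
                    return new DirHandle(checkpointRoot.resolve("shared").resolve(key));
                });
    }
}
```

Every segment of the same subtask then receives the same handle instance, so any directory-level work (such as computing its size) happens at most once per subtask.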
   





Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-22 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535339503


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java:
##
@@ -418,6 +430,62 @@ void testSnapshotEmpty() throws Exception {
 assertThat(stateHandle).isNull();
 }
 
+@Test
+void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws Exception {

Review Comment:
   I added `testSegmentStateHandleStateRegister` and 
`testFireMergingOperatorStateRegister` in `SharedStateRegistryTest`.






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-22 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535338210


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Optional;
+
+/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
+public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle {
+
+private static final long serialVersionUID = -6453596108675892492L;
+
+public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) {
+super(directory, directorySize);
+}
+
+@Override
+public FSDataInputStream openInputStream() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public Optional<byte[]> asBytesIfInMemory() {
+return Optional.empty();
+}
+
+@Override
+public PhysicalStateHandleID getStreamStateHandleID() {
+return new PhysicalStateHandleID(getDirectory().toString());
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+
+DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o;
+
+return getDirectory().equals(that.getDirectory());
+}
+
+@Override
+public String toString() {
+return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + '}';
+}
+
+public static DirectoryStreamStateHandle forPathWithSize(@Nonnull Path directory) {
+long size;
+try {
+size = FileUtils.getDirectoryFilesSize(directory);
+} catch (IOException e) {
+size = 0L;
+}
+return new DirectoryStreamStateHandle(directory, size);
+}
+
+public static SharedStateRegistryKey createStateRegistryKey(

Review Comment:
   Fixed.






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-22 Thread via GitHub


fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535337848


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java:
##
@@ -62,19 +77,58 @@ public class SegmentFileStateHandle implements StreamStateHandle {
  * @param scope The state's scope, whether it is exclusive or shared.
  */
 public SegmentFileStateHandle(
-Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
+Path directoryPath,
+Path filePath,
+long startPos,
+long stateSize,
+CheckpointedStateScope scope) {
 this.filePath = filePath;
 this.stateSize = stateSize;
 this.startPos = startPos;
 this.scope = scope;
+this.directoryStateHandle =
+DirectoryStreamStateHandle.forPathWithSize(
+new File(directoryPath.getPath()).toPath());

Review Comment:
   I've stored the `DirectoryStreamStateHandle` of each subtask in 
`FileMergingSnapshotManager`. Now they are only created once, when the job starts.






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-21 Thread via GitHub


masteryhx commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r155369


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java:
##
@@ -418,6 +430,62 @@ void testSnapshotEmpty() throws Exception {
 assertThat(stateHandle).isNull();
 }
 
+@Test
+void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws Exception {

Review Comment:
   Could we also test the registration of the new handle?
   Or test that the subsumed checkpoint is discarded correctly.



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Optional;
+
+/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
+public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle {
+
+private static final long serialVersionUID = -6453596108675892492L;
+
+public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) {
+super(directory, directorySize);
+}
+
+@Override
+public FSDataInputStream openInputStream() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public Optional<byte[]> asBytesIfInMemory() {
+return Optional.empty();
+}
+
+@Override
+public PhysicalStateHandleID getStreamStateHandleID() {
+return new PhysicalStateHandleID(getDirectory().toString());
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+
+DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o;
+
+return getDirectory().equals(that.getDirectory());
+}
+
+@Override
+public String toString() {
+return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + '}';
+}
+
+public static DirectoryStreamStateHandle forPathWithSize(@Nonnull Path directory) {
+long size;
+try {
+size = FileUtils.getDirectoryFilesSize(directory);
+} catch (IOException e) {
+size = 0L;
+}
+return new DirectoryStreamStateHandle(directory, size);
+}
+
+public static SharedStateRegistryKey createStateRegistryKey(

Review Comment:
   Could this be a member function?
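The difference this question is about can be sketched with a hypothetical, simplified handle. The real method's parameters are cut off in the quoted diff, so the static signature below is an assumption, a `String` stands in for `SharedStateRegistryKey`, and the key is assumed to be the directory path.

```java
import java.nio.file.Path;

// Hypothetical, simplified directory handle; not Flink's DirectoryStreamStateHandle.
final class DirKeySketch {
    private final Path directory;

    DirKeySketch(Path directory) {
        this.directory = directory;
    }

    // Static style: every caller has to pass the handle in explicitly.
    static String createStateRegistryKey(DirKeySketch handle) {
        return handle.createStateRegistryKey();
    }

    // Member style: the handle derives the key from its own directory.
    String createStateRegistryKey() {
        return directory.toString();
    }
}
```

With the member version, call sites read `handle.createStateRegistryKey()`, and the key derivation stays next to the directory it depends on.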



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java:
##
@@ -62,19 +77,58 @@ public class SegmentFileStateHandle implements StreamStateHandle {
  * @param scope The state's scope, whether it is exclusive or shared.
  */
 public SegmentFileStateHandle(
-Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
+Path directoryPath,
+Path filePath,
+long startPos,
+long stateSize,
+CheckpointedStateScope scope) {
 this.filePath = filePath;
 this.stateSize = stateSize;
 this.startPos = startPos;
 this.scope = scope;
+this.directoryStateHandle =
+DirectoryStreamStateHandle.forPathWithSize(
+new File(directoryPath.getPath()).toPath());

Review Comment:
   +1. At least we should not compute the directory size for every 
`SegmentFileStateHandle` in the same directory.




Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-20 Thread via GitHub


Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1533201932


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java:
##
@@ -62,19 +77,58 @@ public class SegmentFileStateHandle implements StreamStateHandle {
  * @param scope The state's scope, whether it is exclusive or shared.
  */
 public SegmentFileStateHandle(
-Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
+Path directoryPath,
+Path filePath,
+long startPos,
+long stateSize,
+CheckpointedStateScope scope) {
 this.filePath = filePath;
 this.stateSize = stateSize;
 this.startPos = startPos;
 this.scope = scope;
+this.directoryStateHandle =
+DirectoryStreamStateHandle.forPathWithSize(
+new File(directoryPath.getPath()).toPath());

Review Comment:
   Will `forPathWithSize` be inefficient?
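To see why this could be costly: computing a directory's size means visiting every file beneath it. The sketch below is a plain `java.nio` stand-in, not Flink's `FileUtils.getDirectoryFilesSize`, but a recursive size computation generally has to do comparable work.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Stand-in written with java.nio (not Flink's FileUtils.getDirectoryFilesSize):
// sums the sizes of all regular files below a directory by visiting each of them.
final class DirectorySize {
    private DirectorySize() {}

    static long sizeOf(Path directory) throws IOException {
        try (Stream<Path> paths = Files.walk(directory)) {
            return paths.filter(Files::isRegularFile)
                    .mapToLong(
                            p -> {
                                try {
                                    return Files.size(p);
                                } catch (IOException e) {
                                    // Lambdas cannot throw checked exceptions; wrap it.
                                    throw new UncheckedIOException(e);
                                }
                            })
                    .sum();
        } catch (UncheckedIOException e) {
            // Unwrap so callers see the original IOException.
            throw e.getCause();
        }
    }
}
```

Since the quoted constructor calls `forPathWithSize` for each `SegmentFileStateHandle`, this traversal would be repeated for every segment in the same directory.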






Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]

2024-03-18 Thread via GitHub


fredia commented on PR #24513:
URL: https://github.com/apache/flink/pull/24513#issuecomment-2003585066

   @Zakelly @masteryhx Would you like to take a look?

