Re: [PR] [FLINK-34668][checkpoint] Report state handle of file merging directory to JM [flink]
fredia merged PR #24513: URL: https://github.com/apache/flink/pull/24513 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
fredia commented on PR #24513:
URL: https://github.com/apache/flink/pull/24513#issuecomment-2019523721
@Zakelly Thanks for the review and suggestions. I have addressed the comments and squashed the commits; please take a look when you're free. Thanks.
Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1538661895
## flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java: ##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link OperatorStreamStateHandle} that works for file merging checkpoints.
+ *
+ * Operator states are stored in `taskownd/` dir when file merging is enabled. When an operator
+ * state dir is not referenced by any checkpoint, {@link SharedStateRegistry} will discard it. The
+ * shared subtask dir of fire merging is also tracked by {@link
+ * FileMergingOperatorStreamStateHandle}.
+ *
+ * The shared subtask dir of file merging is created when task initialization, which will be
+ * discarded when no checkpoint refer to it.
+ */
+public class FileMergingOperatorStreamStateHandle extends OperatorStreamStateHandle
+        implements CompositeStateHandle {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(FileMergingOperatorStreamStateHandle.class);
+
+    /** The directory handle of file merging under 'taskowed/', one for each job. */
+    private final DirectoryStreamStateHandle taskownedDirHandle;
+
+    /**
+     * The directory handle of file merging under 'shared/', one for each subtask.
+     *
+     * @see {@link org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager} for
+     *     the layout of file merging checkpoint directory.
+     */
+    private final DirectoryStreamStateHandle sharedDirHandle;
+
+    private transient SharedStateRegistry sharedStateRegistry;
+
+    public FileMergingOperatorStreamStateHandle(
+            DirectoryStreamStateHandle taskownedDirHandle,
Review Comment:
How about naming it `taskOwnedDirHandle`? I noticed that variable names elsewhere mostly use `taskOwned` with a capital 'O'.
fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1538563101
## flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java: ##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * An empty {@link FileMergingOperatorStreamStateHandle} that is only used as a placeholder to
+ * prevent file merging directory from being deleted.
+ */
+public class EmptyFileMergingOperatorStreamStateHandle
Review Comment:
`OperatorStateHandle` requires implementations of `getStateNameToPartitionOffsets()` and `getDelegateStateHandle()`. If `EmptyFileMergingOperatorStreamStateHandle` implemented `OperatorStateHandle` directly, we would have to re-implement methods that already exist in `FileMergingOperatorStreamStateHandle`. I lean towards keeping the current implementation.
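To make the trade-off in this exchange concrete, here is a minimal Java sketch built on hypothetical, heavily simplified stand-ins (`SimpleOperatorStateHandle`, `SimpleFileMergingHandle`, `SimpleEmptyFileMergingHandle`); the real Flink interfaces have more methods and different value types. It shows how extending the file-merging handle lets the empty placeholder inherit `getStateNameToPartitionOffsets()` and `getDelegateStateHandle()` instead of re-implementing them.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical, heavily simplified stand-in for Flink's OperatorStateHandle interface. */
interface SimpleOperatorStateHandle {
    Map<String, long[]> getStateNameToPartitionOffsets();

    String getDelegateStateHandle();
}

/** Stand-in for FileMergingOperatorStreamStateHandle: implements the interface once. */
class SimpleFileMergingHandle implements SimpleOperatorStateHandle {
    private final String taskOwnedDir;
    private final String sharedDir;
    private final Map<String, long[]> offsets;
    private final String delegate;

    SimpleFileMergingHandle(
            String taskOwnedDir, String sharedDir, Map<String, long[]> offsets, String delegate) {
        this.taskOwnedDir = taskOwnedDir;
        this.sharedDir = sharedDir;
        this.offsets = offsets;
        this.delegate = delegate;
    }

    @Override
    public Map<String, long[]> getStateNameToPartitionOffsets() {
        return offsets;
    }

    @Override
    public String getDelegateStateHandle() {
        return delegate;
    }

    String getTaskOwnedDir() {
        return taskOwnedDir;
    }

    String getSharedDir() {
        return sharedDir;
    }
}

/**
 * Stand-in for EmptyFileMergingOperatorStreamStateHandle: it inherits both interface methods
 * and only pins the state to "nothing", while still carrying the two directory references.
 */
class SimpleEmptyFileMergingHandle extends SimpleFileMergingHandle {
    SimpleEmptyFileMergingHandle(String taskOwnedDir, String sharedDir) {
        // "EMPTY" is a hypothetical placeholder delegate, not Flink's real one.
        super(taskOwnedDir, sharedDir, Collections.emptyMap(), "EMPTY");
    }
}
```

The price of this choice, as the other review comment points out, is that the placeholder also inherits delegation it does not need; implementing the interface directly would avoid that at the cost of duplicated methods.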
fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1538560216
## flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java: ##
@@ -204,7 +217,17 @@ public SnapshotResultSupplier asyncSnapshot(
         if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
             StreamStateHandle stateHandle = localOut.closeAndGetHandle();
             if (stateHandle != null) {
Review Comment:
If it is null, `SnapshotResult.of(null)` will be returned. In theory this cannot happen: `registeredOperatorStatesDeepCopies` and `registeredBroadcastStatesDeepCopies` are checked at the beginning, so by this line at least one of `OperatorStates` or `BroadcastStates` is non-empty, and some content must have been written to the output.
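The reasoning above can be sketched as follows, under the simplifying assumption that a snapshot just serializes two maps. `SnapshotGuardSketch` is hypothetical, and its `Optional` result stands in for `SnapshotResult`: an empty `Optional` plays the role of `SnapshotResult.of(null)`.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the snapshot control flow discussed above; not Flink code. */
final class SnapshotGuardSketch {
    private SnapshotGuardSketch() {}

    /** Returns a "handle" (here just the serialized text), or Optional.empty() for no state. */
    static Optional<String> snapshot(
            Map<String, String> operatorStates, Map<String, String> broadcastStates) {
        // Early exit: with no registered state nothing is written, so no handle is produced.
        if (operatorStates.isEmpty() && broadcastStates.isEmpty()) {
            return Optional.empty();
        }
        StringBuilder out = new StringBuilder();
        operatorStates.forEach(
                (name, value) -> out.append(name).append('=').append(value).append(';'));
        broadcastStates.forEach(
                (name, value) -> out.append(name).append('=').append(value).append(';'));
        // Past the early exit at least one map is non-empty, so out is non-empty; the null
        // branch mirrors the defensive `stateHandle != null` check and should be unreachable.
        String handle = out.length() > 0 ? out.toString() : null;
        return Optional.ofNullable(handle);
    }
}
```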
Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1538512841
## flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java: ##
@@ -204,7 +217,17 @@ public SnapshotResultSupplier asyncSnapshot(
         if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
             StreamStateHandle stateHandle = localOut.closeAndGetHandle();
             if (stateHandle != null) {
Review Comment:
What if `stateHandle == null`?
## flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java: ##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * An empty {@link FileMergingOperatorStreamStateHandle} that is only used as a placeholder to
+ * prevent file merging directory from being deleted.
+ */
+public class EmptyFileMergingOperatorStreamStateHandle
Review Comment:
I'm not sure `EmptyFileMergingOperatorStreamStateHandle` should extend `FileMergingOperatorStreamStateHandle`, since it does not need the delegating functionality of `OperatorStreamStateHandle`. Maybe building it directly on top of `OperatorStateHandle` is enough?
fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1537383062
## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ##
@@ -278,7 +300,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
         returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);
         return new SegmentFileStateHandle(
-                physicalFile.getFilePath(), startPos, stateSize, scope);
+                getManagedDirStateHandle(subtaskKey, scope),
Review Comment:
Good catch. I moved the subtask directory of shared state into `FileMergingOperatorStreamStateHandle`, so the directory is registered even if there is no state, and it can then be deleted by the JM in claim mode.
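A minimal sketch of why registering the directory on every checkpoint matters, assuming a simplified model in which a directory is deleted as soon as no retained checkpoint references it (roughly the situation where the JM owns the files, as in claim mode). `DirectoryRefRegistry` and its methods are hypothetical and are not Flink's `SharedStateRegistry` API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of checkpoint-based reference counting for directories; the names are
 * illustrative and this is not Flink's SharedStateRegistry API.
 */
final class DirectoryRefRegistry {
    /** Directory path -> ids of retained checkpoints that still reference it. */
    private final Map<String, Set<Long>> references = new HashMap<>();
    /** Directories that would have been deleted (recorded here instead of deleting). */
    private final Set<String> discarded = new HashSet<>();

    /** Records that a completed checkpoint reported a handle for this directory. */
    void register(String directory, long checkpointId) {
        references.computeIfAbsent(directory, d -> new HashSet<>()).add(checkpointId);
    }

    /** Drops a subsumed checkpoint and discards every directory it was the last user of. */
    void release(long checkpointId) {
        references.entrySet().removeIf(entry -> {
            entry.getValue().remove(checkpointId);
            if (entry.getValue().isEmpty()) {
                discarded.add(entry.getKey()); // a real registry would delete the directory here
                return true;
            }
            return false;
        });
    }

    boolean isDiscarded(String directory) {
        return discarded.contains(directory);
    }
}
```

In this model, a checkpoint without state that reports nothing for the subtask directory lets the previous checkpoint's release drop the last reference, so a directory that is still in use gets discarded; reporting the directory from every checkpoint (for example through an empty placeholder handle) keeps it referenced until the last checkpoint using it is subsumed.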
Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1536996692
## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ##
@@ -278,7 +300,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
         returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);
         return new SegmentFileStateHandle(
-                physicalFile.getFilePath(), startPos, stateSize, scope);
+                getManagedDirStateHandle(subtaskKey, scope),
Review Comment:
Is it possible that some checkpoint happens to have no state, and the directory is then deleted by the JM?
Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1536996545
## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ##
@@ -278,7 +300,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
         returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);
         return new SegmentFileStateHandle(
-                physicalFile.getFilePath(), startPos, stateSize, scope);
+                getManagedDirStateHandle(subtaskKey, scope),
Review Comment:
Is it possible that some checkpoint happens to have no state, and the directory is then deleted by the JM?
fredia commented on PR #24513:
URL: https://github.com/apache/flink/pull/24513#issuecomment-2014755359
@Zakelly @masteryhx Thanks for the detailed review. I have updated the PR according to your suggestions; please take another look.
> I'm wandering is it possible to add DirectoryStreamStateHandle at some point before reporting to JM, instead of providing it within each SegmentFileStateHandle ?
Yes, I've stored the `DirectoryStreamStateHandle` of each subtask in `FileMergingSnapshotManager`. Now they are only created once, when the job starts.
fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535339503
## flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java: ##
@@ -418,6 +430,62 @@ void testSnapshotEmpty() throws Exception {
         assertThat(stateHandle).isNull();
     }
+    @Test
+    void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws Exception {
Review Comment:
I added `testSegmentStateHandleStateRegister` and `testFireMergingOperatorStateRegister` in `SharedStateRegistryTest`.
fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535338210
## flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java: ##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Optional;
+
+/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
+public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle {
+
+    private static final long serialVersionUID = -6453596108675892492L;
+
+    public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) {
+        super(directory, directorySize);
+    }
+
+    @Override
+    public FSDataInputStream openInputStream() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Optional<byte[]> asBytesIfInMemory() {
+        return Optional.empty();
+    }
+
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(getDirectory().toString());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o;
+
+        return getDirectory().equals(that.getDirectory());
+    }
+
+    @Override
+    public String toString() {
+        return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + '}';
+    }
+
+    public static DirectoryStreamStateHandle forPathWithSize(@Nonnull Path directory) {
+        long size;
+        try {
+            size = FileUtils.getDirectoryFilesSize(directory);
+        } catch (IOException e) {
+            size = 0L;
+        }
+        return new DirectoryStreamStateHandle(directory, size);
+    }
+
+    public static SharedStateRegistryKey createStateRegistryKey(
Review Comment:
Fixed.
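`forPathWithSize` sums the file sizes under a directory and falls back to `0` on an `IOException`. The hypothetical `DirectorySizeSketch.sizeOrZero` below shows the same idea with plain `java.nio` (it is an assumption-level re-creation, not Flink's `FileUtils.getDirectoryFilesSize`). It also makes visible that every call walks the whole directory tree, which is the cost questioned in the review comments on `SegmentFileStateHandle`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Hypothetical java.nio re-creation of the forPathWithSize idea; not Flink's FileUtils. */
final class DirectorySizeSketch {
    private DirectorySizeSketch() {}

    /** Sums the sizes of all regular files below the directory, or returns 0 on any I/O error. */
    static long sizeOrZero(Path directory) {
        // Files.walk visits the entire tree, so the cost grows with the number of files.
        try (Stream<Path> paths = Files.walk(directory)) {
            return paths.filter(Files::isRegularFile)
                    .mapToLong(
                            p -> {
                                try {
                                    return Files.size(p);
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            })
                    .sum();
        } catch (IOException | UncheckedIOException e) {
            // Same fallback as the quoted code: an unreadable or missing directory counts as 0.
            return 0L;
        }
    }
}
```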
fredia commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1535337848
## flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java: ##
@@ -62,19 +77,58 @@ public class SegmentFileStateHandle implements StreamStateHandle {
      * @param scope The state's scope, whether it is exclusive or shared.
      */
     public SegmentFileStateHandle(
-            Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
+            Path directoryPath,
+            Path filePath,
+            long startPos,
+            long stateSize,
+            CheckpointedStateScope scope) {
         this.filePath = filePath;
         this.stateSize = stateSize;
         this.startPos = startPos;
         this.scope = scope;
+        this.directoryStateHandle =
+                DirectoryStreamStateHandle.forPathWithSize(
+                        new File(directoryPath.getPath()).toPath());
Review Comment:
I've stored the `DirectoryStreamStateHandle` of each subtask in `FileMergingSnapshotManager`. Now they are only created once, when the job starts.
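One way to build each subtask's directory handle only once, as this reply describes, is a per-key cache. `DirHandleCache` is a hypothetical generic helper, not the code in `FileMergingSnapshotManager`; it relies on `ConcurrentHashMap.computeIfAbsent`, which applies the factory at most once per key even under concurrent access.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical cache that creates one directory handle per subtask key, so an expensive
 * factory (for example one that computes the directory size) runs once instead of once per
 * segment handle.
 */
final class DirHandleCache<K, H> {
    private final Map<K, H> handles = new ConcurrentHashMap<>();
    private final Function<K, H> factory;

    DirHandleCache(Function<K, H> factory) {
        this.factory = factory;
    }

    /** Returns the cached handle for the key, creating it on first access only. */
    H get(K subtaskKey) {
        return handles.computeIfAbsent(subtaskKey, factory);
    }
}
```

Every segment handle for the same subtask then shares one directory handle instance, instead of each one recomputing the directory size.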
masteryhx commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r155369
## flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java: ##
@@ -418,6 +430,62 @@ void testSnapshotEmpty() throws Exception {
         assertThat(stateHandle).isNull();
     }
+    @Test
+    void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws Exception {
Review Comment:
Could we also test the registration of the new handle? Or test that subsumed checkpoints are discarded correctly.
## flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java: ##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Optional;
+
+/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
+public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle {
+
+    private static final long serialVersionUID = -6453596108675892492L;
+
+    public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) {
+        super(directory, directorySize);
+    }
+
+    @Override
+    public FSDataInputStream openInputStream() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Optional<byte[]> asBytesIfInMemory() {
+        return Optional.empty();
+    }
+
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(getDirectory().toString());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o;
+
+        return getDirectory().equals(that.getDirectory());
+    }
+
+    @Override
+    public String toString() {
+        return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + '}';
+    }
+
+    public static DirectoryStreamStateHandle forPathWithSize(@Nonnull Path directory) {
+        long size;
+        try {
+            size = FileUtils.getDirectoryFilesSize(directory);
+        } catch (IOException e) {
+            size = 0L;
+        }
+        return new DirectoryStreamStateHandle(directory, size);
+    }
+
+    public static SharedStateRegistryKey createStateRegistryKey(
Review Comment:
Could this be a member function?
## flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java: ##
@@ -62,19 +77,58 @@ public class SegmentFileStateHandle implements StreamStateHandle {
      * @param scope The state's scope, whether it is exclusive or shared.
      */
     public SegmentFileStateHandle(
-            Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
+            Path directoryPath,
+            Path filePath,
+            long startPos,
+            long stateSize,
+            CheckpointedStateScope scope) {
         this.filePath = filePath;
         this.stateSize = stateSize;
         this.startPos = startPos;
         this.scope = scope;
+        this.directoryStateHandle =
+                DirectoryStreamStateHandle.forPathWithSize(
+                        new File(directoryPath.getPath()).toPath());
Review Comment:
+1. At the very least, we should not recalculate the size for every `SegmentFileStateHandle` in the same directory.
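For the question about `createStateRegistryKey`, a member-function shape could look like the sketch below. The static method's parameters are cut off in the quoted diff, so `RegistryKey`, `DirHandleSketch`, and the choice of deriving the key from the directory path are all assumptions made for illustration, not the PR's final code.

```java
import java.nio.file.Path;
import java.util.Objects;

/** Hypothetical simplified key type standing in for SharedStateRegistryKey. */
final class RegistryKey {
    private final String id;

    RegistryKey(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof RegistryKey && ((RegistryKey) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return id;
    }
}

/** Hypothetical handle showing the "member function" shape suggested in the review. */
final class DirHandleSketch {
    private final Path directory;

    DirHandleSketch(Path directory) {
        this.directory = Objects.requireNonNull(directory);
    }

    /** Derives the key from this handle's own directory, so callers need not pass the path. */
    RegistryKey createStateRegistryKey() {
        return new RegistryKey(directory.toString());
    }
}
```

Handles that point at the same directory then produce equal keys, which is what a registry keyed by directory would rely on.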
Zakelly commented on code in PR #24513:
URL: https://github.com/apache/flink/pull/24513#discussion_r1533201932
## flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java: ##
@@ -62,19 +77,58 @@ public class SegmentFileStateHandle implements StreamStateHandle {
      * @param scope The state's scope, whether it is exclusive or shared.
      */
     public SegmentFileStateHandle(
-            Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
+            Path directoryPath,
+            Path filePath,
+            long startPos,
+            long stateSize,
+            CheckpointedStateScope scope) {
         this.filePath = filePath;
         this.stateSize = stateSize;
         this.startPos = startPos;
         this.scope = scope;
+        this.directoryStateHandle =
+                DirectoryStreamStateHandle.forPathWithSize(
+                        new File(directoryPath.getPath()).toPath());
Review Comment:
Will `forPathWithSize` be inefficient?
fredia commented on PR #24513:
URL: https://github.com/apache/flink/pull/24513#issuecomment-2003585066
@Zakelly @masteryhx Would you like to take a look?