[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216943#comment-16216943 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on the issue:
https://github.com/apache/flink/pull/3442

I had a quick chat with Stephan about this. @StefanRRichter has an idea how to properly implement this. Closing this PR and unassigning the issue.

> Split FileStateHandle into fileName and basePath
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow the basePath to be
> overwritten. We cannot overwrite the base path as long as the state handle is
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>     void clearBaseLocation();
>     void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this, and the SavepointSerializer should
> forward the calls when a savepoint is stored or loaded: clear before storing
> and set after loading.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
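The contract in the issue description (clear the base location before storing, set it again after loading) can be sketched in plain Java. This is a hypothetical, simplified model, not Flink's actual `FileStateHandle` or `SavepointSerializer`: `RelocatableFileHandle` and `RelocatingSerializerSketch` are illustrative names, and paths are plain strings rather than Flink `Path` objects. Only the interface itself is taken from the issue.

```java
import java.util.Objects;

// Interface as proposed in the issue description.
interface RelativeBaseLocationStreamStateHandle {
    void clearBaseLocation();

    void setBaseLocation(String baseLocation);
}

// Hypothetical stand-in for FileStateHandle that keeps base path and file name apart.
final class RelocatableFileHandle implements RelativeBaseLocationStreamStateHandle {
    private final String fileName;
    private String baseLocation; // null while the base location is cleared

    RelocatableFileHandle(String fileName) {
        this.fileName = Objects.requireNonNull(fileName);
    }

    RelocatableFileHandle(String baseLocation, String fileName) {
        this(fileName);
        setBaseLocation(baseLocation);
    }

    @Override
    public void clearBaseLocation() {
        baseLocation = null;
    }

    @Override
    public void setBaseLocation(String baseLocation) {
        this.baseLocation = Objects.requireNonNull(baseLocation);
    }

    String getFileName() {
        return fileName;
    }

    // Absolute location of the file; only defined while a base location is set.
    String getFilePath() {
        if (baseLocation == null) {
            throw new IllegalStateException("Base location has been cleared");
        }
        return baseLocation.endsWith("/") ? baseLocation + fileName : baseLocation + "/" + fileName;
    }
}

// Hypothetical serializer hooks: clear before store, set after load.
final class RelocatingSerializerSketch {
    private RelocatingSerializerSketch() {
    }

    // Persists only the file name. Per the issue, this must not be done to a handle
    // that is still in flight (not yet persisted), otherwise we risk a resource leak.
    static String store(RelocatableFileHandle handle) {
        handle.clearBaseLocation();
        return handle.getFileName();
    }

    // Re-attaches the directory the savepoint is loaded from, which may differ
    // from the directory it was originally written to.
    static RelocatableFileHandle load(String storedFileName, String savepointDirectory) {
        RelocatableFileHandle handle = new RelocatableFileHandle(storedFileName);
        handle.setBaseLocation(savepointDirectory);
        return handle;
    }
}
```

Because only the file name is persisted, moving the savepoint directory (for example from HDFS to S3) only changes the argument passed to `load`.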
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216944#comment-16216944 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce closed the pull request at:
https://github.com/apache/flink/pull/3442
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115911#comment-16115911 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user zentol commented on the issue:
https://github.com/apache/flink/pull/3442

What's the status of this PR @uce @tillrohrmann @StefanRRichter @StephanEwen ?
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000443#comment-16000443 ]

Ufuk Celebi commented on FLINK-5778:

No, I don't think so. I was waiting for FLINK-5823 (https://github.com/apache/flink/pull/3522) to be merged/rebased in order to rebase my PR on top of that one.
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998468#comment-15998468 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/3442

Are we gonna get this in for 1.3?
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901282#comment-15901282 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/3442

Very much OK with me; my vote on this was actually for keeping the old versioned code duplicated and immutable.
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901273#comment-15901273 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on the issue:
https://github.com/apache/flink/pull/3442

I agree with the equals/hashCode question. It has been introduced for the various involved classes by different people and not in this PR, so I think it's best handled as part of a different issue. Would you mind opening an issue for it?

Regarding code refactoring: I was also skeptical about the value of refactoring this. I'm totally fine with keeping the code duplicated. @StefanRRichter Is that OK with you as well?
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901258#comment-15901258 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/3442

Another thought from the discussion with @StefanRRichter: you refactor a lot to avoid duplicate code. While this is good in general, I am wondering whether we should actually duplicate the code here, because we want the `V1` serialization and savepoint code to be "immutable", meaning it should not be affected by changes to the `V2` code. Having a copy makes sure no accidental changes are made to the `V1` code when modifying the `V2` code.
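One way to read this suggestion is a version-dispatch scheme in which every format version has its own self-contained serializer and the V1 class is a frozen copy. The sketch below is hypothetical: `MetaSerializer`, `MetaSerializerV1`/`MetaSerializerV2`, `VersionedMeta`, and the int version prefix are illustrative names and a made-up wire format, not Flink's savepoint format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal contract for one metadata format version.
interface MetaSerializer {
    void write(String payload, DataOutputStream out) throws IOException;

    String read(DataInputStream in) throws IOException;
}

// V1: a deliberately self-contained copy. Edits made for V2 cannot leak into it.
final class MetaSerializerV1 implements MetaSerializer {
    @Override
    public void write(String payload, DataOutputStream out) throws IOException {
        out.writeUTF(payload);
    }

    @Override
    public String read(DataInputStream in) throws IOException {
        return in.readUTF();
    }
}

// V2: evolves independently (here: a length-prefixed UTF-8 byte array instead of writeUTF).
final class MetaSerializerV2 implements MetaSerializer {
    @Override
    public void write(String payload, DataOutputStream out) throws IOException {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    @Override
    public String read(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

final class VersionedMeta {
    private static final Map<Integer, MetaSerializer> SERIALIZERS = new HashMap<>();

    static {
        SERIALIZERS.put(1, new MetaSerializerV1());
        SERIALIZERS.put(2, new MetaSerializerV2());
    }

    private VersionedMeta() {
    }

    private static MetaSerializer serializerFor(int version) throws IOException {
        MetaSerializer serializer = SERIALIZERS.get(version);
        if (serializer == null) {
            throw new IOException("Unknown metadata version " + version);
        }
        return serializer;
    }

    // Writes the version number first so that load() can pick the matching serializer.
    static byte[] store(int version, String payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(version);
        serializerFor(version).write(payload, out);
        out.flush();
        return bytes.toByteArray();
    }

    static String load(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        return serializerFor(in.readInt()).read(in);
    }
}
```

The price is some duplicated code; the benefit, as argued in the comment, is that V1 data keeps decoding the same way no matter how V2 changes.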
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901247#comment-15901247 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/3442

Looks good in general. One thing that I stumbled across a lot in recent work on checkpoints/savepoints is that they all implement `equals` and `hashCode` and delegate to the task states, handles, etc. Do we need to define semantic equality there? It seems fragile to me, because the state handles by themselves generally cannot make a good claim about equality. The `FileStateHandle`, for example, does not consider two handles equal when one directory path has a trailing '/' and the other does not.
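The trailing-slash pitfall can be shown with a small hypothetical handle (`PathHandle` is an illustrative name, not Flink's `FileStateHandle`). A naive string comparison treats `hdfs:///savepoints/sp-1/` and `hdfs:///savepoints/sp-1` as different; normalizing the path fixes that one case, but it still ignores things like scheme case, default ports, or aliases for the same file system, which supports the point that a handle cannot make a robust claim about equality on its own.

```java
import java.net.URI;
import java.util.Objects;

// Hypothetical handle, used only to illustrate the trailing-slash equality pitfall.
final class PathHandle {
    private final String rawPath;
    private final String normalizedKey;
    private final long stateSize;

    PathHandle(String path, long stateSize) {
        this.rawPath = Objects.requireNonNull(path);
        this.stateSize = stateSize;
        URI uri = URI.create(path);
        // Compare scheme, authority and path, ignoring trailing slashes on the path.
        this.normalizedKey = Objects.toString(uri.getScheme(), "") + "|"
                + Objects.toString(uri.getRawAuthority(), "") + "|"
                + stripTrailingSlashes(uri.getRawPath());
    }

    private static String stripTrailingSlashes(String path) {
        if (path == null) {
            return ""; // opaque URIs have no path
        }
        int end = path.length();
        while (end > 1 && path.charAt(end - 1) == '/') {
            end--;
        }
        return path.substring(0, end);
    }

    // What a naive implementation does: compare the raw strings.
    boolean rawEquals(PathHandle other) {
        return stateSize == other.stateSize && rawPath.equals(other.rawPath);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PathHandle)) {
            return false;
        }
        PathHandle that = (PathHandle) o;
        return stateSize == that.stateSize && normalizedKey.equals(that.normalizedKey);
    }

    // Must use the same normalized key as equals() to keep the equals/hashCode contract.
    @Override
    public int hashCode() {
        return Objects.hash(normalizedKey, stateSize);
    }
}
```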
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897639#comment-15897639 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on the issue:
https://github.com/apache/flink/pull/3442

Thanks for your review, Stefan! I addressed your comments, but only then realized that the restriction to relative file state handles is actually a problem for externalized checkpoints. :-( They may store their metadata somewhere else and not as part of the checkpoint files. For them it's OK not to be relocatable. Still, I agree with your comment that this should be more explicitly allowed or disallowed. I guess we should explicitly distinguish between savepoints and externalized checkpoints for the metadata serialization. I hope that Stephan's refactorings of the checkpoint stream creation logic will help here.

I'll leave this PR open with your suggested changes and adjust it after Stephan's PR. Then I can either piggyback on his changes or see whether I need to add the required distinction myself.
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897365#comment-15897365 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3442#discussion_r104420638

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
+/**
+ * A savepoint serializer that does not store absolute URIs for {@link FileStateHandle}
+ * instances, allowing users to relocate savepoints as long as the file structure
+ * within the savepoint directory stays the same.
+ */
+class SavepointV2Serializer extends AbstractSavepointSerializer {
+
+    public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
+
+    private SavepointV2Serializer() {
+    }
+
+    @Override
+    SavepointV2 createSavepoint(long checkpointId, Collection taskStates) {
+        return new SavepointV2(checkpointId, taskStates);
+    }
+
+    @Override
+    void serializeFileStreamStateHandle(FileStateHandle fileStateHandle, Path basePath, DataOutputStream dos) throws IOException {
+        dos.writeLong(fileStateHandle.getStateSize());
+
+        Path child = fileStateHandle.getFilePath();
+        Path relative = getRelativePath(basePath, child);
+
+        if (relative != null) {
+            // This boolean is new in this version of the serializer
+            dos.writeBoolean(true);
+            dos.writeUTF(relative.toString());
+        } else {
+            dos.writeBoolean(false);
--- End diff --

The branch was meant to handle future changes, but you are right that this is not a good idea (for the reasons you mention). I will remove it.
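The relative encoding with the fallback branch removed can be sketched with `java.net.URI` standing in for Flink's `Path`. This is a hypothetical illustration: `RelativeHandleCodec` and its layout (state size via `writeLong`, then the relative path via `writeUTF`) only loosely mirror the quoted `serializeFileStreamStateHandle` and are not Flink's actual format. Instead of writing a flag followed by an absolute path, it rejects files outside the savepoint directory, so a relocated savepoint cannot silently keep pointing at the old location.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;

// Hypothetical codec for a file handle stored relative to its savepoint directory.
final class RelativeHandleCodec {
    private RelativeHandleCodec() {
    }

    static final class Decoded {
        final long stateSize;
        final URI filePath;

        Decoded(long stateSize, URI filePath) {
            this.stateSize = stateSize;
            this.filePath = filePath;
        }
    }

    // Ensures a trailing '/'; otherwise URI.resolve would replace the last path segment.
    private static URI asDirectory(URI dir) {
        String s = dir.toString();
        return s.endsWith("/") ? dir : URI.create(s + "/");
    }

    static byte[] encode(long stateSize, URI file, URI savepointDir) {
        // URI.relativize returns `file` unchanged when it is not below savepointDir.
        URI relative = asDirectory(savepointDir).relativize(file);
        String rel = relative.getPath();
        if (relative.isAbsolute() || rel == null || rel.isEmpty() || rel.startsWith("/")) {
            // No silent absolute-path fallback: fail instead.
            throw new IllegalArgumentException(file + " is not inside " + savepointDir);
        }
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(stateSize);
            out.writeUTF(relative.toString());
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Resolves the stored relative path against wherever the savepoint lives now.
    static Decoded decode(byte[] data, URI currentSavepointDir) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            long stateSize = in.readLong();
            URI relative = URI.create(in.readUTF());
            return new Decoded(stateSize, asDirectory(currentSavepointDir).resolve(relative));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If a different behaviour is ever needed, the comment in this thread suggests introducing a new serializer version rather than reviving the flag.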
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897366#comment-15897366 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/3442#discussion_r104420885

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/AbstractSavepointSerializer.java ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+/**
+ * Abstract Serializer for {@link Savepoint} instances.
+ *
+ * This is based on the {@link SavepointV1Serializer} of Flink 1.2.0 that
+ * makes sure no default Java serialization is used.
+ *
+ * The abstract class allows to overwrite the serialization behaviour for
+ * {@link FileStateHandle} instances. This is the only practical difference
+ * between Flink 1.2.x and versions >= Flink 1.3.0.
+ *
+ * This will probably be extended in ways that I cannot imagine at this point
+ * in time. If for whatever reason the abstract base class turns out to be a
+ * bad idea, feel free to change stuff. Right now, its sole purpose is reducing
+ * code duplication between {@link SavepointV1Serializer} and {@link SavepointV2Serializer}.
+ */
+abstract class AbstractSavepointSerializer implements SavepointSerializer {
--- End diff --

Agreed, let me change this.
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897356#comment-15897356 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3442#discussion_r104418918

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---

I think this branch (the `else` case that writes `false`) is never taken in the current implementation. If it were taken, it would silently violate the user's assumption that she can simply move the savepoint data around. I suggest removing this branch and sticking to the actual case. If we ever want to support a different behaviour in the future, we can simply do so by coming up with another version of the serializer.
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897357#comment-15897357 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3442#discussion_r104399071

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/AbstractSavepointSerializer.java ---

I would suggest trying composition over inheritance for `AbstractSavepointSerializer`. From an OO point of view, I don't think the different versions should be in an is-a hierarchy; they exist on the same level. This means you are using inheritance just for code de-duplication, for which it is often inferior to composition. Composition would make the design more flexible and open to future changes.
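The composition suggestion might look roughly like the following. It is a hypothetical sketch with made-up names (`FilePathCodec`, `HandleListWriter`, `SketchSerializerV1`/`SketchSerializerV2`) and strings instead of Flink types. The shared logic lives in one helper object that each version holds, and the only varying part, how a file path is written, is passed in as a strategy; neither version inherits from the other or from a common abstract base.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Strategy for the one part that differs between versions: how a file path is written.
interface FilePathCodec {
    void writePath(String file, String baseDir, DataOutputStream out) throws IOException;
}

// V1-style behaviour: store the absolute path as-is.
final class AbsolutePathCodec implements FilePathCodec {
    @Override
    public void writePath(String file, String baseDir, DataOutputStream out) throws IOException {
        out.writeUTF(file);
    }
}

// V2-style behaviour: store the path relative to the base directory, or fail.
final class RelativePathCodec implements FilePathCodec {
    @Override
    public void writePath(String file, String baseDir, DataOutputStream out) throws IOException {
        String prefix = baseDir.endsWith("/") ? baseDir : baseDir + "/";
        if (!file.startsWith(prefix)) {
            throw new IOException(file + " is not inside " + baseDir);
        }
        out.writeUTF(file.substring(prefix.length()));
    }
}

// Shared logic, reused by composition instead of an abstract base class.
final class HandleListWriter {
    private final FilePathCodec codec;

    HandleListWriter(FilePathCodec codec) {
        this.codec = codec;
    }

    byte[] write(List<String> files, String baseDir) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(files.size());
            for (String file : files) {
                codec.writePath(file, baseDir, out);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// Each version has a writer; the versions sit side by side rather than in a hierarchy.
final class SketchSerializerV1 {
    private final HandleListWriter writer = new HandleListWriter(new AbsolutePathCodec());

    byte[] serialize(List<String> files, String baseDir) {
        return writer.write(files, baseDir);
    }
}

final class SketchSerializerV2 {
    private final HandleListWriter writer = new HandleListWriter(new RelativePathCodec());

    byte[] serialize(List<String> files, String baseDir) {
        return writer.write(files, baseDir);
    }
}
```

A future version can pick a different codec, or stop using the shared writer entirely, without touching V1.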
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890229#comment-15890229 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r103689657

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
+/**
+ * A savepoint serializer that does not store absolute URIs for {@link FileStateHandle}
+ * instances, allowing users to relocate savepoints as long as the file structure
+ * within the savepoint directory stays the same.
+ */
+class SavepointV2Serializer extends AbstractSavepointSerializer {
+
+	public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
+
+	private SavepointV2Serializer() {
+	}
+
+	@Override
+	SavepointV2 createSavepoint(long checkpointId, Collection<TaskState> taskStates) {
+		return new SavepointV2(checkpointId, taskStates);
+	}
+
+	@Override
+	void serializeFileStreamStateHandle(FileStateHandle fileStateHandle, Path basePath, DataOutputStream dos) throws IOException {
--- End diff --

Here is the new serialization behaviour.
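The quoted diff stops at the signature of `serializeFileStreamStateHandle`, so its body is not shown. Purely as an illustration: since the new serializer imports `java.net.URI`, one plausible way to compute and restore a relative path is `URI.relativize` on write and `URI.resolve` on read. The `RelativePaths` helper below is hypothetical and works on plain `URI`s rather than Flink's `Path` and `FileStateHandle`.

```java
import java.net.URI;

// Hypothetical helper (not Flink API): converts between absolute file URIs
// and paths relative to a savepoint directory.
final class RelativePaths {

    private RelativePaths() {
    }

    // URI.resolve treats the last segment of a base without a trailing '/' as a
    // file name and replaces it, so directory URIs are normalized to end with '/'.
    static URI asDirectory(URI dir) {
        String s = dir.toString();
        return s.endsWith("/") ? dir : URI.create(s + "/");
    }

    // On write: keep only the part of the file URI below the savepoint directory.
    static String relativize(URI savepointDir, URI file) {
        URI relative = asDirectory(savepointDir).relativize(file);
        // relativize returns its argument unchanged (still absolute) when the file
        // is not inside the directory; such a savepoint could not be relocated.
        if (relative.isAbsolute()) {
            throw new IllegalArgumentException(file + " is not inside " + savepointDir);
        }
        return relative.toString();
    }

    // On restore: rebuild the absolute URI from the current savepoint directory.
    static URI resolve(URI savepointDir, String relativePath) {
        return asDirectory(savepointDir).resolve(relativePath);
    }
}
```

The trailing-slash normalization is the main design point: resolving `data-X` against a base ending in `/new/location` (no slash) would yield `/new/data-X` rather than `/new/location/data-X`.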
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890222#comment-15890222 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r103689471

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/AbstractSavepointSerializer.java ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+/**
+ * Abstract Serializer for {@link Savepoint} instances.
+ *
+ * This is based on the {@link SavepointV1Serializer} of Flink 1.2.0 that
+ * makes sure no default Java serialization is used.
+ *
+ * The abstract class allows to overwrite the serialization behaviour for
+ * {@link FileStateHandle} instances. This is the only practical difference
+ * between Flink 1.2.x and versions >= Flink 1.3.0.
+ *
+ * This will probably be extended in ways that I cannot imagine at this point
+ * in time. If for whatever reason the abstract base class turns out to be a
+ * bad idea, feel free to change stuff. Right now, its sole purpose is reducing
+ * code duplication between {@link SavepointV1Serializer} and {@link SavepointV2Serializer}.
+ */
+abstract class AbstractSavepointSerializer implements SavepointSerializer {
--- End diff --

All of this is shared between V1 and V2 savepoints. I've moved it here and made the file state handle serialization behaviour overridable.
@StefanRRichter, @StephanEwen: The usefulness of this depends on how many changes we expect in the future. If we expect many, I think it is better to actually duplicate the code instead of moving it to an abstract class. For now, I'm slightly leaning towards not expecting many changes and have hence moved it to the abstract class. In any case, reverting this is a trivial refactoring if you would rather have the code copied.
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890215#comment-15890215 ]

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r103688876

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/AbstractSavepoint.java ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A base savepoint class implementing the simple accessors of the Savepoint
+ * interface.
+ */
+abstract class AbstractSavepoint implements Savepoint {
--- End diff --

This code was the same for both `SavepointV1` and `SavepointV2`. Therefore I've moved it here.
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890214#comment-15890214 ]

ASF GitHub Bot commented on FLINK-5778:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3442

[FLINK-5778] [savepoints] Add savepoint serializer with relative file path serialization

This adds a new savepoint version, `SavepointV2`. The corresponding `SavepointV2Serializer` is the same as our current `SavepointV1Serializer`, except that `FileStateHandle` instances are serialized with their file path relative to the savepoint base path.

As an example, imagine a savepoint in directory `hdfs:///path/to/savepoint-directory` with these files:

```
hdfs:///path/to/savepoint-directory/_metadata
hdfs:///path/to/savepoint-directory/data-X
hdfs:///path/to/savepoint-directory/data-Y
```

Previously, the complete file path was stored. With this PR, we only store `data-X` for file state handles and reconstruct the complete path from the savepoint directory on restore. This enables us to move the savepoint directory around. The only requirement is that the layout within the savepoint directory does not change. I think this is a reasonable restriction.

In addition to the added tests, I've tested this manually by triggering savepoints, moving them around in the local file system as well as to HDFS, and restoring from them.

The code of `SavepointV1` and `SavepointV2` and of their respective serializers is mostly shared. Therefore, I've moved the base logic into the abstract classes `AbstractSavepoint` and `AbstractSavepointSerializer`.

The migration story is that you can resume from old savepoints as before, and all newly triggered savepoints will be V2 savepoints that serialize file state handles with their relative path. You can also resume from a `1.3-SNAPSHOT` savepoint without any issues.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 5778-relocatable

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3442.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3442

commit 1bc3b3bff1b33eb204e8b9d4cd9589105dd60466
Author: Ufuk Celebi
Date: 2017-02-28T21:36:24Z

    [FLINK-5778] [savepoints] Add savepoint serializer with relative file path serializaton
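The migration story in the pull request description (old savepoints resume as before, new ones are always written as V2) comes down to dispatching on a version header when reading and always writing the latest version. A minimal sketch, assuming a toy metadata layout of a version `int` followed by one UTF-encoded file path; `SavepointMetadataSketch` and its methods are hypothetical and do not reflect Flink's actual savepoint format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical toy layout: [int version][UTF file path].
// V1 stores the absolute path; V2 stores the path relative to the savepoint directory.
final class SavepointMetadataSketch {
    static final int V1 = 1;
    static final int V2 = 2;

    private SavepointMetadataSketch() {
    }

    // Newly triggered savepoints are always written in the latest version (V2).
    static byte[] store(String savepointDir, String absoluteFile) {
        String dir = withSlash(savepointDir);
        if (!absoluteFile.startsWith(dir)) {
            throw new IllegalArgumentException(absoluteFile + " is not inside " + savepointDir);
        }
        return write(V2, absoluteFile.substring(dir.length()));
    }

    // Only used to produce an old-style (V1) blob for the example.
    static byte[] storeV1(String absoluteFile) {
        return write(V1, absoluteFile);
    }

    // Old and new savepoints can both be restored; the version header picks the reader.
    static String load(String savepointDir, byte[] metadata) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(metadata))) {
            int version = in.readInt();
            switch (version) {
                case V1:
                    return in.readUTF(); // absolute path: this savepoint is not relocatable
                case V2:
                    return withSlash(savepointDir) + in.readUTF();
                default:
                    throw new IllegalStateException("Unknown savepoint version " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static byte[] write(int version, String path) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeUTF(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    private static String withSlash(String dir) {
        return dir.endsWith("/") ? dir : dir + "/";
    }
}
```

A V1 blob ignores the directory passed to `load`, which mirrors the fact that only savepoints written in the new format can be moved.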
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15884816#comment-15884816 ]

Ufuk Celebi commented on FLINK-5778:

I think your suggestion makes a lot of sense. I will do it as you propose and handle this in the savepoint serializer.
[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath
[ https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866466#comment-15866466 ] Stephan Ewen commented on FLINK-5778: - I am not sure about this design. State handle immutability is something I would really like to preserve. How about making all FileStateHandles consist of a base path and a file path? - When serializing, the SavepointSerializer stores only the file path - When deserializing, the new state handles are constructed from the root path of the checkpoint, and the name in the savepoint > Split FileStateHandle into fileName and basePath > > > Key: FLINK-5778 > URL: https://issues.apache.org/jira/browse/FLINK-5778 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > > Store the statePath as a basePath and a fileName and allow to overwrite the > basePath. We cannot overwrite the base path as long as the state handle is > still in flight and not persisted. Otherwise we risk a resource leak. > We need this in order to be able to relocate savepoints. > {code} > interface RelativeBaseLocationStreamStateHandle { >void clearBaseLocation(); >void setBaseLocation(String baseLocation); > } > {code} > FileStateHandle should implement this and the SavepointSerializer should > forward the calls when a savepoint is stored or loaded, clear before store > and set after load. -- This message was sent by Atlassian JIRA (v6.3.15#6346)