[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216943#comment-16216943
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3442
  
I had a quick chat with Stephan about this. @StefanRRichter has an idea how 
to properly implement this. Closing this PR and unassigning the issue.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16216944#comment-16216944
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce closed the pull request at:

https://github.com/apache/flink/pull/3442


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16115911#comment-16115911
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3442
  
What's the status of this PR @uce @tillrohrmann @StefanRRichter 
@StephanEwen ?


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-05-08 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000443#comment-16000443
 ] 

Ufuk Celebi commented on FLINK-5778:


No, I don't think so. I was waiting for FLINK-5823 
(https://github.com/apache/flink/pull/3522) to be merged/rebased in order to 
rebase my PR on top of that one.

> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15998468#comment-15998468
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3442
  
Are we gonna get this in for 1.3?


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901282#comment-15901282
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3442
  
Very much ok with me, my vote on this was actually on keeping old versioned 
code duplicated and immutable.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901273#comment-15901273
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3442
  
I agree with the equals/hashCode question. It has been introduced for the 
various involved classes by different people and not in this PR, so I think 
it's best handled as part of a different issue. Would you mind opening an issue 
for it? 

Regarding code refactoring: I was also skeptical about the value of 
refactoring this. I'm totally fine with keeping the code duplicated. 
@StefanRRichter Is that OK with you as well?



> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901258#comment-15901258
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3442
  
Another thought from the discussion with @StefanRRichter :
You refactor a lot to not have duplicate code. While this is good in 
general, I am wondering if we should not actually duplicate the code here, 
because we want the `V1` serialization code and savepoint code to be 
"immutable", meaning it should not be affected by changes to the `V2` code. 
Having a copy makes sure no accidental changes are made to the `V1` code when 
modifying the `V2` code.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901247#comment-15901247
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3442
  
Looks good in general.

One thing that I stumbled across a lot in recent works on the checkpoints / 
savepoints is that they all implement `equals` and `hashCode` and delegate to 
the task states and handles, etc. 

Do we need to define semantic equality there? It seems fragile to me, 
because the state handles by themselves can in general not really make a good 
claim about equality. The `FileStateHandle` for example fails to define 
equality depending on whether there is a trailing '/' on a directory path or 
not.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897639#comment-15897639
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3442
  
Thanks for your review Stefan! I addressed your comments, but only then 
realized that the restriction to relative file state handle is actually a 
problem for externalized checkpoints. :-( They possibly store their meta data 
somewhere else and not as part of the checkpoint files. For them it's OK to not 
be relocatable. Still, I agree with your comment that this should be more 
explicitly allowed or disallowed. I gues that we should explicitly distinguish 
between savepoints and externalized checkpoints for the meta data 
serialization. I hope that Stephan's refactorings of the checkpoint stream 
creation logic will help here. I'll leave this PR open with your suggested 
changes and adjust it after Stephan's PR. Then I can either piggyback on his 
changes or see whether I need to add the required distinction myself.



> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897365#comment-15897365
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r104420638
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
+/**
+ * A savepoint serializer that does not store absolute URIs for {@link 
FileStateHandle}
+ * instances, allowing users to relocate savepoints as long as the file 
structure
+ * within the savepoint directory stays the same.
+ */
+class SavepointV2Serializer extends 
AbstractSavepointSerializer {
+
+   public static final SavepointV2Serializer INSTANCE = new 
SavepointV2Serializer();
+
+   private SavepointV2Serializer() {
+   }
+
+   @Override
+   SavepointV2 createSavepoint(long checkpointId, Collection 
taskStates) {
+   return new SavepointV2(checkpointId, taskStates);
+   }
+
+   @Override
+   void serializeFileStreamStateHandle(FileStateHandle fileStateHandle, 
Path basePath, DataOutputStream dos) throws IOException {
+   dos.writeLong(fileStateHandle.getStateSize());
+
+   Path child = fileStateHandle.getFilePath();
+   Path relative = getRelativePath(basePath, child);
+
+   if (relative != null) {
+   // This boolean is new in this version of the serializer
+   dos.writeBoolean(true);
+   dos.writeUTF(relative.toString());
+   } else {
+   dos.writeBoolean(false);
--- End diff --

The branch was meant to handle future changes, but you are right that this 
is not a good idea (for the reasons you mention). I will remove it.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897366#comment-15897366
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r104420885
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/AbstractSavepointSerializer.java
 ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+/**
+ * Abstract Serializer for {@link Savepoint} instances.
+ *
+ * This is based on the {@link SavepointV1Serializer} of Flink 1.2.0 
that
+ * makes sure no default Java serialization is used.
+ *
+ * The abstract class allows to overwrite the serialization behaviour 
for
+ * {@link FileStateHandle} instances. This is the only practical difference
+ * between Flink 1.2.x and versions >= Flink 1.3.0.
+ *
+ * This will probably be extended in ways that I cannot imagine at this 
point
+ * in time. If for whatever reason the abstract base class turns out to be 
a
+ * bad idea, feel free to change stuff. Right now, it's sole purpose is 
reducing
+ * code duplication between {@link SavepointV1Serializer} and {@link 
SavepointV2Serializer}.
+ */
+abstract class AbstractSavepointSerializer implements 
SavepointSerializer {
--- End diff --

Agreed, let me change this.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897356#comment-15897356
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r104418918
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
+/**
+ * A savepoint serializer that does not store absolute URIs for {@link 
FileStateHandle}
+ * instances, allowing users to relocate savepoints as long as the file 
structure
+ * within the savepoint directory stays the same.
+ */
+class SavepointV2Serializer extends 
AbstractSavepointSerializer {
+
+   public static final SavepointV2Serializer INSTANCE = new 
SavepointV2Serializer();
+
+   private SavepointV2Serializer() {
+   }
+
+   @Override
+   SavepointV2 createSavepoint(long checkpointId, Collection 
taskStates) {
+   return new SavepointV2(checkpointId, taskStates);
+   }
+
+   @Override
+   void serializeFileStreamStateHandle(FileStateHandle fileStateHandle, 
Path basePath, DataOutputStream dos) throws IOException {
+   dos.writeLong(fileStateHandle.getStateSize());
+
+   Path child = fileStateHandle.getFilePath();
+   Path relative = getRelativePath(basePath, child);
+
+   if (relative != null) {
+   // This boolean is new in this version of the serializer
+   dos.writeBoolean(true);
+   dos.writeUTF(relative.toString());
+   } else {
+   dos.writeBoolean(false);
--- End diff --

I think this branch is never taken in the current implementation. If it 
would be taken, this would silently violate the user's assumption that she can 
simply move around the savepoint data. I suggest to remove this branch and 
stick to the actual case. If we ever want to support a different behaviour in 
the future, we can simply do so by coming up with another version of the 
serializer.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897357#comment-15897357
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r104399071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/AbstractSavepointSerializer.java
 ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+/**
+ * Abstract Serializer for {@link Savepoint} instances.
+ *
+ * This is based on the {@link SavepointV1Serializer} of Flink 1.2.0 
that
+ * makes sure no default Java serialization is used.
+ *
+ * The abstract class allows to overwrite the serialization behaviour 
for
+ * {@link FileStateHandle} instances. This is the only practical difference
+ * between Flink 1.2.x and versions >= Flink 1.3.0.
+ *
+ * This will probably be extended in ways that I cannot imagine at this 
point
+ * in time. If for whatever reason the abstract base class turns out to be 
a
+ * bad idea, feel free to change stuff. Right now, it's sole purpose is 
reducing
+ * code duplication between {@link SavepointV1Serializer} and {@link 
SavepointV2Serializer}.
+ */
+abstract class AbstractSavepointSerializer implements 
SavepointSerializer {
--- End diff --

I would suggest to try using composition over inheritance in this case. 
From an OO point of view, I don't think that the different versions should be 
in a is-a-relationship hierarchy, but they exist on the same level. This means 
you are using inheritance just for code de-duplication, for which it is often 
inferior to composition. This makes the design more flexible and open for 
future changes.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890229#comment-15890229
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r103689657
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
+/**
+ * A savepoint serializer that does not store absolute URIs for {@link 
FileStateHandle}
+ * instances, allowing users to relocate savepoints as long as the file 
structure
+ * within the savepoint directory stays the same.
+ */
+class SavepointV2Serializer extends 
AbstractSavepointSerializer {
+
+   public static final SavepointV2Serializer INSTANCE = new 
SavepointV2Serializer();
+
+   private SavepointV2Serializer() {
+   }
+
+   @Override
+   SavepointV2 createSavepoint(long checkpointId, Collection 
taskStates) {
+   return new SavepointV2(checkpointId, taskStates);
+   }
+
+   @Override
+   void serializeFileStreamStateHandle(FileStateHandle fileStateHandle, 
Path basePath, DataOutputStream dos) throws IOException {
--- End diff --

Here is the new serialization behaviour


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890222#comment-15890222
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r103689471
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/AbstractSavepointSerializer.java
 ---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+/**
+ * Abstract Serializer for {@link Savepoint} instances.
+ *
+ * This is based on the {@link SavepointV1Serializer} of Flink 1.2.0 
that
+ * makes sure no default Java serialization is used.
+ *
+ * The abstract class allows to overwrite the serialization behaviour 
for
+ * {@link FileStateHandle} instances. This is the only practical difference
+ * between Flink 1.2.x and versions >= Flink 1.3.0.
+ *
+ * This will probably be extended in ways that I cannot imagine at this 
point
+ * in time. If for whatever reason the abstract base class turns out to be 
a
+ * bad idea, feel free to change stuff. Right now, it's sole purpose is 
reducing
+ * code duplication between {@link SavepointV1Serializer} and {@link 
SavepointV2Serializer}.
+ */
+abstract class AbstractSavepointSerializer implements 
SavepointSerializer {
--- End diff --

All of this is shared between V1 and V2 savepoints. I've moved it here and 
allowed to overwrite the file state handle serialization behaviour. 
@StefanRRichter, @StephanEwen The usefulness of this depends on how many 
changes we expect in the future. If we expect many, I think it is better to 
actually duplicate the code instead of moving it to an abstract class. For now, 
I'm slightly leaning towards not expecting many changes and hence moved it to 
the abstract class. In any case, moving back with this is a very trivial 
refactoring if you want the code to be rather copied.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and 

[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890215#comment-15890215
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3442#discussion_r103688876
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/AbstractSavepoint.java
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A base savepoint class implementing the simple accessors of the 
Savepoint
+ * interface.
+ */
+abstract class AbstractSavepoint implements Savepoint {
--- End diff --

This code was the same for both `SavepointV1` and `SavepointV2`. Therefore 
I've moved it here.


> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890214#comment-15890214
 ] 

ASF GitHub Bot commented on FLINK-5778:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3442

[FLINK-5778] [savepoints] Add savepoint serializer with relative file path 
serialization

This adds a new savepoint version, `SavepointV2`. The corresponding 
`SavepointV2Serializer` is the same as our current `SavepointV1Serializer` 
except that `FileStateHandle` instances are serialized with their file path 
relative to the savepoint base path.

As an example imagine a savepoint in directory 
`hdfs:///path/to/savepoint-directory` with this data file:

```
hdfs:///path/to/savepoint-directory/_metadata
hdfs:///path/to/savepoint-directory/data-X
hdfs:///path/to/savepoint-directory/data-Y
```

Previously, the complete file path was stored. With this PR, we only store 
`data-X` for file state handles and reconstruct the complete path from the 
savepoint directory on restore. This enables us to move the savepoint directory 
around. The only requirement is that the layout within the savepoint directory 
does not change. I think this is a reasonable restriction.

In addition to the added tests, I've tested this manually by triggering 
savepoints, moving the savepoint around in the local file system as well as to 
HDFS and restoring from it. 

The code between `SavepointV1` and `SavepointV2` and the respective 
serializers is mostly shared. Therefore, I've moved the base logic out to an 
abstract `AbstractSavepoint` and `AbstractSavepointSerializer`.

The migration story is that you can resume old savepoints as before and all 
newly triggered savepoints will be V2 savepoints that serialize file state 
handles with their relative path. You can also resume with `1.3-SNAPSHOT` 
savepoint without any issues.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 5778-relocatable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3442


commit 1bc3b3bff1b33eb204e8b9d4cd9589105dd60466
Author: Ufuk Celebi 
Date:   2017-02-28T21:36:24Z

[FLINK-5778] [savepoints] Add savepoint serializer with relative file path 
serializaton




> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-02-26 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15884816#comment-15884816
 ] 

Ufuk Celebi commented on FLINK-5778:


I think your suggestion makes a lot of sense. I will do it as you propose and 
handle this at the savepoint serializer.

> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-02-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866466#comment-15866466
 ] 

Stephan Ewen commented on FLINK-5778:
-

I am not sure about this design. State handle immutability is something I would 
really like to preserve.

How about making all FileStateHandles consist of a base path and a file path?
  - When serializing, the SavepointSerializer stores only the file path
  - When deserializing, the new state handles are constructed from the root 
path of the checkpoint, and the name in the savepoint



> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow to overwrite the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)