[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5396


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165344021
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---
@@ -19,58 +19,53 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Unit tests for the {@link CompletedCheckpoint}.
+ */
 public class CompletedCheckpointTest {
 
@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-   /**
-* Tests that persistent checkpoints discard their header file.
-*/
@Test
-   public void testDiscard() throws Exception {
-   File file = tmpFolder.newFile();
-   assertEquals(true, file.exists());
-
+   public void registerStatesAtRegistry() {
--- End diff --

The test of whether state handles are correctly registered at the 
SharedStateRegistry was originally just sneakily added to a pre-existing 
metadata file cleanup test. That did not seem right ;-)

This factors the test out into a separate method. The test method should be 
called `testRegisterStatesAtRegistry` instead of `registerStatesAtRegistry`. 
Will change that...
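
As a rough sketch (not the code from this PR), the factored-out test could look along these lines; `createCheckpointWithOperatorState` is a hypothetical helper that stands in for the checkpoint construction:

    @Test
    public void testRegisterStatesAtRegistry() {
        // mock the operator state so the registration call can be verified
        OperatorState operatorState = mock(OperatorState.class);
        CompletedCheckpoint checkpoint = createCheckpointWithOperatorState(operatorState);

        SharedStateRegistry registry = new SharedStateRegistry();
        checkpoint.registerSharedStatesAfterRestored(registry);

        // the checkpoint should forward the registration to its operator states
        verify(operatorState, times(1)).registerSharedStates(registry);
    }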


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165343453
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java ---
@@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws Exception {
assertTrue(isDirectoryEmpty(directory));
}
 
+   // ------------------------------------------------------------------------
+   //  Not deleting parent directories
+   // ------------------------------------------------------------------------
+
+   /**
+* This test checks that the stream does not check and clean the parent directory
+* when encountering a write error.
+*/
+   @Test
+   public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception {
+   final File directory = tempDir.newFolder();
+
+   // prevent creation of files in that directory
+   assertTrue(directory.setWritable(false, true));
+   checkDirectoryNotWritable(directory);
+
+   FileSystem fs = spy(FileSystem.getLocalFileSystem());
+
+   FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   stream1.write(new byte[61]);
+   stream2.write(new byte[61]);
+
+   try {
+   stream1.closeAndGetHandle();
+   fail("this should fail with an exception");
+   } catch (IOException ignored) {}
+
+   stream2.close();
+
+   verify(fs, times(0)).delete(any(Path.class), anyBoolean());
--- End diff --

Will add an additional check that the directory still exists
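
For illustration, a minimal form of that extra check (assuming `directory` is the `java.io.File` created in the test above) could be:

    // in addition to verifying that delete() was never called, make sure the
    // parent checkpoint directory itself survived the failed write
    assertTrue(directory.exists());
    assertTrue(directory.isDirectory());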


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165295667
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---
@@ -18,45 +18,50 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * The configuration of a checkpoint, such as whether
+ * The configuration of a checkpoint. This described whether
--- End diff --

nit: typo


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165317663
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---
@@ -19,58 +19,53 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Unit tests for the {@link CompletedCheckpoint}.
+ */
 public class CompletedCheckpointTest {
 
@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-   /**
-* Tests that persistent checkpoints discard their header file.
-*/
@Test
-   public void testDiscard() throws Exception {
-   File file = tmpFolder.newFile();
-   assertEquals(true, file.exists());
-
+   public void registerStatesAtRegistry() {
--- End diff --

What's the reason for this change?


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165318529
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java ---
@@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws Exception {
assertTrue(isDirectoryEmpty(directory));
}
 
+   // ------------------------------------------------------------------------
+   //  Not deleting parent directories
+   // ------------------------------------------------------------------------
+
+   /**
+* This test checks that the stream does not check and clean the parent directory
+* when encountering a write error.
+*/
+   @Test
+   public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception {
+   final File directory = tempDir.newFolder();
+
+   // prevent creation of files in that directory
+   assertTrue(directory.setWritable(false, true));
+   checkDirectoryNotWritable(directory);
+
+   FileSystem fs = spy(FileSystem.getLocalFileSystem());
+
+   FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
+   Path.fromLocalFile(directory), fs, 1024, 1);
+
+   stream1.write(new byte[61]);
+   stream2.write(new byte[61]);
+
+   try {
+   stream1.closeAndGetHandle();
+   fail("this should fail with an exception");
+   } catch (IOException ignored) {}
+
+   stream2.close();
+
+   verify(fs, times(0)).delete(any(Path.class), anyBoolean());
--- End diff --

nit: This seems somewhat brittle because there could be another "delete" 
method that the handle uses to delete the parent dir. For "future proof-ness", 
checking that the parent directory still exists might be more robust...


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5396#discussion_r165295768
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---
@@ -18,45 +18,50 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * The configuration of a checkpoint, such as whether
+ * The configuration of a checkpoint. This described whether
  * 
- * The checkpoint should be persisted
- The checkpoint must be full, or may be incremental (not yet implemented)
- * The checkpoint format must be the common (cross backend) format,
- * or may be state-backend specific (not yet implemented)
- * when the checkpoint should be garbage collected
+ * The checkpoint is s regular checkpoint or a savepoint
+ * When the checkpoint should be garbage collected
  * 
  */
 public class CheckpointProperties implements Serializable {
 
-   private static final long serialVersionUID = -8835900655844879470L;
+   private static final long serialVersionUID = 2L;
 
-   private final boolean forced;
+   /** Type - checkpoit / savepoint. */
--- End diff --

nit: typo


---


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-01-31 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5396

[FLINK-5820] [state backends] Split shared/exclusive state and properly 
handle disposal

## What is the purpose of the change

This PR contains the final changes needed for [FLINK-5820]. Disposal of 
checkpoint directories now happens properly across all file system types 
(previously it did not work correctly for some S3 connectors), with fewer calls 
to the file systems. Shared and exclusive state are split into different 
directories, which helps implement cleanup safety nets.

## Brief change log

  1. TaskManagers use the `CheckpointStorage` to create 
`CheckpointStreamFactories`. Previously, these stream factories were created by 
the `StateBackend`. This completes separating the "storage" aspect of the 
`StateBackend` out into the `CheckpointStorage`.

  2. The location where state is stored is communicated in a unified manner 
between the `CheckpointCoordinator` (which instantiates the original 
`CheckpointStorageLocation` for a checkpoint/savepoint) and the Tasks. Tasks 
always obtain their `CheckpointStreamFactories` transparently in the same way, 
regardless of whether they write state for checkpoints or savepoints.

  3. Checkpoint state now has the scope `EXCLUSIVE` or `SHARED`, and the two may 
be stored differently. The current file-system-based backends put shared state 
into a */shared* directory, while exclusive state goes into the per-checkpoint 
*/chk-1234* directory (see the sketch after this list).

  4. Tasks can directly write *task-owned state* to a checkpoint storage. 
That state neither belongs specifically to one checkpoint, nor is it shared and 
eventually released by the Checkpoint Coordinator; only the tasks themselves 
may release it. An example of that type of state is the *write-ahead log* 
created by some sinks.

  5. When a checkpoint is finalized, its storage is described by a 
`CompletedCheckpointStorageLocation`. That object gives access to addressing 
and metadata, and handles disposal of the location. This allows us to drop the 
*"delete parent if empty"* logic in the file state handles and fixes the issue 
that checkpoint directories were previously left over on S3.
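
As a rough illustration of the directory layout from point 3 above (a toy sketch with made-up names, not the classes introduced by this PR), resolving the target directory from a state object's scope could look like this:

    /** Toy sketch: where a file-based checkpoint storage might place new state files. */
    enum StateScope { EXCLUSIVE, SHARED }

    static java.nio.file.Path resolveTargetDirectory(
            java.nio.file.Path checkpointBaseDir, long checkpointId, StateScope scope) {

        // shared state (e.g. files reused across incremental checkpoints) sits next to all
        // checkpoints, while exclusive state goes into the per-checkpoint "chk-<id>" directory
        return scope == StateScope.SHARED
                ? checkpointBaseDir.resolve("shared")
                : checkpointBaseDir.resolve("chk-" + checkpointId);
    }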

**Future Work**

  - In the future, the `CompletedCheckpointStorageLocation` should also be 
used as a way to handle relative addressing of checkpoints, to allow users to 
move them to different directories without breaking the internal paths.

  - We can now implement disposal fast paths, such as dropping a checkpoint 
directory as a whole rather than dropping each state object separately. However, 
one would still need to drop shared state objects individually. Finishing these 
fast paths is currently blocked on some rework of the shared state handles, to 
make their selective release easier and more robust (see the sketch below).
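
To make point 5 and the disposal fast path above a bit more concrete, here is a minimal stand-in sketch (simplified, not the exact interface added by this PR) of what such a completed-checkpoint location abstraction provides:

    /** Simplified stand-in for a completed checkpoint's storage location. */
    interface CheckpointLocation {

        /** External pointer (e.g. a directory URI) under which the checkpoint can be addressed. */
        String getExternalPointer();

        /** Stream to the checkpoint metadata stored at this location. */
        java.io.InputStream openMetadata() throws java.io.IOException;

        /** Disposes the location as a whole, e.g. by recursively deleting the exclusive
         *  "chk-<id>" directory; shared state is still released object by object. */
        void disposeStorageLocation() throws java.io.IOException;
    }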

## Verifying this change

This change can be verified by running a Flink cluster with a checkpointed 
program and checking that checkpoint directories are laid out and cleaned up as 
described above.

This PR also adds and adjusts various unit tests to guard the new behavior.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? *Somewhat* (it changes 
the state backend directory layouts)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink locations

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5396


commit ec8e552a7b50b8c605bb2609713cb2dd50245118
Author: Stephan Ewen 
Date:   2018-01-30T14:53:46Z

[hotfix] [checkpointing] Cleanup: Fix Nullability and names in checkpoint 
stats.

commit cf18831b69bd909d6491eb73d3294d3295ddd930
Author: Stephan Ewen 
Date:   2018-01-30T10:57:30Z

[hotfix] [tests] Drop obsolete CheckpointExternalResumeTest.

Because all checkpoints are now externalized (write their metadata) this is 
an obsolete test.

commit a46acdd0f7142e40eee8c742e17eefaa6c7da3da
Author: Stephan Ewen 
Date:   2018-01-29T22:24:24Z

[hotfix] [checkpoints] Clean up