[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5396 ---
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5396#discussion_r165344021

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---
    @@ -19,58 +19,53 @@ package org.apache.flink.runtime.checkpoint;
     import org.apache.flink.api.common.JobID;
    -import org.apache.flink.core.fs.Path;
     import org.apache.flink.core.testutils.CommonTestUtils;
     import org.apache.flink.runtime.jobgraph.JobStatus;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
     import org.apache.flink.runtime.jobgraph.OperatorID;
     import org.apache.flink.runtime.state.SharedStateRegistry;
    -import org.apache.flink.runtime.state.StreamStateHandle;
    -import org.apache.flink.runtime.state.filesystem.FileStateHandle;
    +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
    +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
     import org.junit.Rule;
     import org.junit.Test;
     import org.junit.rules.TemporaryFolder;
    -import java.io.File;
     import java.util.Collections;
     import java.util.HashMap;
     import java.util.Map;
     import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
     import static org.mockito.Mockito.mock;
     import static org.mockito.Mockito.times;
     import static org.mockito.Mockito.verify;
    +/**
    + * Unit tests for the {@link CompletedCheckpoint}.
    + */
     public class CompletedCheckpointTest {
         @Rule
         public final TemporaryFolder tmpFolder = new TemporaryFolder();
    -    /**
    -     * Tests that persistent checkpoints discard their header file.
    -     */
         @Test
    -    public void testDiscard() throws Exception {
    -        File file = tmpFolder.newFile();
    -        assertEquals(true, file.exists());
    -
    +    public void registerStatesAtRegistry() {
    --- End diff --

    The test whether state handles are correctly registered at the SharedStateRegistry was originally just sneakily added to a pre-existing metadata file cleanup test. That did not seem right ;-) This factors the test out into a separate method.

    The test method should be called `testRegisterStatesAtRegistry` instead of `registerStatesAtRegistry`. Will change that...

---
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5396#discussion_r165343453

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java ---
    @@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws Exception {
             assertTrue(isDirectoryEmpty(directory));
         }
    +    //
    +    // Not deleting parent directories
    +    //
    +
    +    /**
    +     * This test checks that the stream does not check and clean the parent directory
    +     * when encountering a write error.
    +     */
    +    @Test
    +    public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception {
    +        final File directory = tempDir.newFolder();
    +
    +        // prevent creation of files in that directory
    +        assertTrue(directory.setWritable(false, true));
    +        checkDirectoryNotWritable(directory);
    +
    +        FileSystem fs = spy(FileSystem.getLocalFileSystem());
    +
    +        FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
    +                Path.fromLocalFile(directory), fs, 1024, 1);
    +
    +        FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
    +                Path.fromLocalFile(directory), fs, 1024, 1);
    +
    +        stream1.write(new byte[61]);
    +        stream2.write(new byte[61]);
    +
    +        try {
    +            stream1.closeAndGetHandle();
    +            fail("this should fail with an exception");
    +        } catch (IOException ignored) {}
    +
    +        stream2.close();
    +
    +        verify(fs, times(0)).delete(any(Path.class), anyBoolean());
    --- End diff --

    Will add an additional check that the directory still exists

---
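The additional check mentioned in the comment above (that the directory still exists) can be sketched without Flink or Mockito. This is only a minimal illustration, assuming plain `java.nio.file` APIs: `writeState` and the class name are hypothetical stand-ins, not the PR's `FsCheckpointStateOutputStream`, and the write error is forced by targeting a path that is already a directory instead of revoking write permission on the parent.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

class ParentDirectorySurvivesSketch {

    // Hypothetical stand-in for a state output stream: writes the bytes to
    // 'target' and, on failure, removes only the file it tried to write.
    static void writeState(Path target, byte[] data) throws IOException {
        try (OutputStream out = Files.newOutputStream(target)) {
            out.write(data);
        } catch (IOException e) {
            // Clean up the partial file only; never inspect or delete the parent.
            if (Files.isRegularFile(target)) {
                Files.delete(target);
            }
            throw e;
        }
    }

    // Returns true if the write failed and the parent directory still exists.
    static boolean parentSurvivesFailedWrite() throws IOException {
        Path parent = Files.createTempDirectory("chk-sketch");
        // Force an error: the target path is a directory, so it cannot be opened for writing.
        Path blocked = Files.createDirectory(parent.resolve("blocked"));

        boolean writeFailed = false;
        try {
            writeState(blocked, new byte[61]);
        } catch (IOException expected) {
            writeFailed = true;
        }

        // Assert on the resulting state instead of on which delete method was called.
        boolean parentStillExists = Files.isDirectory(parent);

        // Remove the temporary directories created by this sketch.
        Files.deleteIfExists(blocked);
        Files.deleteIfExists(parent);

        return writeFailed && parentStillExists;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("parent survived failed write: " + parentSurvivesFailedWrite());
    }
}
```

Checking the directory's existence does not depend on which `delete` method a future implementation might call, which is the brittleness concern raised elsewhere in this thread about the `verify(fs, times(0)).delete(...)` assertion.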
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5396#discussion_r165295667

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---
    @@ -18,45 +18,50 @@ package org.apache.flink.runtime.checkpoint;
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.runtime.jobgraph.JobStatus;
     import java.io.Serializable;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
    - * The configuration of a checkpoint, such as whether
    + * The configuration of a checkpoint. This described whether
    --- End diff --

    nit: typo

---
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5396#discussion_r165317663

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---
    @@ -19,58 +19,53 @@ package org.apache.flink.runtime.checkpoint;
     import org.apache.flink.api.common.JobID;
    -import org.apache.flink.core.fs.Path;
     import org.apache.flink.core.testutils.CommonTestUtils;
     import org.apache.flink.runtime.jobgraph.JobStatus;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
     import org.apache.flink.runtime.jobgraph.OperatorID;
     import org.apache.flink.runtime.state.SharedStateRegistry;
    -import org.apache.flink.runtime.state.StreamStateHandle;
    -import org.apache.flink.runtime.state.filesystem.FileStateHandle;
    +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
    +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
     import org.junit.Rule;
     import org.junit.Test;
     import org.junit.rules.TemporaryFolder;
    -import java.io.File;
     import java.util.Collections;
     import java.util.HashMap;
     import java.util.Map;
     import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
     import static org.mockito.Mockito.mock;
     import static org.mockito.Mockito.times;
     import static org.mockito.Mockito.verify;
    +/**
    + * Unit tests for the {@link CompletedCheckpoint}.
    + */
     public class CompletedCheckpointTest {
         @Rule
         public final TemporaryFolder tmpFolder = new TemporaryFolder();
    -    /**
    -     * Tests that persistent checkpoints discard their header file.
    -     */
         @Test
    -    public void testDiscard() throws Exception {
    -        File file = tmpFolder.newFile();
    -        assertEquals(true, file.exists());
    -
    +    public void registerStatesAtRegistry() {
    --- End diff --

    What's the reason for this change?

---
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5396#discussion_r165318529

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java ---
    @@ -332,6 +335,43 @@ public void testMixedBelowAndAboveThreshold() throws Exception {
             assertTrue(isDirectoryEmpty(directory));
         }
    +    //
    +    // Not deleting parent directories
    +    //
    +
    +    /**
    +     * This test checks that the stream does not check and clean the parent directory
    +     * when encountering a write error.
    +     */
    +    @Test
    +    public void testStreamDoesNotTryToCleanUpParentOnError() throws Exception {
    +        final File directory = tempDir.newFolder();
    +
    +        // prevent creation of files in that directory
    +        assertTrue(directory.setWritable(false, true));
    +        checkDirectoryNotWritable(directory);
    +
    +        FileSystem fs = spy(FileSystem.getLocalFileSystem());
    +
    +        FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
    +                Path.fromLocalFile(directory), fs, 1024, 1);
    +
    +        FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
    +                Path.fromLocalFile(directory), fs, 1024, 1);
    +
    +        stream1.write(new byte[61]);
    +        stream2.write(new byte[61]);
    +
    +        try {
    +            stream1.closeAndGetHandle();
    +            fail("this should fail with an exception");
    +        } catch (IOException ignored) {}
    +
    +        stream2.close();
    +
    +        verify(fs, times(0)).delete(any(Path.class), anyBoolean());
    --- End diff --

    nit: This seems somewhat brittle because there could be another "delete" method that the handle uses to delete the parent dir. For "future proof-ness"...

---
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5396#discussion_r165295768

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---
    @@ -18,45 +18,50 @@ package org.apache.flink.runtime.checkpoint;
    +import org.apache.flink.annotation.VisibleForTesting;
     import org.apache.flink.runtime.jobgraph.JobStatus;
     import java.io.Serializable;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
    - * The configuration of a checkpoint, such as whether
    + * The configuration of a checkpoint. This described whether
      *
    - * The checkpoint should be persisted
    - * The checkpoint must be full, or may be incremental (not yet implemented)
    - * The checkpoint format must be the common (cross backend) format,
    - * or may be state-backend specific (not yet implemented)
    - * when the checkpoint should be garbage collected
    + * The checkpoint is s regular checkpoint or a savepoint
    + * When the checkpoint should be garbage collected
      *
      */
     public class CheckpointProperties implements Serializable {
    -    private static final long serialVersionUID = -8835900655844879470L;
    +    private static final long serialVersionUID = 2L;
    -    private final boolean forced;
    +    /** Type - checkpoit / savepoint. */
    --- End diff --

    nit: typo

---
GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/5396

    [FLINK-5820] [state backends] Split shared/exclusive state and properly handle disposal

    ## What is the purpose of the change

    This PR contains the final changes needed for [FLINK-5820].

    Disposal of checkpoint directories now works properly across all file system types (previously it did not work properly for some S3 connectors), with fewer calls to the file systems.

    Shared and exclusive state are split into different directories, to help implement cleanup safety nets.

    ## Brief change log

      1. TaskManagers use the `CheckpointStorage` to create `CheckpointStreamFactories`. Previously, these stream factories were created by the `StateBackend`. This completes separating the "storage" aspect of the `StateBackend` out into the `CheckpointStorage`.

      2. The location where state is stored is communicated between the `CheckpointCoordinator` (which instantiates the original `CheckpointStorageLocation` for a checkpoint/savepoint) and the Tasks in a unified manner. Tasks transparently obtain their `CheckpointStreamFactories` always in the same way, regardless of whether they write state for checkpoints or savepoints.

      3. Checkpoint state now has the scope `EXCLUSIVE` or `SHARED`, and the two scopes may be stored differently. The current file system based backends put shared state into a */shared* directory, while exclusive state goes into the */chk-1234* directory.

      4. Tasks can directly write *task-owned state* to a checkpoint storage. That state neither belongs specifically to one checkpoint, nor is it shared and eventually released by the Checkpoint Coordinator. Only the tasks themselves may release the state. Examples of that type of state are the *write ahead logs* created by some sinks.

      5. When a checkpoint is finalized, its storage is described by a `CompletedCheckpointStorageLocation`. That object gives access to addressing and metadata, and handles location disposal. This allows us to drop the *"delete parent if empty"* logic in File State Handles and fixes the issue that checkpoint directories are currently left over on S3.

    **Future Work**

      - In the future, the `CompletedCheckpointStorageLocation` should also be used as a way to handle relative addressing of checkpoints, to allow users to move them to different directories without breaking the internal paths.

      - We can now implement disposal fast paths, such as dropping a directory as a whole rather than dropping each state object separately. However, one would still need to release shared state objects individually. Finishing these fast paths is currently blocked on some rework of the shared state handles, to make their selective release easier and more robust.

    ## Verifying this change

    This change can be verified by running a Flink cluster with a checkpointed program and

    This PR also adds and adjusts various unit tests to guard the new behavior.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
      - The serializers: (yes / no / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
      - The S3 file system connector: (yes / no / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? *Somewhat* (it changes the state backend directory layouts)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink locations

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5396

commit ec8e552a7b50b8c605bb2609713cb2dd50245118
Author: Stephan Ewen
Date:   2018-01-30T14:53:46Z

    [hotfix] [checkpointing] Cleanup: Fix Nullability and names in checkpoint stats.

commit cf18831b69bd909d6491eb73d3294d3295ddd930
Author: Stephan Ewen
Date:   2018-01-30T10:57:30Z

    [hotfix] [tests] Drop obsolete CheckpointExternalResumeTest.

    Because all checkpoints are now externalized (write their metadata) this is an obsolete test.

commit a46acdd0f7142e40eee8c742e17eefaa6c7da3da
Author: Stephan Ewen
Date:   2018-01-29T22:24:24Z

    [hotfix] [checkpoints] Clean up