Repository: flink Updated Branches: refs/heads/master dcea46e89 -> edae79340
[FLINK-3358] [FLINK-3351] [rocksdb] Add simple constructors and automatic temp path configuration This adds constructors that only take a backup dir URI and use it to initialize both the RocksDB file backups and the FileSystem state backend for non-partitioned state. Also, the RocksDBStateBackend now automatically picks up the TaskManager's temp directories, if no local storage directories are explicitly configured. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edae7934 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edae7934 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edae7934 Branch: refs/heads/master Commit: edae79340dd486915d25109cbdc1485accae665a Parents: be72758 Author: Stephan Ewen <se...@apache.org> Authored: Thu Feb 11 21:30:36 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 11 21:34:03 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBStateBackend.java | 241 ++++++++++++++-- .../state/RocksDBStateBackendConfigTest.java | 280 +++++++++++++++++++ .../state/RocksDBStateBackendTest.java | 4 +- .../state/filesystem/FsStateBackend.java | 99 ++++--- .../EventTimeWindowCheckpointingITCase.java | 13 +- 5 files changed, 566 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index eddd8c0..5b16e86 100644 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -18,7 +18,12 @@ package org.apache.flink.contrib.streaming.state; import java.io.File; +import java.io.IOException; import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; @@ -28,13 +33,17 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.api.common.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.rocksdb.Options; import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; @@ -54,20 +63,35 @@ import static java.util.Objects.requireNonNull; public class RocksDBStateBackend extends AbstractStateBackend { private static final long serialVersionUID = 1L; - /** Base path for RocksDB directory. */ - private final String dbBasePath; - - /** The checkpoint directory that we snapshot RocksDB backups to. */ - private final String checkpointDirectory; + private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class); + + + /** The checkpoint directory that the RocksDB backups are copied to.
*/ + private final Path checkpointDirectory; + /** The state backend that stores the non-partitioned state */ + private final AbstractStateBackend nonPartitionedStateBackend; + + /** Operator identifier that is used to uniquify the RocksDB storage path. */ private String operatorIdentifier; /** JobID for uniquifying backup paths. */ private JobID jobId; + - private AbstractStateBackend backingStateBackend; + // DB storage directories + + /** Base paths for RocksDB directory, as configured. May be null. */ + private Path[] dbBasePaths; + /** Local directories for the RocksDB files, as initialized in initializeForJob() */ + private File[] dbStorageDirectories; + /** Index into dbStorageDirectories of the directory handed out last; advanced round-robin by getNextStoragePath() */ + private int nextDirectory; + + // RocksDB options + /** The pre-configured option settings */ private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT; @@ -79,31 +103,112 @@ public class RocksDBStateBackend extends AbstractStateBackend { // ------------------------------------------------------------------------ - public RocksDBStateBackend(String dbBasePath, String checkpointDirectory, AbstractStateBackend backingStateBackend) { - this.dbBasePath = requireNonNull(dbBasePath); - this.checkpointDirectory = requireNonNull(checkpointDirectory); - this.backingStateBackend = requireNonNull(backingStateBackend); + /** + * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the + * file system and location defined by the given URI. + * + * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system + * host and port in the URI, or have the Hadoop configuration that describes the file system + * (host / high-availability group / possibly credentials) either referenced from the Flink + * config, or included in the classpath. + * + * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+ */ + public RocksDBStateBackend(String checkpointDataUri) throws IOException { + this(new Path(checkpointDataUri).toUri()); } + /** + * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the + * file system and location defined by the given URI. + * + * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system + * host and port in the URI, or have the Hadoop configuration that describes the file system + * (host / high-availability group / possibly credentials) either referenced from the Flink + * config, or included in the classpath. + * + * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public RocksDBStateBackend(URI checkpointDataUri) throws IOException { + // creating the FsStateBackend automatically sanity checks the URI + FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri); + + this.nonPartitionedStateBackend = fsStateBackend; + this.checkpointDirectory = fsStateBackend.getBasePath(); + } + + /** Same as {@link #RocksDBStateBackend(URI, AbstractStateBackend)}, but takes the checkpoint data URI as a string (parsed via {@link Path}). */ + public RocksDBStateBackend( + String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException { + + this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend); + } + /** Creates a backend that copies the RocksDB backups to the given checkpoint data URI (validated and normalized via {@link FsStateBackend#validateAndNormalizeUri(URI)}) and stores non-partitioned state in the given backend, which must not be null. */ + public RocksDBStateBackend( + URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException { + + this.nonPartitionedStateBackend = requireNonNull(nonPartitionedStateBackend); + this.checkpointDirectory = FsStateBackend.validateAndNormalizeUri(checkpointDataUri); + } + + // ------------------------------------------------------------------------ + // State backend methods // ------------------------------------------------------------------------ @Override public void initializeForJob( Environment env, String operatorIdentifier, - TypeSerializer<?> keySerializer) throws Exception - { + TypeSerializer<?>
keySerializer) throws Exception { + super.initializeForJob(env, operatorIdentifier, keySerializer); + + this.nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer); + this.operatorIdentifier = operatorIdentifier.replace(" ", ""); - backingStateBackend.initializeForJob(env, operatorIdentifier, keySerializer); this.jobId = env.getJobID(); + + // initialize the paths where the local RocksDB files should be stored; configured paths that cannot be created or are not writable directories are skipped (and reported if none remain) + if (dbBasePaths == null) { + // initialize from the temp directories + dbStorageDirectories = env.getIOManager().getSpillingDirectories(); + } + else { + List<File> dirs = new ArrayList<>(dbBasePaths.length); + String errorMessage = ""; + for (Path path : dbBasePaths) { + File f = new File(path.toUri().getPath()); + if ((!f.exists() && !f.mkdirs()) || !f.isDirectory() || !f.canWrite()) { + String msg = "Local DB files directory '" + f.getAbsolutePath() + + "' does not exist and cannot be created, or is not a writable directory. "; + LOG.error(msg); + errorMessage += msg; + } else { + dirs.add(f); + } + } + + if (dirs.isEmpty()) { + throw new Exception("No local storage directories available.
" + errorMessage); + } else { + dbStorageDirectories = dirs.toArray(new File[dirs.size()]); + } + } + + nextDirectory = new Random().nextInt(dbStorageDirectories.length); } @Override - public void disposeAllStateForCurrentJob() throws Exception {} + public void disposeAllStateForCurrentJob() throws Exception { + nonPartitionedStateBackend.disposeAllStateForCurrentJob(); + } @Override public void close() throws Exception { + nonPartitionedStateBackend.close(); + Options opt = this.rocksDbOptions; if (opt != null) { opt.dispose(); @@ -111,13 +216,25 @@ public class RocksDBStateBackend extends AbstractStateBackend { } } - private File getDbPath(String stateName) { - return new File(new File(new File(new File(dbBasePath), jobId.toString()), operatorIdentifier), stateName); + File getDbPath(String stateName) { + return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName); } - private String getCheckpointPath(String stateName) { + String getCheckpointPath(String stateName) { return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName; } + + File[] getStoragePaths() { + return dbStorageDirectories; + } + + File getNextStoragePath() { + int ni = nextDirectory + 1; + ni = ni >= dbStorageDirectories.length ?
0 : ni; + nextDirectory = ni; + + return dbStorageDirectories[ni]; + } // ------------------------------------------------------------------------ // State factories @@ -154,20 +271,94 @@ public class RocksDBStateBackend extends AbstractStateBackend { } @Override - public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, - long timestamp) throws Exception { - return backingStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp); + public CheckpointStateOutputStream createCheckpointStateOutputStream( + long checkpointID, long timestamp) throws Exception { + + return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp); } @Override - public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state, - long checkpointID, - long timestamp) throws Exception { - return backingStateBackend.checkpointStateSerializable(state, checkpointID, timestamp); + public <S extends Serializable> StateHandle<S> checkpointStateSerializable( + S state, long checkpointID, long timestamp) throws Exception { + + return nonPartitionedStateBackend.checkpointStateSerializable(state, checkpointID, timestamp); + } + + // ------------------------------------------------------------------------ + // Parameters + // ------------------------------------------------------------------------ + + /** + * Sets the path where the RocksDB local database files should be stored on the local + * file system. Setting this path overrides the default behavior, where the + * files are stored across the configured temp directories. + * + * <p>Passing {@code null} to this function restores the default behavior, where the configured + * temp directories will be used. + * + * @param path The path where the local RocksDB database files are stored. + */ + public void setDbStoragePath(String path) { + setDbStoragePaths(path == null ?
null : new String[] { path }); + } + + /** + * Sets the paths across which the local RocksDB database files are distributed on the local + * file system. Setting these paths overrides the default behavior, where the + * files are stored across the configured temp directories. + * + * <p>Each distinct state will be stored in one path, but when the state backend creates + * multiple states, they will store their files on different paths. + * + * <p>Passing {@code null} to this function restores the default behavior, where the configured + * temp directories will be used. + * + * @param paths The paths across which the local RocksDB database files will be spread. + */ + public void setDbStoragePaths(String... paths) { + if (paths == null) { + dbBasePaths = null; + } + else if (paths.length == 0) { + throw new IllegalArgumentException("empty paths"); + } + else { + Path[] pp = new Path[paths.length]; + + for (int i = 0; i < paths.length; i++) { + if (paths[i] == null) { + throw new IllegalArgumentException("null path"); + } + + pp[i] = new Path(paths[i]); + String scheme = pp[i].toUri().getScheme(); + if (scheme != null && !scheme.equalsIgnoreCase("file")) { + throw new IllegalArgumentException("Path " + paths[i] + " has a non local scheme"); + } + } + + dbBasePaths = pp; + } + } + + /** Gets the local RocksDB storage paths configured via {@link #setDbStoragePaths(String...)}. + * + * @return The configured DB storage paths, or null, if none were configured.
+ */ + public String[] getDbStoragePaths() { + if (dbBasePaths == null) { + return null; + } else { + String[] paths = new String[dbBasePaths.length]; + for (int i = 0; i < paths.length; i++) { + paths[i] = dbBasePaths[i].toString(); + } + return paths; + } } // ------------------------------------------------------------------------ - // Parametrize with Options + // Parametrize with RocksDB Options // ------------------------------------------------------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java new file mode 100644 index 0000000..e62d39c --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.junit.Test; + +import org.rocksdb.CompactionStyle; +import org.rocksdb.Options; + +import java.io.File; +import java.util.UUID; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +/** + * Tests for configuring the RocksDB State Backend + */ +@SuppressWarnings("serial") +public class RocksDBStateBackendConfigTest { + + private static final String TEMP_URI = new File(System.getProperty("java.io.tmpdir")).toURI().toString(); + + // ------------------------------------------------------------------------ + // RocksDB local file directory + // ------------------------------------------------------------------------ + + @Test + public void testSetDbPath() throws Exception { + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); + + assertNull(rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePath("/abc/def"); + assertArrayEquals(new String[] { "/abc/def" }, rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePath(null); + assertNull(rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePaths("/abc/def", "/uvw/xyz"); + assertArrayEquals(new String[] { "/abc/def", "/uvw/xyz" }, rocksDbBackend.getDbStoragePaths()); + + //noinspection NullArgumentToVariableArgMethod + rocksDbBackend.setDbStoragePaths(null); + assertNull(rocksDbBackend.getDbStoragePaths()); + } + + @Test(expected = IllegalArgumentException.class) + public
void testSetNullPaths() throws Exception { + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); + rocksDbBackend.setDbStoragePaths(); + } + + @Test(expected = IllegalArgumentException.class) + public void testNonFileSchemePath() throws Exception { + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); + rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition"); + } + + // ------------------------------------------------------------------------ + // RocksDB local file automatic from temp directories + // ------------------------------------------------------------------------ + + @Test + public void testUseTempDirectories() throws Exception { + File dir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); + File dir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); + + File[] tempDirs = new File[] { dir1, dir2 }; + + try { + assertTrue(dir1.mkdirs()); + assertTrue(dir2.mkdirs()); + + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); + assertNull(rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.initializeForJob(getMockEnvironment(tempDirs), "foobar", IntSerializer.INSTANCE); + assertArrayEquals(tempDirs, rocksDbBackend.getStoragePaths()); + } + finally { + FileUtils.deleteDirectory(dir1); + FileUtils.deleteDirectory(dir2); + } + } + + // ------------------------------------------------------------------------ + // RocksDB local file directory initialization + // ------------------------------------------------------------------------ + + @Test + public void testFailWhenNoLocalStorageDir() throws Exception { + File targetDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); + try { + assertTrue(targetDir.mkdirs()); + + if (!targetDir.setWritable(false, false) || targetDir.canWrite()) { + System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable"); + return; + } + +
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); + rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); + try { + rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE); + fail("Backend initialization should fail when no local storage directory is usable"); + } + catch (Exception e) { + assertTrue(e.getMessage().contains("No local storage directories available")); + assertTrue(e.getMessage().contains(targetDir.getAbsolutePath())); + } + } + finally { + //noinspection ResultOfMethodCallIgnored + targetDir.setWritable(true, false); + FileUtils.deleteDirectory(targetDir); + } + } + + @Test + public void testContinueOnSomeDbDirectoriesMissing() throws Exception { + File targetDir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); + File targetDir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()); + + try { + assertTrue(targetDir1.mkdirs()); + assertTrue(targetDir2.mkdirs()); + + if (!targetDir1.setWritable(false, false)) { + System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory non-writable"); + return; + } + + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); + rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath()); + + try { + rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE); + } + catch (Exception e) { + e.printStackTrace(); + fail("Backend initialization failed even though some paths were available"); + } + } finally { + //noinspection ResultOfMethodCallIgnored + targetDir1.setWritable(true, false); + FileUtils.deleteDirectory(targetDir1); + FileUtils.deleteDirectory(targetDir2); + } + } + + // ------------------------------------------------------------------------ + // RocksDB Options + // ------------------------------------------------------------------------ + + @Test + public void testPredefinedOptions() throws Exception { + RocksDBStateBackend rocksDbBackend = new
RocksDBStateBackend(TEMP_URI); + + assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); + + rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); + assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); + + Options opt1 = rocksDbBackend.getRocksDBOptions(); + Options opt2 = rocksDbBackend.getRocksDBOptions(); + + assertEquals(opt1, opt2); + + assertEquals(CompactionStyle.LEVEL, opt1.compactionStyle()); + } + + @Test + public void testOptionsFactory() throws Exception { + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); + + rocksDbBackend.setOptions(new OptionsFactory() { + @Override + public Options createOptions(Options currentOptions) { + return currentOptions.setCompactionStyle(CompactionStyle.FIFO); + } + }); + + assertNotNull(rocksDbBackend.getOptions()); + assertEquals(CompactionStyle.FIFO, rocksDbBackend.getRocksDBOptions().compactionStyle()); + } + + @Test + public void testPredefinedAndOptionsFactory() throws Exception { + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); + + assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); + + rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); + rocksDbBackend.setOptions(new OptionsFactory() { + @Override + public Options createOptions(Options currentOptions) { + return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL); + } + }); + + assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); + assertNotNull(rocksDbBackend.getOptions()); + assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getRocksDBOptions().compactionStyle()); + } + + @Test + public void testPredefinedOptionsEnum() { + for (PredefinedOptions o : PredefinedOptions.values()) { + Options opt = o.createOptions(); + try { + assertNotNull(opt); + } finally { + opt.dispose(); + } + } + } + + //
------------------------------------------------------------------------ + // Contained Non-partitioned State Backend + // ------------------------------------------------------------------------ + + @Test + public void testCallsForwardedToNonPartitionedBackend() throws Exception { + AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI, nonPartBackend); + + rocksDbBackend.initializeForJob(getMockEnvironment(), "foo", IntSerializer.INSTANCE); + verify(nonPartBackend, times(1)).initializeForJob(any(Environment.class), anyString(), any(TypeSerializer.class)); + + rocksDbBackend.disposeAllStateForCurrentJob(); + verify(nonPartBackend, times(1)).disposeAllStateForCurrentJob(); + + rocksDbBackend.close(); + verify(nonPartBackend, times(1)).close(); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private static Environment getMockEnvironment() { + return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) }); + } + + private static Environment getMockEnvironment(File[] tempDirs) { + IOManager ioMan = mock(IOManager.class); + when(ioMan.getSpillingDirectories()).thenReturn(tempDirs); + + Environment env = mock(Environment.class); + when(env.getJobID()).thenReturn(new JobID()); + when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader()); + when(env.getIOManager()).thenReturn(ioMan); + return env; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 3b3ac31..fe933e0 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -40,7 +40,9 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa dbDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); chkDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - return new RocksDBStateBackend(dbDir.getAbsolutePath(), "file://" + chkDir.getAbsolutePath(), new MemoryStateBackend()); + RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend()); + backend.setDbStoragePath(dbDir.getAbsolutePath()); + return backend; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 411b536..37c1392 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -97,7 +97,7 @@ public class FsStateBackend extends AbstractStateBackend { * classpath. * * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to teh checkpoint data directory. + * and the path to the checkpoint data directory.
* @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public FsStateBackend(String checkpointDataUri) throws IOException { @@ -116,7 +116,7 @@ public class FsStateBackend extends AbstractStateBackend { * classpath. * * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), - * and the path to teh checkpoint data directory. + * and the path to the checkpoint data directory. * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public FsStateBackend(Path checkpointDataUri) throws IOException { @@ -161,21 +161,6 @@ public class FsStateBackend extends AbstractStateBackend { * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws IOException { - final String scheme = checkpointDataUri.getScheme(); - final String path = checkpointDataUri.getPath(); - - // some validity checks - if (scheme == null) { - throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " + - "Please specify the file system scheme explicitly in the URI."); - } - if (path == null) { - throw new IllegalArgumentException("The path to store the checkpoint data in is null.
" + - "Please specify a directory path for the checkpoint data."); - } - if (path.length() == 0 || path.equals("/")) { - throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); - } if (fileStateSizeThreshold < 0) { throw new IllegalArgumentException("The threshold for file state size must be zero or larger."); } @@ -183,30 +168,10 @@ public class FsStateBackend extends AbstractStateBackend { throw new IllegalArgumentException("The threshold for file state size cannot be larger than " + MAX_FILE_STATE_THRESHOLD); } - - // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same - // (distributed) filesystem on all hosts and includes full host/port information, even if the - // original URI did not include that. We count on the filesystem loading from the configuration - // to fill in the missing data. - - // try to grab the file system for this path/URI - this.filesystem = FileSystem.get(checkpointDataUri); - if (this.filesystem == null) { - throw new IOException("Could not find a file system for the given scheme in the available configurations."); - } - - URI fsURI = this.filesystem.getUri(); - try { - URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null); - this.basePath = new Path(baseURI); - } - catch (URISyntaxException e) { - throw new IOException( - String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", - checkpointDataUri, fsURI), e); - } - this.fileStateThreshold = fileStateSizeThreshold; + + this.basePath = validateAndNormalizeUri(checkpointDataUri); + this.filesystem = this.basePath.getFileSystem(); } /** @@ -371,6 +336,60 @@ public class FsStateBackend extends AbstractStateBackend { "File State Backend (initialized) @ " + checkpointDirectory; } + /** + * Checks and normalizes the checkpoint data URI.
This method first checks the validity of the + * URI (scheme, path, availability of a matching file system) and then normalizes the URI + * to a path. + * + * <p>If the URI does not include an authority, but the file system configured for the URI has an + * authority, then the normalized path will include this authority. + * + * @param checkpointDataUri The URI to check and normalize. + * @return The checkpoint directory as a Path that combines the scheme and authority of the resolved file system with the path of the given URI. + * + * @throws IllegalArgumentException Thrown, if the URI misses scheme or path, or if the path is empty or the root directory. + * @throws IOException Thrown, if no file system can be found for the URI's scheme, or if the normalized URI cannot be constructed. + */ + public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException { + final String scheme = checkpointDataUri.getScheme(); + final String path = checkpointDataUri.getPath(); + + // some validity checks + if (scheme == null) { + throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " + + "Please specify the file system scheme explicitly in the URI."); + } + if (path == null) { + throw new IllegalArgumentException("The path to store the checkpoint data in is null. " + + "Please specify a directory path for the checkpoint data."); + } + if (path.length() == 0 || path.equals("/")) { + throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); + } + + // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same + // (distributed) filesystem on all hosts and includes full host/port information, even if the + // original URI did not include that. We count on the filesystem loading from the configuration + // to fill in the missing data.
+ + // try to grab the file system for this path/URI + FileSystem filesystem = FileSystem.get(checkpointDataUri); + if (filesystem == null) { + throw new IOException("Could not find a file system for the given scheme in the available configurations."); + } + + URI fsURI = filesystem.getUri(); + try { + URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null); + return new Path(baseURI); + } + catch (URISyntaxException e) { + throw new IOException( + String.format("Cannot create file system URI for checkpointDataUri %s and filesystem URI %s", + checkpointDataUri, fsURI), e); + } + } + // ------------------------------------------------------------------------ // Output stream for state checkpointing // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 2039528..9400bd7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -111,16 +111,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { case MEM: this.stateBackend = new MemoryStateBackend(); break; - case FILE: + case FILE: { String backups = tempFolder.newFolder().getAbsolutePath(); this.stateBackend = new FsStateBackend("file://" + backups); break; - case ROCKSDB: + } + case ROCKSDB: { String rocksDb = tempFolder.newFolder().getAbsolutePath(); - String rocksDbBackups = tempFolder.newFolder().getAbsolutePath(); - -
this.stateBackend = new RocksDBStateBackend(rocksDb, "file://" + rocksDbBackups, new MemoryStateBackend()); + String rocksDbBackups = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()); + rdb.setDbStoragePath(rocksDb); + this.stateBackend = rdb; break; + } } }