[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14964747#comment-14964747 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1227

> Make user jars available for all job managers to recover
>
>
> Key: FLINK-2805
> URL: https://issues.apache.org/jira/browse/FLINK-2805
> Project: Flink
> Issue Type: Bug
> Components: BlobManager, JobManager
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> This is a bug in https://github.com/apache/flink/pull/1153.
> In case of multiple job managers, the user jars need to be accessible by all
> job managers (including those who arrive later).
> Since #1153 requires the file state backend to be configured, the simplest
> solution is to make the blob server aware of the configured recovery mode and
> put/get/delete the user jars from the file state backend as well.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
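The fix described in the issue (making the blob server aware of the recovery mode and mirroring put/get/delete to the file state backend) can be sketched roughly as follows. This is a minimal illustration, not the code from pull request #1227: `RecoveryAwareBlobStore`, its `Mode` enum, and the directory layout are hypothetical names, and a plain shared directory stands in for the file state backend.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch, not Flink's BlobServer: mirrors blobs into a shared recovery directory. */
final class RecoveryAwareBlobStore {

    /** Stand-in for Flink's RecoveryMode enum (assumption for this sketch). */
    enum Mode { STANDALONE, ZOOKEEPER }

    private final Path localDir;
    private final Path recoveryDir; // null in STANDALONE mode

    RecoveryAwareBlobStore(Mode mode, Path localDir, Path sharedRecoveryDir) {
        this.localDir = localDir;
        this.recoveryDir = (mode == Mode.STANDALONE) ? null : sharedRecoveryDir;
    }

    /** Stores the blob locally and, in recovery mode, also in the shared directory. */
    void put(String key, byte[] data) {
        try {
            Files.write(localDir.resolve(key), data);
            if (recoveryDir != null) {
                Files.write(recoveryDir.resolve(key), data);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the local copy, first fetching it from the shared directory if this server never saw the blob. */
    byte[] get(String key) {
        try {
            Path local = localDir.resolve(key);
            if (!Files.exists(local) && recoveryDir != null) {
                Files.copy(recoveryDir.resolve(key), local);
            }
            return Files.readAllBytes(local);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes the local copy and, in recovery mode, the shared copy as well. */
    void delete(String key) {
        try {
            Files.deleteIfExists(localDir.resolve(key));
            if (recoveryDir != null) {
                Files.deleteIfExists(recoveryDir.resolve(key));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With two instances that share the same recovery directory, a blob put through the first instance can be read through the second, which is the property a job manager that arrives later would rely on.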
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14950933#comment-14950933 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1227#issuecomment-146954580

    Looks good. I will rebase it on the latest state recovery branch and, if Travis gives the green light, merge it.
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945105#comment-14945105 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41270908

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
     private final int maxConnections;

     /**
+     * Shutdown hook thread to ensure deletion of the storage directory (or null if
+     * {@link RecoveryMode#STANDALONE})
+     */
+    private final Thread shutdownHook;
+
+    /**
      * Instantiates a new BLOB server and binds it to a free network port.
      *
      * @throws IOException
      *         thrown if the BLOB server cannot bind to a free network port
      */
     public BlobServer(Configuration config) throws IOException {
+        checkNotNull(config, "Configuration");
+
+        this.recoveryMode = RecoveryMode.fromConfig(config);

         // configure and create the storage directory
         String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
         this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
         LOG.info("Created BLOB server storage directory {}", storageDir);

+        if (recoveryMode == RecoveryMode.STANDALONE) {
+            recoveryBasePath = null;
+        }
+        else {
+            // Initialize file state backend for recovery
+            String stateBackend = config.getString(ConfigConstants.STATE_BACKEND,
+                    ConfigConstants.DEFAULT_STATE_BACKEND);
+
+            if (!stateBackend.toLowerCase().equals("filesystem")) {
+                throw new IllegalConfigurationException(String.format("Illegal state backend " +
+                        "configuration '%s'. Please configure 'FILESYSTEM' as state " +
+                        "backend and specify the recovery path via '%s' key.",
+                        stateBackend, ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+            }
--- End diff --

    You could do it similarly to the `RecoveryMode` enum.
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944727#comment-14944727 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41238986

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
     private final int maxConnections;

     /**
+     * Shutdown hook thread to ensure deletion of the storage directory (or null if
+     * {@link RecoveryMode#STANDALONE})
--- End diff --

    According to the code, this field is not null in `RecoveryMode.STANDALONE`.
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944722#comment-14944722 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41238757

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
     private final int maxConnections;

     /**
+     * Shutdown hook thread to ensure deletion of the storage directory (or null if
+     * {@link RecoveryMode#STANDALONE})
+     */
+    private final Thread shutdownHook;
+
+    /**
      * Instantiates a new BLOB server and binds it to a free network port.
      *
      * @throws IOException
      *         thrown if the BLOB server cannot bind to a free network port
      */
     public BlobServer(Configuration config) throws IOException {
+        checkNotNull(config, "Configuration");
+
+        this.recoveryMode = RecoveryMode.fromConfig(config);

         // configure and create the storage directory
         String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
         this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
         LOG.info("Created BLOB server storage directory {}", storageDir);

+        if (recoveryMode == RecoveryMode.STANDALONE) {
+            recoveryBasePath = null;
+        }
+        else {
+            // Initialize file state backend for recovery
+            String stateBackend = config.getString(ConfigConstants.STATE_BACKEND,
+                    ConfigConstants.DEFAULT_STATE_BACKEND);
+
+            if (!stateBackend.toLowerCase().equals("filesystem")) {
+                throw new IllegalConfigurationException(String.format("Illegal state backend " +
+                        "configuration '%s'. Please configure 'FILESYSTEM' as state " +
+                        "backend and specify the recovery path via '%s' key.",
+                        stateBackend, ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+            }
--- End diff --

    IMO it would be better to use a `StateBackend` enum value here instead of a string comparison.
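The enum-based check suggested in this comment might look roughly like the sketch below. It is only an illustration: `StateBackendKind`, its constants, and `fromConfigValue` are hypothetical names chosen for this example and are not existing Flink classes (the thread notes that the real state backend enum lives in the streaming module).

```java
import java.util.Arrays;
import java.util.Locale;

/** Hypothetical runtime-side enum for the configured state backend; not an existing Flink class. */
enum StateBackendKind {
    JOBMANAGER, // assumed name for the non-filesystem default
    FILESYSTEM;

    /** Parses a configured value case-insensitively and rejects unknown names with a descriptive error. */
    static StateBackendKind fromConfigValue(String value) {
        if (value == null) {
            throw new IllegalArgumentException("No state backend configured.");
        }
        try {
            // Locale.ROOT keeps the conversion independent of the JVM's default locale.
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Illegal state backend configuration '" + value
                    + "'. Valid values: " + Arrays.toString(values()), e);
        }
    }
}
```

A caller would then compare against `StateBackendKind.FILESYSTEM` instead of lower-casing and comparing strings. Besides being easier to read, this sidesteps locale-sensitive case conversion: under a Turkish default locale, `"FILESYSTEM".toLowerCase()` does not equal `"filesystem"`.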
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944729#comment-14944729 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41239095

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -102,8 +161,13 @@ public BlobServer(Configuration config) throws IOException {
             backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
         }

-        // Add shutdown hook to delete storage directory
-        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+        if (recoveryMode == RecoveryMode.STANDALONE) {
--- End diff --

    I assume this should be a `!=`?
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944769#comment-14944769 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41241923

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobRecoveryITCase {
+
+    private File recoveryDir;
+
+    @Before
+    public void setUp() throws Exception {
+        recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
+        if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+            throw new IllegalStateException("Failed to create temp directory for test");
+        }
+    }
+
+    @After
+    public void cleanUp() throws Exception {
+        if (recoveryDir != null) {
+            FileUtils.deleteDirectory(recoveryDir);
+        }
+    }
+
+    /**
+     * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+     * participating BlobServer.
+     */
+    @Test
+    public void testBlobServerRecovery() throws Exception {
+        Random rand = new Random();
+
+        BlobServer[] server = new BlobServer[2];
+        InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+        BlobClient client = null;
+
+        try {
+            Configuration config = new Configuration();
+            config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+            config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+            config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+            for (int i = 0; i < server.length; i++) {
+                server[i] = new BlobServer(config);
+                serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+            }
+
+            client = new BlobClient(serverAddress[0]);
+
+            // Random data
+            byte[] actual = new byte[1024];
+            rand.nextBytes(actual);
+
+            BlobKey[] keys = new BlobKey[2];
+
+            // Put data
+            keys[0] = client.put(actual); // Request 1
+            keys[1] = client.put(actual, 32, 256); // Request 2
+
+            JobID[] jobId = new JobID[] { new JobID(), new JobID() };
+            String[] testKey = new String[] { "test-key-1", "test-key-2" };
+
+            client.put(jobId[0], testKey[0], actual); // Request 3
+            client.put(jobId[1], testKey[1], actual, 32, 256); // Request 4
+
+            // Close the client and connect to the other server
+            client.close();
+            client = new BlobClient(serverAddress[1]);
+
+            // Verify request 1
+            try (InputStream is = client.get(keys[0])) {
+                byte[] expected =
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944772#comment-14944772 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41242183

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobLibraryCacheRecoveryITCase {
+
+    private File recoveryDir;
+
+    @Before
+    public void setUp() throws Exception {
+        recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
+        if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+            throw new IllegalStateException("Failed to create temp directory for test");
+        }
+    }
+
+    @After
+    public void cleanUp() throws Exception {
+        if (recoveryDir != null) {
+            FileUtils.deleteDirectory(recoveryDir);
+        }
+    }
+
+    /**
+     * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+     * participating BlobLibraryCacheManager.
+     */
+    @Test
+    public void testRecoveryRegisterAndDownload() throws Exception {
+        Random rand = new Random();
+
+        BlobServer[] server = new BlobServer[2];
+        InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+        BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
+        BlobCache cache = null;
+        BlobLibraryCacheManager libCache = null;
+
+        try {
+            Configuration config = new Configuration();
+            config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+            config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+            config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+            for (int i = 0; i < server.length; i++) {
+                server[i] = new BlobServer(config);
+                serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+                libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000);
+            }
+
+            // Random data
+            byte[] actual = new byte[1024];
--- End diff --

    Same here with `expected` and `actual` confusion.
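The naming issue raised here can be illustrated with a small sketch. In the quoted tests the generated input is called `actual` and the bytes read back are called `expected`, while JUnit's `assertEquals(expected, actual)` convention treats the known-good value as `expected`. The class and the `roundTrip` helper below are hypothetical stand-ins for the put/get calls in the tests, not Flink code.

```java
import java.util.Arrays;
import java.util.Random;

/** Hypothetical illustration of the expected/actual naming convention. */
final class ExpectedActualNaming {

    /** Stand-in for uploading data to one server and downloading it from another. */
    static byte[] roundTrip(byte[] data) {
        return Arrays.copyOf(data, data.length);
    }

    /** Returns true if the bytes read back equal the bytes that were written. */
    static boolean roundTripMatches() {
        // The data we generate and upload is the reference value, hence "expected".
        byte[] expected = new byte[1024];
        new Random(0).nextBytes(expected);

        // Whatever the system under test hands back is the "actual" value.
        byte[] actual = roundTrip(expected);

        return Arrays.equals(expected, actual);
    }
}
```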
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944794#comment-14944794 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1227#issuecomment-145799064

    What happens to the jars on HDFS if the last `BlobServer` dies without properly calling `shutdown`? Will they only be removed if a new Flink cluster is started with the same `STATE_BACKEND_FS_RECOVERY_PATH` and shut down properly?
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944827#comment-14944827 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41246346

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
     private final int maxConnections;

     /**
+     * Shutdown hook thread to ensure deletion of the storage directory (or null if
+     * {@link RecoveryMode#STANDALONE})
+     */
+    private final Thread shutdownHook;
+
+    /**
      * Instantiates a new BLOB server and binds it to a free network port.
      *
      * @throws IOException
      *         thrown if the BLOB server cannot bind to a free network port
      */
     public BlobServer(Configuration config) throws IOException {
+        checkNotNull(config, "Configuration");
+
+        this.recoveryMode = RecoveryMode.fromConfig(config);

         // configure and create the storage directory
         String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
         this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
         LOG.info("Created BLOB server storage directory {}", storageDir);

+        if (recoveryMode == RecoveryMode.STANDALONE) {
+            recoveryBasePath = null;
+        }
+        else {
+            // Initialize file state backend for recovery
+            String stateBackend = config.getString(ConfigConstants.STATE_BACKEND,
+                    ConfigConstants.DEFAULT_STATE_BACKEND);
+
+            if (!stateBackend.toLowerCase().equals("filesystem")) {
+                throw new IllegalConfigurationException(String.format("Illegal state backend " +
+                        "configuration '%s'. Please configure 'FILESYSTEM' as state " +
+                        "backend and specify the recovery path via '%s' key.",
+                        stateBackend, ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+            }
--- End diff --

    Yes, same as the job recovery code.
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944782#comment-14944782 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41243069

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -102,8 +161,13 @@ public BlobServer(Configuration config) throws IOException {
             backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
         }

-        // Add shutdown hook to delete storage directory
-        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+        if (recoveryMode == RecoveryMode.STANDALONE) {
--- End diff --

    I assume that `==` is correct, but then the JavaDoc is wrong.
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944815#comment-14944815 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41245655

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -102,8 +161,13 @@ public BlobServer(Configuration config) throws IOException {
             backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
         }

-        // Add shutdown hook to delete storage directory
-        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+        if (recoveryMode == RecoveryMode.STANDALONE) {
--- End diff --

    Yes, sorry, the Javadoc is wrong.
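The behaviour agreed on in this exchange (the hook is registered only in `STANDALONE` mode, so the field is null in recovery mode, the opposite of what the Javadoc said) could look roughly like this. It is a minimal sketch under assumptions: `ShutdownHookSketch`, its `Mode` enum, and both method names are hypothetical and do not reproduce `BlobUtils.addShutdownHook`.

```java
/** Hypothetical sketch of registering a cleanup hook only in standalone mode. */
final class ShutdownHookSketch {

    /** Stand-in for Flink's RecoveryMode enum (assumption for this sketch). */
    enum Mode { STANDALONE, ZOOKEEPER }

    /**
     * Registers a JVM shutdown hook that runs the given cleanup.
     *
     * @return the registered hook thread in STANDALONE mode, or null in any other mode
     */
    static Thread maybeAddShutdownHook(Mode mode, Runnable cleanup) {
        if (mode != Mode.STANDALONE) {
            // In recovery mode the shared files must survive this JVM, so no hook is installed.
            return null;
        }
        Thread hook = new Thread(cleanup, "blob-cleanup-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    /** Deregisters the hook during a regular shutdown; returns false if there was nothing to remove. */
    static boolean removeShutdownHook(Thread hook) {
        return hook != null && Runtime.getRuntime().removeShutdownHook(hook);
    }
}
```

Writing the null case into the `@return` documentation keeps the comment and the condition next to each other, which makes a mismatch like the one discussed here easier to spot.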
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944825#comment-14944825 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1227#issuecomment-145808965

    Exactly. If the blob server dies, the job manager will still shut down the cache service, though. This is the same behaviour as in the job manager recovery.

    Thinking about it, I am wondering myself what we gain by removing the shutdown hook. In case of hard failures (or kill -9) the shutdown hook is not called anyway, and in all other cases the job manager will shut down the components manually. Am I missing something?
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944826#comment-14944826 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41246319

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobRecoveryITCase {
+
+    private File recoveryDir;
+
+    @Before
+    public void setUp() throws Exception {
+        recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
+        if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+            throw new IllegalStateException("Failed to create temp directory for test");
+        }
+    }
+
+    @After
+    public void cleanUp() throws Exception {
+        if (recoveryDir != null) {
+            FileUtils.deleteDirectory(recoveryDir);
+        }
+    }
+
+    /**
+     * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+     * participating BlobServer.
+     */
+    @Test
+    public void testBlobServerRecovery() throws Exception {
+        Random rand = new Random();
+
+        BlobServer[] server = new BlobServer[2];
+        InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+        BlobClient client = null;
+
+        try {
+            Configuration config = new Configuration();
+            config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+            config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+            config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+            for (int i = 0; i < server.length; i++) {
+                server[i] = new BlobServer(config);
+                serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+            }
+
+            client = new BlobClient(serverAddress[0]);
+
+            // Random data
+            byte[] actual = new byte[1024];
+            rand.nextBytes(actual);
+
+            BlobKey[] keys = new BlobKey[2];
+
+            // Put data
+            keys[0] = client.put(actual); // Request 1
+            keys[1] = client.put(actual, 32, 256); // Request 2
+
+            JobID[] jobId = new JobID[] { new JobID(), new JobID() };
+            String[] testKey = new String[] { "test-key-1", "test-key-2" };
+
+            client.put(jobId[0], testKey[0], actual); // Request 3
+            client.put(jobId[1], testKey[1], actual, 32, 256); // Request 4
+
+            // Close the client and connect to the other server
+            client.close();
+            client = new BlobClient(serverAddress[1]);
+
+            // Verify request 1
+            try (InputStream is = client.get(keys[0])) {
+                byte[] expected = new
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944835#comment-14944835 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1227#issuecomment-145810928

    Good PR @uce. I think the proposal in your description would be a good improvement: to obtain the required jars, let the client side (TMs)

    1. check whether they have the file cached,
    2. check the filesystem backend, and
    3. ask the JM (this case should then never happen in recovery mode).

    Currently, I have the feeling that the file state backend is too tightly coupled with the `BlobServer`. IMHO, it would be better to add an abstraction so that the effective backend used to distribute the jars can be easily swapped.

    Furthermore, I couldn't find a check whether the user-provided `STATE_BACKEND_FS_RECOVERY_PATH` actually points to a distributed file system and is thus accessible by the TMs. Maybe we could add a check which, if it fails, gives a comprehensive warning.
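The lookup order and the backend abstraction proposed in this comment can be sketched as below. All names (`BlobSource`, `BlobLookup`, `fetchFirst`, `looksNodeLocal`) are hypothetical and chosen for illustration only; the path check is a heuristic assumption, not something the pull request implements.

```java
import java.net.URI;
import java.util.List;
import java.util.Optional;

/** Hypothetical abstraction over one place a blob can be fetched from (cache, file system, JM). */
interface BlobSource {
    Optional<byte[]> fetch(String key);
}

/** Hypothetical helper illustrating the proposed lookup order and a recovery-path sanity check. */
final class BlobLookup {

    /** Tries each source in priority order and returns the first hit; later sources are not asked. */
    static Optional<byte[]> fetchFirst(List<BlobSource> sourcesInPriorityOrder, String key) {
        for (BlobSource source : sourcesInPriorityOrder) {
            Optional<byte[]> blob = source.fetch(key);
            if (blob.isPresent()) {
                return blob;
            }
        }
        return Optional.empty();
    }

    /**
     * Heuristic only: a path without a scheme, or with the "file" scheme, is probably node-local.
     * A shared mount would also look like this, which is why a warning fits better than an error.
     */
    static boolean looksNodeLocal(String recoveryPath) {
        String scheme = URI.create(recoveryPath).getScheme();
        return scheme == null || scheme.equalsIgnoreCase("file");
    }
}
```

A task manager would pass its sources as local cache, file system backend, then job manager. Because the backend sits behind `BlobSource`, it could be swapped without touching the lookup logic.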
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944845#comment-14944845 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1227#issuecomment-145812243

I think it makes a difference whether the JVM is killed with SIGTERM or SIGKILL. In the former case, the shutdown hooks will be executed. It might be that Yarn first sends a SIGTERM signal before SIGKILL when the JVM termination is not intended. But maybe @rmetzger can chime in here.
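For readers unfamiliar with the distinction drawn here: JVM shutdown hooks run on a normal exit and on SIGTERM, but a SIGKILL ends the process without running them. A minimal sketch of a cleanup hook for a storage directory, using only the standard `Runtime` API, might look like the following. `StorageDirCleanup` is a hypothetical helper for illustration, not the code from the PR.

```java
import java.io.File;

/** Hypothetical helper that deletes a storage directory when the JVM shuts down. */
final class StorageDirCleanup {

    /**
     * Registers a shutdown hook that deletes the directory.
     * The hook runs on normal exit and on SIGTERM; it does not run on SIGKILL.
     */
    static Thread register(File dir) {
        Thread hook = new Thread(() -> deleteRecursively(dir), "storage-dir-cleanup");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    /** Removes the hook again; returns false if it was not registered. */
    static boolean unregister(Thread hook) {
        return Runtime.getRuntime().removeShutdownHook(hook);
    }

    /** Deletes a file or a directory tree, children first. */
    static void deleteRecursively(File file) {
        File[] children = file.listFiles(); // null for plain files and missing paths
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete();
    }
}
```

Keeping the returned `Thread` allows the hook to be removed again, which matters for the concern in this thread: a component that must preserve recovery data would simply not register such a hook, or would unregister it.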
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945093#comment-14945093 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1227#discussion_r41269987

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
 	private final int maxConnections;
 
 	/**
+	 * Shutdown hook thread to ensure deletion of the storage directory (or null if
+	 * {@link RecoveryMode#STANDALONE})
+	 */
+	private final Thread shutdownHook;
+
+	/**
 	 * Instantiates a new BLOB server and binds it to a free network port.
 	 *
 	 * @throws IOException
 	 *         thrown if the BLOB server cannot bind to a free network port
 	 */
 	public BlobServer(Configuration config) throws IOException {
+		checkNotNull(config, "Configuration");
+
+		this.recoveryMode = RecoveryMode.fromConfig(config);
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
+		if (recoveryMode == RecoveryMode.STANDALONE) {
+			recoveryBasePath = null;
+		}
+		else {
+			// Initialize file state backend for recovery
+			String stateBackend = config.getString(ConfigConstants.STATE_BACKEND,
+					ConfigConstants.DEFAULT_STATE_BACKEND);
+
+			if (!stateBackend.toLowerCase().equals("filesystem")) {
+				throw new IllegalConfigurationException(String.format("Illegal state backend " +
+						"configuration '%s'. Please configure 'FILESYSTEM' as state " +
+						"backend and specify the recovery path via '%s' key.",
+						stateBackend, ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+			}
--- End diff --

State backend enum is part of the streaming module, which is not a dependency of the runtime atm.
I will coordinate with @aljoscha and check what a good place for it is with the streaming rewrite going on.
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945116#comment-14945116 ]

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1227#issuecomment-145876037

- It is very tightly coupled in case of recovery. We won't lose this coupling, but I agree to introduce an interface to hide it (with a no-op implementation for standalone mode and the filesystem implementation in case of recovery). I fully agree that it's not a good way to do it, but this is blocking #1153. Is that OK?

- There is no check for the provided path. It's the same behaviour as for the configured checkpoint directory `STATE_BACKEND_FS_DIR`. I think the check would have to be performed on the task managers. A check for "file://" does not suffice, because this can also be a DFS. I would open an issue for this and then fix it for `STATE_BACKEND_FS_DIR` as well.

- I'm still undecided about the shutdown hook. It was removed precisely to prevent SIGTERM from removing all recovery data, but the actor's postStop will also be called after SIGTERM and will then remove the data anyway. I think the only advantage (which doesn't justify it) is for tests.
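The interface mentioned in the first point, with a no-op implementation for standalone mode and a filesystem implementation for recovery, could look roughly like the sketch below. All names (`RecoveryBlobStore`, `NoOpBlobStore`, `FileSystemBlobStore`) are hypothetical and not taken from the PR. The filesystem variant uses `java.nio.file` on a local path as a stand-in for a distributed file system, and it assumes that keys are simple file names without path separators.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical abstraction over the recovery storage; not a Flink interface. */
interface RecoveryBlobStore {
    void put(String key, byte[] data) throws IOException;

    /** Returns the stored bytes, or null if the key is unknown. */
    byte[] get(String key) throws IOException;

    void delete(String key) throws IOException;
}

/** Standalone mode: nothing is persisted for recovery. */
final class NoOpBlobStore implements RecoveryBlobStore {
    @Override
    public void put(String key, byte[] data) { }

    @Override
    public byte[] get(String key) {
        return null;
    }

    @Override
    public void delete(String key) { }
}

/** Recovery mode: blobs are written below a base path (assumed to be shared storage). */
final class FileSystemBlobStore implements RecoveryBlobStore {
    private final Path basePath;

    FileSystemBlobStore(Path basePath) {
        this.basePath = basePath;
    }

    @Override
    public void put(String key, byte[] data) throws IOException {
        Files.createDirectories(basePath);
        Files.write(basePath.resolve(key), data);
    }

    @Override
    public byte[] get(String key) throws IOException {
        Path file = basePath.resolve(key);
        return Files.exists(file) ? Files.readAllBytes(file) : null;
    }

    @Override
    public void delete(String key) throws IOException {
        Files.deleteIfExists(basePath.resolve(key));
    }
}
```

With such an interface, the server code would call `put`, `get` and `delete` unconditionally, and the choice between the two implementations would be made once from the configured recovery mode.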
[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover
[ https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943534#comment-14943534 ]

ASF GitHub Bot commented on FLINK-2805:
---

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1227

    [FLINK-2805] [blobmanager] Write JARs to file state backend for recovery

This is a follow-up to #1153. I've taken two changes from #1153 for convenience. Other than that, this is independent.

When running the `BlobServer` in `RecoveryMode#ZOOKEEPER`, this will upload the JARs to the configured file system backend (e.g. HDFS).

**Important**: it introduces a hard dependency on a configured file state backend when running the blob server with recovery. This is in line with #1153.

The JAR copying only happens on the server side, i.e. the client uploads to the server and the server uploads to the state backend. The same holds when requesting a blob that does not exist locally: the client requests it from the server, and the server downloads it from the state backend if it is not available locally and then answers the client.

There are other ways to implement this, but this one was minimally invasive and fully circumvents any Akka actor threads for downloading/uploading.

A more invasive change could allow the client side [task manager] to interact directly with the state backend as well. This would spread the load better across the cluster in case of a DFS and save unnecessary network transfers.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink hdfs_jars-2805

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1227

commit 68a7ac4abab15dc2ef45546de6a94c27129534dc
Author: Ufuk Celebi
Date: 2015-10-05T12:30:46Z

    [FLINK-2805] Apply RecoveryMode and ConfigConstants changes from #1153

commit 00ea6b02f15f00486c9541af08b8197c41dd94f7
Author: Ufuk Celebi
Date: 2015-10-05T08:05:05Z

    [FLINK-2805] [blobmanager] Write JARs to file state backend for recovery
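The server-side flow described in this pull request (a put goes to the local storage and to the state backend; a get falls back to the state backend when the blob is missing locally) can be illustrated with a toy model. `ToyBlobServer` is a hypothetical class and not Flink's `BlobServer`, and a shared in-memory map stands in for the file state backend (e.g. HDFS).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy stand-in for a blob server with recovery enabled; not Flink's BlobServer. */
final class ToyBlobServer {
    // Per-server storage, comparable to the local BLOB storage directory.
    private final Map<String, byte[]> localStorage = new ConcurrentHashMap<>();
    // Storage shared by all servers; stands in for the file state backend.
    private final Map<String, byte[]> sharedBackend;

    ToyBlobServer(Map<String, byte[]> sharedBackend) {
        this.sharedBackend = sharedBackend;
    }

    /** Stores the blob locally and copies it to the shared backend. */
    void put(String key, byte[] data) {
        localStorage.put(key, data.clone());
        sharedBackend.put(key, data.clone());
    }

    /** Serves from local storage, downloading from the shared backend on a miss. */
    byte[] get(String key) {
        byte[] local = localStorage.get(key);
        if (local != null) {
            return local.clone();
        }
        byte[] remote = sharedBackend.get(key);
        if (remote == null) {
            return null; // unknown to this server and to the backend
        }
        localStorage.put(key, remote.clone());
        return remote.clone();
    }

    boolean isStoredLocally(String key) {
        return localStorage.containsKey(key);
    }
}
```

Two `ToyBlobServer` instances constructed with the same map behave like the two servers in `testBlobServerRecovery`: a blob put through the first one can be fetched through the second, which then also keeps a local copy.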