[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14964747#comment-14964747
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1227


> Make user jars available for all job managers to recover
> 
>
> Key: FLINK-2805
> URL: https://issues.apache.org/jira/browse/FLINK-2805
> Project: Flink
>  Issue Type: Bug
>  Components: BlobManager, JobManager
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> This is a bug in https://github.com/apache/flink/pull/1153.
> In case of multiple job managers, the user jars need to be accessible by all 
> job managers (including those who arrive later).
> Since #1153 requires the file state backend to be configured, the simplest 
> solution is to make the blob server aware of the configured recovery mode and 
> put/get/delete the user jars from the file state backend as well.
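A minimal sketch of that idea. It assumes the recovery path is a directory that every job manager can reach. A local temp directory stands in for the file state backend, and the class and method names are hypothetical, not Flink API. `put` writes the jar locally and mirrors it to the shared directory. `get` restores a missing local copy from the shared one, which covers a job manager that arrives later. `delete` removes both copies. A `null` recovery directory models `RecoveryMode.STANDALONE`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch: local blob storage mirrored to a shared recovery directory. */
final class RecoveryAwareBlobStorage {
    private final Path localDir;
    private final Path recoveryDir; // null models RecoveryMode.STANDALONE

    RecoveryAwareBlobStorage(Path localDir, Path recoveryDir) throws IOException {
        this.localDir = Files.createDirectories(localDir);
        this.recoveryDir = recoveryDir == null ? null : Files.createDirectories(recoveryDir);
    }

    /** Stores the jar locally and, if recovery is enabled, in the shared directory. */
    void put(String key, byte[] jar) throws IOException {
        Files.write(localDir.resolve(key), jar);
        if (recoveryDir != null) {
            Files.write(recoveryDir.resolve(key), jar);
        }
    }

    /** Returns the local copy, restoring it from the shared directory if it is missing. */
    Path get(String key) throws IOException {
        Path local = localDir.resolve(key);
        if (Files.exists(local)) {
            return local;
        }
        if (recoveryDir != null && Files.exists(recoveryDir.resolve(key))) {
            return Files.copy(recoveryDir.resolve(key), local, StandardCopyOption.REPLACE_EXISTING);
        }
        throw new IOException("No blob stored under key " + key);
    }

    /** Deletes the jar locally and from the shared directory. */
    void delete(String key) throws IOException {
        Files.deleteIfExists(localDir.resolve(key));
        if (recoveryDir != null) {
            Files.deleteIfExists(recoveryDir.resolve(key));
        }
    }
}
```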



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14950933#comment-14950933
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1227#issuecomment-146954580
  
Looks good. I will rebase it on the latest state recovery branch and, if 
Travis gives the green light, merge it.




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945105#comment-14945105
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41270908
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
private final int maxConnections;
 
/**
+* Shutdown hook thread to ensure deletion of the storage directory (or null if
+* {@link RecoveryMode#STANDALONE})
+*/
+   private final Thread shutdownHook;
+
+   /**
 * Instantiates a new BLOB server and binds it to a free network port.
 * 
 * @throws IOException
 * thrown if the BLOB server cannot bind to a free network port
 */
public BlobServer(Configuration config) throws IOException {
+   checkNotNull(config, "Configuration");
+
+   this.recoveryMode = RecoveryMode.fromConfig(config);
 
// configure and create the storage directory
String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
LOG.info("Created BLOB server storage directory {}", storageDir);
 
+   if (recoveryMode == RecoveryMode.STANDALONE) {
+   recoveryBasePath = null;
+   }
+   else {
+   // Initialize file state backend for recovery
+   String stateBackend = config.getString(ConfigConstants.STATE_BACKEND,
+   ConfigConstants.DEFAULT_STATE_BACKEND);
+
+   if (!stateBackend.toLowerCase().equals("filesystem")) {
+   throw new IllegalConfigurationException(String.format("Illegal state backend " +
+   "configuration '%s'. Please configure 'FILESYSTEM' as state " +
+   "backend and specify the recovery path via '%s' key.",
+   stateBackend, ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+   }
--- End diff --

You could do it similarly to the `RecoveryMode` enum.




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944727#comment-14944727
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41238986
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
private final int maxConnections;
 
/**
+* Shutdown hook thread to ensure deletion of the storage directory (or null if
+* {@link RecoveryMode#STANDALONE})
--- End diff --

According to the code, this field is not null if `RecoveryMode.STANDALONE`.




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944722#comment-14944722
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41238757
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
private final int maxConnections;
 
/**
+* Shutdown hook thread to ensure deletion of the storage directory (or null if
+* {@link RecoveryMode#STANDALONE})
+*/
+   private final Thread shutdownHook;
+
+   /**
 * Instantiates a new BLOB server and binds it to a free network port.
 * 
 * @throws IOException
 * thrown if the BLOB server cannot bind to a free network port
 */
public BlobServer(Configuration config) throws IOException {
+   checkNotNull(config, "Configuration");
+
+   this.recoveryMode = RecoveryMode.fromConfig(config);
 
// configure and create the storage directory
String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
LOG.info("Created BLOB server storage directory {}", storageDir);
 
+   if (recoveryMode == RecoveryMode.STANDALONE) {
+   recoveryBasePath = null;
+   }
+   else {
+   // Initialize file state backend for recovery
+   String stateBackend = config.getString(ConfigConstants.STATE_BACKEND,
+   ConfigConstants.DEFAULT_STATE_BACKEND);
+
+   if (!stateBackend.toLowerCase().equals("filesystem")) {
+   throw new IllegalConfigurationException(String.format("Illegal state backend " +
+   "configuration '%s'. Please configure 'FILESYSTEM' as state " +
+   "backend and specify the recovery path via '%s' key.",
+   stateBackend, ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+   }
--- End diff --

IMO it would be better to use a `StateBackend` enum value here instead of 
string equality.
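A minimal sketch of that suggestion, assuming a hypothetical `StateBackend` enum that is parsed from the configuration in the same spirit as `RecoveryMode.fromConfig`. The key name `state.backend` and the `JOBMANAGER` default are assumptions. A plain `Map` stands in for Flink's `Configuration` so the snippet compiles on its own. Parsing once into an enum replaces the `toLowerCase().equals("filesystem")` check with a type-safe comparison and rejects unknown values explicitly.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical enum for the configured state backend (names are illustrative). */
enum StateBackend {
    JOBMANAGER, FILESYSTEM;

    static final String CONFIG_KEY = "state.backend"; // assumed key name
    static final StateBackend DEFAULT = JOBMANAGER;    // assumed default

    /** Parses the configured value case-insensitively, falling back to the default. */
    static StateBackend fromConfig(Map<String, String> config) {
        String value = config.get(CONFIG_KEY);
        if (value == null) {
            return DEFAULT;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown state backend '" + value + "'", e);
        }
    }
}

/** Hypothetical version of the BlobServer check, using the enum instead of string equality. */
final class RecoveryConfigCheck {
    static void requireFileSystemBackend(Map<String, String> config) {
        StateBackend backend = StateBackend.fromConfig(config);
        if (backend != StateBackend.FILESYSTEM) {
            throw new IllegalStateException("Illegal state backend configuration '" + backend
                    + "'. Please configure 'FILESYSTEM' as state backend.");
        }
    }
}
```

Using `Locale.ROOT` for the upper-casing keeps the parse independent of the JVM's default locale.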




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944729#comment-14944729
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41239095
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -102,8 +161,13 @@ public BlobServer(Configuration config) throws IOException {
backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
}
 
-   // Add shutdown hook to delete storage directory
-   this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+   if (recoveryMode == RecoveryMode.STANDALONE) {
--- End diff --

I assume this should be a `!=`?




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944769#comment-14944769
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41241923
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobRecoveryITCase {
+
+   private File recoveryDir;
+
+   @Before
+   public void setUp() throws Exception {
+   recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
+   if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+   throw new IllegalStateException("Failed to create temp directory for test");
+   }
+   }
+
+   @After
+   public void cleanUp() throws Exception {
+   if (recoveryDir != null) {
+   FileUtils.deleteDirectory(recoveryDir);
+   }
+   }
+
+   /**
+* Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+* participating BlobServer.
+*/
+   @Test
+   public void testBlobServerRecovery() throws Exception {
+   Random rand = new Random();
+
+   BlobServer[] server = new BlobServer[2];
+   InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+   BlobClient client = null;
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+   config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+   config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+   for (int i = 0; i < server.length; i++) {
+   server[i] = new BlobServer(config);
+   serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+   }
+
+   client = new BlobClient(serverAddress[0]);
+
+   // Random data
+   byte[] actual = new byte[1024];
+   rand.nextBytes(actual);
+
+   BlobKey[] keys = new BlobKey[2];
+
+   // Put data
+   keys[0] = client.put(actual); // Request 1
+   keys[1] = client.put(actual, 32, 256); // Request 2
+
+   JobID[] jobId = new JobID[] { new JobID(), new JobID() };
+   String[] testKey = new String[] { "test-key-1", "test-key-2" };
+
+   client.put(jobId[0], testKey[0], actual); // Request 3
+   client.put(jobId[1], testKey[1], actual, 32, 256); // Request 4
+
+   // Close the client and connect to the other server
+   client.close();
+   client = new BlobClient(serverAddress[1]);
+
+   // Verify request 1
+   try (InputStream is = client.get(keys[0])) {
+   byte[] expected = 

[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944772#comment-14944772
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41242183
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobLibraryCacheRecoveryITCase {
+
+   private File recoveryDir;
+
+   @Before
+   public void setUp() throws Exception {
+   recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
+   if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+   throw new IllegalStateException("Failed to create temp directory for test");
+   }
+   }
+
+   @After
+   public void cleanUp() throws Exception {
+   if (recoveryDir != null) {
+   FileUtils.deleteDirectory(recoveryDir);
+   }
+   }
+
+   /**
+* Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+* participating BlobLibraryCacheManager.
+*/
+   @Test
+   public void testRecoveryRegisterAndDownload() throws Exception {
+   Random rand = new Random();
+
+   BlobServer[] server = new BlobServer[2];
+   InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+   BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
+   BlobCache cache = null;
+   BlobLibraryCacheManager libCache = null;
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+   config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+   config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+   for (int i = 0; i < server.length; i++) {
+   server[i] = new BlobServer(config);
+   serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
+   libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000);
+   }
+
+   // Random data
+   byte[] actual = new byte[1024];
--- End diff --

Same here with `expected` and `actual` confusion.
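To illustrate the naming the reviewer asks for: the uploaded bytes are the `expected` value, and the bytes read back from the second server are the `actual` value. For a ranged upload such as `client.put(data, 32, 256)`, the expected bytes are the slice that starts at offset 32 and has length 256. The helper class is hypothetical. `Arrays.equals` stands in for JUnit's `assertArrayEquals(expected, actual)` so the snippet needs no test framework.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

/** Hypothetical test helpers illustrating expected-vs-actual naming. */
final class BlobTestHelpers {
    /** Reads the stream until EOF. */
    static byte[] readFully(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
        return out.toByteArray();
    }

    /** Bytes expected back after uploading uploaded[offset, offset + len). */
    static byte[] expectedSlice(byte[] uploaded, int offset, int len) {
        return Arrays.copyOfRange(uploaded, offset, offset + len);
    }

    /** True if the retrieved stream (the actual value) matches the expected bytes. */
    static boolean matches(byte[] expected, InputStream retrieved) throws IOException {
        byte[] actual = readFully(retrieved);
        return Arrays.equals(expected, actual);
    }
}
```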



[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944794#comment-14944794
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1227#issuecomment-145799064
  
What happens with the jars on HDFS if the last `BlobServer` dies without 
properly calling `shutdown`? They will only be removed if a new Flink cluster 
is started with the same `STATE_BACKEND_FS_RECOVERY_PATH` and shut down 
properly?




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944827#comment-14944827
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41246346
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
private final int maxConnections;
 
/**
+* Shutdown hook thread to ensure deletion of the storage directory (or null if
+* {@link RecoveryMode#STANDALONE})
+*/
+   private final Thread shutdownHook;
+
+   /**
 * Instantiates a new BLOB server and binds it to a free network port.
 * 
 * @throws IOException
 * thrown if the BLOB server cannot bind to a free network port
 */
public BlobServer(Configuration config) throws IOException {
+   checkNotNull(config, "Configuration");
+
+   this.recoveryMode = RecoveryMode.fromConfig(config);
 
// configure and create the storage directory
String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
LOG.info("Created BLOB server storage directory {}", storageDir);
 
+   if (recoveryMode == RecoveryMode.STANDALONE) {
+   recoveryBasePath = null;
+   }
+   else {
+   // Initialize file state backend for recovery
+   String stateBackend = config.getString(ConfigConstants.STATE_BACKEND,
+   ConfigConstants.DEFAULT_STATE_BACKEND);
+
+   if (!stateBackend.toLowerCase().equals("filesystem")) {
+   throw new IllegalConfigurationException(String.format("Illegal state backend " +
+   "configuration '%s'. Please configure 'FILESYSTEM' as state " +
+   "backend and specify the recovery path via '%s' key.",
+   stateBackend, ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+   }
--- End diff --

Yes, same as the job recovery code.




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944782#comment-14944782
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41243069
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -102,8 +161,13 @@ public BlobServer(Configuration config) throws IOException {
backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
}
 
-   // Add shutdown hook to delete storage directory
-   this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+   if (recoveryMode == RecoveryMode.STANDALONE) {
--- End diff --

I assume that `==` is correct, but then the JavaDoc is wrong.




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944815#comment-14944815
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41245655
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -102,8 +161,13 @@ public BlobServer(Configuration config) throws IOException {
backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
}
 
-   // Add shutdown hook to delete storage directory
-   this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+   if (recoveryMode == RecoveryMode.STANDALONE) {
--- End diff --

Yes, sorry, the Javadoc is wrong.
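A reduced sketch of the behaviour the thread settles on. The hook that deletes the local storage directory is registered only in `STANDALONE` mode, so the `==` is correct, and the field Javadoc should say that the hook is `null` in every other mode. The class is hypothetical, and `RecoveryMode` is redeclared here with only the two modes mentioned in this thread. On an orderly `shutdown()` the hook is deregistered. `Runtime.removeShutdownHook` throws `IllegalStateException` once the JVM is already shutting down, and the sketch ignores that case.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

enum RecoveryMode { STANDALONE, ZOOKEEPER }

/** Hypothetical reduction of the BlobServer shutdown-hook logic. */
final class StorageCleanup {
    private final Path storageDir;

    /**
     * Shutdown hook thread to ensure deletion of the storage directory
     * (null unless {@link RecoveryMode#STANDALONE}).
     */
    private final Thread shutdownHook;

    StorageCleanup(Path storageDir, RecoveryMode mode) {
        this.storageDir = storageDir;
        if (mode == RecoveryMode.STANDALONE) {
            shutdownHook = new Thread(this::deleteStorageDir, "blob-storage-cleanup");
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        } else {
            // Recovery mode: no hook is registered
            shutdownHook = null;
        }
    }

    boolean hasShutdownHook() {
        return shutdownHook != null;
    }

    /** Orderly shutdown: deregister the hook (if any), then clean up. */
    void shutdown() {
        if (shutdownHook != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (IllegalStateException ignored) {
                // JVM is already shutting down; the hook runs anyway
            }
        }
        deleteStorageDir();
    }

    /** Best-effort recursive delete, children before parents. */
    private void deleteStorageDir() {
        if (!Files.exists(storageDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(storageDir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            // best effort: leave remaining files in place
        }
    }
}
```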




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944825#comment-14944825
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1227#issuecomment-145808965
  
Exactly. If the blob server dies, the job manager will still shut down 
the cache service, though. This is the same behaviour as in the job manager 
recovery. Thinking about it, I am wondering myself what we gain by removing the 
shutdown hook. In case of hard failures (or kill -9) the shutdown hook is not 
called anyway, and in all other cases the job manager shuts down the 
components manually anyway. Am I missing something?




[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944826#comment-14944826
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41246319
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobRecoveryITCase {
+
+   private File recoveryDir;
+
+   @Before
+   public void setUp() throws Exception {
+   recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
+   if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+   throw new IllegalStateException("Failed to create temp directory for test");
+   }
+   }
+
+   @After
+   public void cleanUp() throws Exception {
+   if (recoveryDir != null) {
+   FileUtils.deleteDirectory(recoveryDir);
+   }
+   }
+
+   /**
+* Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+* participating BlobServer.
+*/
+   @Test
+   public void testBlobServerRecovery() throws Exception {
+   Random rand = new Random();
+
+   BlobServer[] server = new BlobServer[2];
+   InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+   BlobClient client = null;
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+   config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+   config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+
+   for (int i = 0; i < server.length; i++) {
+   server[i] = new BlobServer(config);
+   serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
+   }
+
+   client = new BlobClient(serverAddress[0]);
+
+   // Random data
+   byte[] actual = new byte[1024];
+   rand.nextBytes(actual);
+
+   BlobKey[] keys = new BlobKey[2];
+
+   // Put data
+   keys[0] = client.put(actual); // Request 1
+   keys[1] = client.put(actual, 32, 256); // Request 2
+
+   JobID[] jobId = new JobID[] { new JobID(), new JobID() 
};
+   String[] testKey = new String[] { "test-key-1", 
"test-key-2" };
+
+   client.put(jobId[0], testKey[0], actual); // Request 3
+   client.put(jobId[1], testKey[1], actual, 32, 256); // 
Request 4
+
+   // Close the client and connect to the other server
+   client.close();
+   client = new BlobClient(serverAddress[1]);
+
+   // Verify request 1
+   try (InputStream is = client.get(keys[0])) {
+   byte[] expected = new 

[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944835#comment-14944835
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1227#issuecomment-145810928
  
Good PR @uce. I think the proposal in your description would be a good improvement: to obtain the required JARs, the client side (TMs) would 1. check whether it has the file cached, 2. check the file system backend, and 3. ask the JM (this case should then never happen in recovery mode).
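A minimal sketch of the three-step lookup proposed above, assuming a hypothetical `JarSource` interface that is not part of Flink: the TM's local cache, the file system backend, and finally the JM would each be one source, tried in order until one of them returns the JAR. `JarSource`, `MapJarSource`, and `ChainedJarLookup` are illustrative names only, and the in-memory maps merely stand in for the real cache and backend.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical abstraction: one place where a task manager could look for a JAR.
interface JarSource {
    Optional<byte[]> lookup(String blobKey);
}

// Illustrative map-backed source; stands in for a local cache, a DFS backend, or the JM.
final class MapJarSource implements JarSource {
    private final Map<String, byte[]> entries;

    MapJarSource(Map<String, byte[]> entries) {
        this.entries = entries;
    }

    @Override
    public Optional<byte[]> lookup(String blobKey) {
        return Optional.ofNullable(entries.get(blobKey));
    }
}

// Tries each source in the given order (e.g. cache, backend, JM) and returns the first hit.
final class ChainedJarLookup {
    private final List<JarSource> sources;

    ChainedJarLookup(List<JarSource> sources) {
        this.sources = Collections.unmodifiableList(new ArrayList<>(sources));
    }

    Optional<byte[]> lookup(String blobKey) {
        for (JarSource source : sources) {
            Optional<byte[]> result = source.lookup(blobKey);
            if (result.isPresent()) {
                return result;
            }
        }
        return Optional.empty();
    }
}
```

Keeping the order in a list makes the "ask the JM" fallback just one more entry at the end, which in recovery mode should then rarely be reached.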

Currently, I've got the feeling that the file state backend is too tightly coupled with the `BlobServer`. IMHO, it would be better to add an abstraction so that the backend used to distribute the JARs can be easily swapped.

Furthermore, I couldn't find a check whether the user-provided `STATE_BACKEND_FS_RECOVERY_PATH` actually points to a distributed file system and is, thus, accessible by the TMs. Maybe we could add a check which, if it fails, gives a comprehensible warning.
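A rough sketch of such a warning, assuming the recovery path is configured as a URI string. `RecoveryPathCheck` is a hypothetical helper, not Flink code, and the scheme test is only a heuristic: a `file://` path may still point to a shared mount that every TM can read, so a check like this can warn but should not reject.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical helper: warns when the recovery path looks local to one machine.
final class RecoveryPathCheck {
    private RecoveryPathCheck() {}

    /**
     * Returns a warning message if the path has no scheme or the "file" scheme, otherwise null.
     * URI.create throws IllegalArgumentException for strings that are not valid URIs.
     */
    static String warningFor(String recoveryPath) {
        String scheme = URI.create(recoveryPath).getScheme();
        if (scheme == null || scheme.toLowerCase(Locale.ROOT).equals("file")) {
            return "Recovery path '" + recoveryPath + "' looks like a local path. Task managers and "
                + "other job managers can only read it if it is on a shared or distributed file system.";
        }
        return null;
    }
}
```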


> Make user jars available for all job managers to recover
> 
>
> Key: FLINK-2805
> URL: https://issues.apache.org/jira/browse/FLINK-2805
> Project: Flink
>  Issue Type: Bug
>  Components: BlobManager, JobManager
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> This is a bug in https://github.com/apache/flink/pull/1153.
> In case of multiple job managers, the user jars need to be accessible by all 
> job managers (including those who arrive later).
> Since #1153 requires the file state backend to be configured, the simplest 
> solution is to make the blob server aware of the configured recovery mode and 
> put/get/delete the user jars from the file state backend as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944845#comment-14944845
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1227#issuecomment-145812243
  
I think it makes a difference whether the JVM is killed with SIGTERM or SIGKILL. In the former case, the shutdown hooks will be executed. It might be that YARN first sends a SIGTERM signal before a SIGKILL in case of a JVM termination that is not intended. But maybe @rmetzger can chime in here.
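For reference, the JVM runs registered shutdown hooks on an orderly shutdown, which includes SIGTERM and SIGINT, whereas SIGKILL stops the process without running them. The sketch below uses a hypothetical `StorageDirCleanup` class to register a hook that deletes a storage directory, and shows how the hook can be deregistered again, e.g. when a regular shutdown path has already done the cleanup.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper: deletes a storage directory when the JVM shuts down in an orderly way.
// The hook runs on normal exit, SIGTERM or SIGINT, but never on SIGKILL.
final class StorageDirCleanup {
    private final Path storageDir;
    private final Thread hook;

    StorageDirCleanup(Path storageDir) {
        this.storageDir = storageDir;
        this.hook = new Thread(this::deleteQuietly, "storage-dir-cleanup");
    }

    void register() {
        Runtime.getRuntime().addShutdownHook(hook);
    }

    /** Returns true if the hook was registered and is now removed. */
    boolean unregister() {
        try {
            return Runtime.getRuntime().removeShutdownHook(hook);
        } catch (IllegalStateException e) {
            // The JVM is already shutting down, so the hook can no longer be removed.
            return false;
        }
    }

    /** Best-effort recursive delete: children first, then the directory itself. */
    void deleteQuietly() {
        try (Stream<Path> paths = Files.walk(storageDir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            // Ignored: cleanup during shutdown must not throw.
        }
    }
}
```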



[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945093#comment-14945093
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1227#discussion_r41269987
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -70,18 +82,65 @@
 	private final int maxConnections;
 
 	/**
+	 * Shutdown hook thread to ensure deletion of the storage directory (or null if
+	 * {@link RecoveryMode#STANDALONE})
+	 */
+	private final Thread shutdownHook;
+
+	/**
 	 * Instantiates a new BLOB server and binds it to a free network port.
 	 * 
 	 * @throws IOException
 	 * thrown if the BLOB server cannot bind to a free network port
 	 */
 	public BlobServer(Configuration config) throws IOException {
+		checkNotNull(config, "Configuration");
+
+		this.recoveryMode = RecoveryMode.fromConfig(config);
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
+		if (recoveryMode == RecoveryMode.STANDALONE) {
+			recoveryBasePath = null;
+		}
+		else {
+			// Initialize file state backend for recovery
+			String stateBackend = config.getString(ConfigConstants.STATE_BACKEND,
+					ConfigConstants.DEFAULT_STATE_BACKEND);
+
+			if (!stateBackend.toLowerCase().equals("filesystem")) {
+				throw new IllegalConfigurationException(String.format("Illegal state backend " +
+						"configuration '%s'. Please configure 'FILESYSTEM' as state " +
+						"backend and specify the recovery path via '%s' key.",
+						stateBackend, ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+			}
--- End diff --

The state backend enum is part of the streaming module, which is not a dependency of the runtime at the moment. I will coordinate with @aljoscha and check what a good place for it is, given the streaming rewrite that is going on.
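If a state backend enum were available to the runtime module, the string comparison in the diff could become a case-insensitive enum lookup. A minimal sketch, assuming a hypothetical `StateBackendType` enum with just the values `JOBMANAGER` and `FILESYSTEM`, and using `IllegalArgumentException` instead of Flink's `IllegalConfigurationException` so that the example stays self-contained:

```java
import java.util.Locale;

// Hypothetical enum; per the comment above, the real one lives in the streaming module.
enum StateBackendType {
    JOBMANAGER,
    FILESYSTEM;

    // Case-insensitive parse: "filesystem", "FILESYSTEM" and " FileSystem " all map to FILESYSTEM.
    static StateBackendType parse(String value) {
        if (value == null) {
            throw new IllegalArgumentException("No state backend configured");
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown state backend '" + value + "'", e);
        }
    }
}

// Hypothetical check mirroring the one in the diff: recovery requires the file system backend.
final class RecoveryConfigCheck {
    private RecoveryConfigCheck() {}

    static void requireFileSystemBackend(String configuredBackend) {
        if (StateBackendType.parse(configuredBackend) != StateBackendType.FILESYSTEM) {
            throw new IllegalArgumentException("Illegal state backend configuration '"
                + configuredBackend + "'. Please configure 'FILESYSTEM' as state backend for recovery.");
        }
    }
}
```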



[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14945116#comment-14945116
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1227#issuecomment-145876037
  
- It is very tightly coupled in case of recovery. We won't lose this, but I agree to introduce an interface to hide it (with a no-op implementation for standalone mode and the file system implementation in case of recovery). I fully agree that it's not a good way to do it, but this is blocking #1153. Is that OK?

- There is no check for the provided path. It's the same behaviour as for the configured checkpoint directory `STATE_BACKEND_FS_DIR`. I think the check would have to be performed on the task managers. A check for "file://" does not suffice, because this can also be a DFS. I would open an issue for this and then fix it for `STATE_BACKEND_FS_DIR` as well.

- I'm still undecided about the shutdown hook. It was removed precisely to prevent SIGTERM from removing all recovery data, but the actor's postStop will be called after SIGTERM as well, and then it will remove the data anyway. I think the only advantage (which doesn't justify it) is for tests.
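The interface proposed in the first point could look roughly like the sketch below. All names are hypothetical, and `java.nio.file` on a local directory stands in for the distributed file system (e.g. HDFS) that a real recovery implementation would write to.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical abstraction over where recovery copies of BLOBs (user JARs) are kept.
interface BlobRecoveryStore {
    void put(Path localFile, String blobKey) throws IOException;

    /** Copies the blob to localFile; returns false if the store does not have it. */
    boolean get(String blobKey, Path localFile) throws IOException;

    void delete(String blobKey) throws IOException;
}

// No-op variant for standalone mode: nothing is persisted, so nothing can be recovered.
final class VoidBlobRecoveryStore implements BlobRecoveryStore {
    @Override public void put(Path localFile, String blobKey) { }
    @Override public boolean get(String blobKey, Path localFile) { return false; }
    @Override public void delete(String blobKey) { }
}

// File-based variant for recovery mode; basePath stands in for the configured recovery path.
final class FileSystemBlobRecoveryStore implements BlobRecoveryStore {
    private final Path basePath;

    FileSystemBlobRecoveryStore(Path basePath) throws IOException {
        this.basePath = Files.createDirectories(basePath);
    }

    @Override
    public void put(Path localFile, String blobKey) throws IOException {
        Files.copy(localFile, basePath.resolve(blobKey), StandardCopyOption.REPLACE_EXISTING);
    }

    @Override
    public boolean get(String blobKey, Path localFile) throws IOException {
        Path source = basePath.resolve(blobKey);
        if (!Files.exists(source)) {
            return false;
        }
        Files.copy(source, localFile, StandardCopyOption.REPLACE_EXISTING);
        return true;
    }

    @Override
    public void delete(String blobKey) throws IOException {
        Files.deleteIfExists(basePath.resolve(blobKey));
    }
}
```

With this split, the BLOB server would only pick one implementation from the recovery mode and otherwise call the interface, which keeps the file state backend out of its core logic.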



[jira] [Commented] (FLINK-2805) Make user jars available for all job managers to recover

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943534#comment-14943534
 ] 

ASF GitHub Bot commented on FLINK-2805:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/1227

[FLINK-2805] [blobmanager] Write JARs to file state backend for recovery

This is a follow-up to #1153. I've taken two changes from #1153 for convenience. Other than that, this is independent.

When running the `BlobServer` in `RecoveryMode#ZOOKEEPER`, this will upload the JARs to the configured file system backend (e.g. HDFS). **Important**: it introduces a hard dependency on a configured file state backend when running the blob server with recovery. This is in line with #1153.

This JAR copying only happens on the server side, i.e. the client uploads to the server and the server uploads the JAR to the state backend. The same applies when requesting a blob that does not exist locally: the client requests it from the server, and the server downloads it from the state backend if it is not available and then answers the client.
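A simplified model of this server-side flow, assuming a hypothetical `RecoveringBlobStore` in which a local directory plays the role of the BLOB server's storage directory and a shared directory plays the role of the file state backend (`null` in standalone mode). Keys are passed in by the caller here, which is a simplification of how BLOB keys are actually derived.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical model of a BLOB server that mirrors uploads to a shared recovery directory.
final class RecoveringBlobStore {
    private final Path localDir;
    private final Path recoveryDir; // null means standalone mode (no recovery copies)

    RecoveringBlobStore(Path localDir, Path recoveryDir) throws IOException {
        this.localDir = Files.createDirectories(localDir);
        this.recoveryDir = recoveryDir == null ? null : Files.createDirectories(recoveryDir);
    }

    /** Client upload: store locally, then (in recovery mode) copy to the shared backend. */
    void put(String key, byte[] data) throws IOException {
        Path local = localDir.resolve(key);
        Files.write(local, data);
        if (recoveryDir != null) {
            Files.copy(local, recoveryDir.resolve(key), StandardCopyOption.REPLACE_EXISTING);
        }
    }

    /** Client request: serve the local copy, downloading it from the shared backend if missing. */
    byte[] get(String key) throws IOException {
        Path local = localDir.resolve(key);
        if (!Files.exists(local)) {
            if (recoveryDir == null || !Files.exists(recoveryDir.resolve(key))) {
                throw new FileNotFoundException("No BLOB for key " + key);
            }
            Files.copy(recoveryDir.resolve(key), local, StandardCopyOption.REPLACE_EXISTING);
        }
        return Files.readAllBytes(local);
    }
}
```

Two such stores created with different local directories but the same recovery directory behave like the two servers in `BlobRecoveryITCase`: a JAR put into the first can be read from the second.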

There are other ways to implement this, but this one was minimally invasive and fully circumvents any Akka actor threads for downloading/uploading. A more invasive change could allow the client side [task manager] to interact directly with the state backend as well. This would spread the load better across the cluster in case of a DFS and save unnecessary network transfers.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink hdfs_jars-2805

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1227


commit 68a7ac4abab15dc2ef45546de6a94c27129534dc
Author: Ufuk Celebi 
Date:   2015-10-05T12:30:46Z

[FLINK-2805] Apply RecoveryMode and ConfigConstants changes from #1153

commit 00ea6b02f15f00486c9541af08b8197c41dd94f7
Author: Ufuk Celebi 
Date:   2015-10-05T08:05:05Z

[FLINK-2805] [blobmanager] Write JARs to file state backend for recovery



