[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-05-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5580


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-05-02 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r185517079
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from BlobServer and properly cleans them after.
+ */
+public class FileCacheDirectoriesTest {
+
+	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+		+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+		+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
+		+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
+		+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
+		+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
+		+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
+		+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
+		+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
+		+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+		+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
+		+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
+		+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
+		+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+	@Rule
+	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private FileCache fileCache;
+
+	private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
+
+	private final PermanentBlobService blobService = new PermanentBlobService() {
+		@Override
+		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
+			if (key.equals(permanentBlobKey)) {
+				final File zipArchive = temporaryFolder.newFile("zipArchive");
+				try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
+
+					final ZipEntry zipEntry = new ZipEntry("cacheFile");
+					zis.putNextEntry(zipEntry);
+

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r183700946
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -488,13 +494,36 @@ public void addJar(Path jar) {
 		return userJars;
 	}

+	/**
+	 * Adds the path of a custom file required to run the job on a task manager.
+	 *
+	 * @param file
--- End diff --

missing parameter docs for `name`


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r183703858
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
@@ -0,0 +1,209 @@

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r183702885
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
@@ -0,0 +1,209 @@

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179123350
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
@@ -164,16 +165,13 @@ public void testDirectoryCleanUp() throws Exception {
 		assertTrue(fs.exists(cacheFile));

 		fileCache.releaseJob(jobID, attemptID2);
-		// still should be available, file will be deleted after 5 seconds
+		// still should be available, file will be deleted after 200 milliseconds
 		assertTrue(fileStatus.isDir());
 		assertTrue(fs.exists(cacheFile));
--- End diff --

What you _could_ also do is pass in a `ScheduledExecutorService`. You could then intercept the delete process, verify the interval arguments, and fire the process at your leisure.

I'm worried that in its current form the test is either not stable or takes longer than it should.
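The interception pattern suggested above can be sketched as a small, manually triggered executor. The names here are hypothetical illustrations of the idea, not Flink's actual test utilities:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of the suggestion: instead of running scheduled tasks on a real
 * timer, capture them together with their delay so the test can assert the
 * interval and fire the delete process deterministically.
 */
class ManuallyTriggeredScheduledExecutor {

	/** A captured task together with the delay it was scheduled with. */
	static final class ScheduledTask {
		final Runnable command;
		final long delay;
		final TimeUnit unit;

		ScheduledTask(Runnable command, long delay, TimeUnit unit) {
			this.command = command;
			this.delay = delay;
			this.unit = unit;
		}
	}

	private final Queue<ScheduledTask> tasks = new ArrayDeque<>();

	/** Record the task instead of handing it to a timer thread. */
	ScheduledTask schedule(Runnable command, long delay, TimeUnit unit) {
		ScheduledTask task = new ScheduledTask(command, delay, unit);
		tasks.add(task);
		return task;
	}

	/** Run all captured tasks synchronously, "at the test's leisure". */
	void triggerAll() {
		ScheduledTask task;
		while ((task = tasks.poll()) != null) {
			task.command.run();
		}
	}
}
```

A test would inject this in place of the real scheduled executor, call `releaseJob`, assert that the captured delay matches the configured cleanup interval, and only then call `triggerAll()` to run the delete process, removing both the flakiness and the wall-clock wait.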


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179116502
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
 		);
 	}

+	public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+		checkNotNull(jobId);
+
+		synchronized (lock) {
+			Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
+
+			if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+				LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId);
+				return;
+			}
+
+			jobRefCounter.remove(executionId);
--- End diff --

You are right! I added the removal to the DeleteProcess.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179116344
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
@@ -0,0 +1,182 @@
+				IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
+			}
+			return zipArchive

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179087145
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
@@ -33,16 +38,16 @@

 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;

-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;

 /**
- * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+public class FileCacheReadsFromBlobTest {
--- End diff --

Ah yes, I was misreading the test...


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179079963
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
--- End diff --

It does; it is `FileCache`'s responsibility to unzip.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179075409
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
--- End diff --

As far as the `FileCache` is concerned it doesn't follow a different path though, right?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179071066
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java ---
--- End diff --

It is different in that this test uploads a single file, so it follows a slightly different path: the file is not zipped and is shipped directly from the BlobCache. In the case of directories there is an unzipped version whose lifecycle is managed by `FileCache`.

If you think the test `FileCacheDirectoryTest#testDirectoryDownloadedFromBlob` is enough, though, I will remove it.
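The directory path described above — shipping a directory as one zip blob and unpacking it on the receiving side — can be illustrated with a minimal in-memory sketch (hypothetical helper, not Flink's actual `FileCache` code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/**
 * Hypothetical helper illustrating the directory path: the client packs the
 * directory into a single zip blob, and the receiving side unpacks it into
 * a local copy whose lifecycle it then manages.
 */
class ZipShipSketch {

	/** Pack (name, content) entries into one zip blob, as the client would. */
	static byte[] zip(Map<String, String> files) throws IOException {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		try (ZipOutputStream zos = new ZipOutputStream(bos)) {
			for (Map.Entry<String, String> e : files.entrySet()) {
				zos.putNextEntry(new ZipEntry(e.getKey()));
				zos.write(e.getValue().getBytes(StandardCharsets.UTF_8));
				zos.closeEntry();
			}
		}
		return bos.toByteArray();
	}

	/** Unpack the blob into (name, content) entries, as the cache would. */
	static Map<String, String> unzip(byte[] blob) throws IOException {
		Map<String, String> files = new HashMap<>();
		try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(blob))) {
			ZipEntry entry;
			while ((entry = zis.getNextEntry()) != null) {
				ByteArrayOutputStream out = new ByteArrayOutputStream();
				byte[] buf = new byte[4096];
				int n;
				while ((n = zis.read(buf)) != -1) {
					out.write(buf, 0, n);
				}
				files.put(entry.getName(), out.toString("UTF-8"));
			}
		}
		return files;
	}
}
```

A single file, by contrast, skips the zip/unzip round trip entirely, which is why the two cases exercise different code.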


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179050736
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java ---
@@ -37,18 +39,31 @@
  */
 @Public
 public class DistributedCache {
-
-	public static class DistributedCacheEntry {
-
+
+	public static class DistributedCacheEntry implements Serializable {
+
 		public String filePath;
 		public Boolean isExecutable;
-
-		public DistributedCacheEntry(String filePath, Boolean isExecutable){
+		public Boolean isZipped;

use a primitive instead?
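The point of the suggestion: a boxed `Boolean` admits a `null` state and boxing overhead that a flag field does not need. A simplified sketch of the entry with primitive flags (not the full Flink class):

```java
import java.io.Serializable;

/**
 * Simplified sketch of the cache entry using primitive flags as suggested:
 * a primitive boolean cannot be null and avoids autoboxing.
 */
class CacheEntrySketch implements Serializable {
	private static final long serialVersionUID = 1L;

	final String filePath;
	final boolean isExecutable; // primitive instead of Boolean
	final boolean isZipped;     // primitive instead of Boolean

	CacheEntrySketch(String filePath, boolean isExecutable, boolean isZipped) {
		this.filePath = filePath;
		this.isExecutable = isExecutable;
		this.isZipped = isZipped;
	}
}
```

With primitives, callers can never observe a half-initialized `null` flag, and `if (entry.isZipped)` needs no unboxing or null check.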


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179060408
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -0,0 +1,182 @@

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179056079
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
);
}
 
+   public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+   checkNotNull(jobId);
+
+   synchronized (lock) {
+   Set jobRefCounter = 
jobRefHolders.get(jobId);
+
+   if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
+   return;
+   }
+
+   jobRefCounter.remove(executionId);
+   if (jobRefCounter.isEmpty()) {
+   executorService.schedule(new 
DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
+   }
+   }
+   }
+
// 

//  background processes
// 

 
/**
-* Asynchronous file copy process.
+* Asynchronous file copy process from blob server.
 */
-   private static class CopyProcess implements Callable<Path> {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean isDirectory;
+   private final boolean isExecutable;
+   private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.isExecutable = e.isExecutable;
+   this.isDirectory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
--- End diff --

the constructor signature should be modified instead


---
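The quoted diff above introduces per-execution reference counting in `FileCache.releaseJob`: each execution attempt holds a reference on a job's cached files, and cleanup is only scheduled once the last reference is released. A minimal standalone sketch of that scheme, with hypothetical class and method names (`RefCountingCache`, `acquire`, `isTracked`) standing in for the real `FileCache` API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RefCountingCache {
    private final Object lock = new Object();
    // jobId -> set of execution attempts currently using the job's files
    private final Map<String, Set<String>> jobRefHolders = new HashMap<>();
    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();
    private final long cleanupIntervalSeconds;

    public RefCountingCache(long cleanupIntervalSeconds) {
        this.cleanupIntervalSeconds = cleanupIntervalSeconds;
    }

    /** Registers one execution attempt as a user of the job's cached files. */
    public void acquire(String jobId, String executionId) {
        synchronized (lock) {
            jobRefHolders.computeIfAbsent(jobId, id -> new HashSet<>()).add(executionId);
        }
    }

    /** Drops one reference; schedules delayed cleanup once the last one is gone. */
    public void releaseJob(String jobId, String executionId) {
        synchronized (lock) {
            Set<String> refs = jobRefHolders.get(jobId);
            if (refs == null || refs.isEmpty()) {
                return; // improper use: no matching acquire() call
            }
            refs.remove(executionId);
            if (refs.isEmpty()) {
                executorService.schedule(() -> deleteJobFiles(jobId),
                        cleanupIntervalSeconds, TimeUnit.SECONDS);
            }
        }
    }

    /** Sketch-only helper: is the job still referenced or awaiting cleanup? */
    public boolean isTracked(String jobId) {
        synchronized (lock) {
            return jobRefHolders.containsKey(jobId);
        }
    }

    private void deleteJobFiles(String jobId) {
        synchronized (lock) {
            // only clean up if nobody re-acquired during the delay
            Set<String> refs = jobRefHolders.get(jobId);
            if (refs == null || refs.isEmpty()) {
                jobRefHolders.remove(jobId);
                // ... delete the job's local cache directory here ...
            }
        }
    }
}
```

The delayed, re-checking cleanup is the point of the pattern: a task that re-registers within the cleanup interval keeps the files alive without re-downloading.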


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179049139
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -231,6 +234,7 @@ public TaskExecutor(
 
hardwareDescription = HardwareDescription.extractFromSystem(

taskExecutorServices.getMemoryManager().getMemorySize());
+
--- End diff --

revert


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179048742
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
@@ -26,6 +26,7 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.cache.DistributedCache;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
--- End diff --

this import should now be unnecessary. or the change should be reverted


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179053003
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program verifying that files are distributed via the BlobServer and are later accessible through
+ * the DistributedCache. We upload a file and then access it in a map function. To be sure we read the version
+ * served from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
+
+   final File tempFile = File.createTempFile("temp", null, new 
File(tempDir));
+   Files.write(tempFile.toPath(), singletonList(fileContent));
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.registerCachedFile(tempFile.getPath(), "test_data", false);
+
+   env.fromElements(1)
+   .map(new TestMapFunction(tempFile.getAbsolutePath()))
+   .writeAsText(params.getRequired("output"), 
FileSystem.WriteMode.OVERWRITE);
+
+   env.execute("Distributed Cache Via Blob Test Program");
+   }
+
+   static class TestMapFunction extends RichMapFunction<Integer, String> {
+
+   private String initialPath;
+
+   public TestMapFunction(String initialPath) {
+   this.initialPath = initialPath;
+   }
+
+   @Override
+   public String map(Integer value) throws Exception {
+   Files.deleteIfExists(Paths.get(initialPath));
+   return StringUtils.join(Files
+   
.readAllLines(getRuntimeContext().getDistributedCache().getFile("test_data").toPath()),
 "\n");
--- End diff --

this could be replaced with
```
return 
Files.readAllLines(getRuntimeContext().getDistributedCache().getFile("test_data").toPath()).stream()
.collect(Collectors.joining("\n"));
```


---
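zentol's suggestion above swaps the `commons-lang3` `StringUtils.join` call for the JDK's `Collectors.joining`, dropping the undeclared dependency flagged elsewhere in this review. A quick standalone illustration of the same refactoring (file names here are arbitrary):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Collectors;

public class JoinLines {
    /** JDK-only equivalent of StringUtils.join(Files.readAllLines(file), "\n"). */
    public static String readJoined(Path file) throws IOException {
        return Files.readAllLines(file).stream()
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("join-lines", ".txt");
        Files.write(tmp, Arrays.asList("first", "second", "third"));
        String joined = readJoined(tmp);
        if (!joined.equals("first\nsecond\nthird")) {
            throw new AssertionError(joined);
        }
        Files.deleteIfExists(tmp);
    }
}
```

For a plain `List<String>`, `String.join("\n", lines)` is an even shorter JDK-only alternative; the stream form is handy when other intermediate operations are chained in.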


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179057582
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -631,14 +631,14 @@ else if (current == ExecutionState.CANCELING) {

DistributedCache.readFileInfoFromConfig(jobConfiguration))
{
LOG.info("Obtaining local cache file 
for '{}'.", entry.getKey());
-   Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
+   Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);

distributedCacheEntries.put(entry.getKey(), cp);
}
}
catch (Exception e) {
throw new Exception(
-   String.format("Exception while adding 
files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
--- End diff --

revert


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179053531
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program verifying that files are distributed via the BlobServer and are later accessible through
+ * the DistributedCache. We upload a file and then access it in a map function. To be sure we read the version
+ * served from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
+
+   final File tempFile = File.createTempFile("temp", null, new 
File(tempDir));
+   Files.write(tempFile.toPath(), singletonList(fileContent));
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
--- End diff --

explicitly set the parallelism to `1`?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179056758
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
);
}
 
+   public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+   checkNotNull(jobId);
+
+   synchronized (lock) {
+   Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
+
+   if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
+   return;
+   }
+
+   jobRefCounter.remove(executionId);
--- End diff --

shouldn't we also remove the entries? Otherwise the `entries` map will 
continue to grow until either the TM shuts down or it crashes with an OOM 
error.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179054280
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program verifying that files are distributed via the BlobServer and are later accessible through
+ * the DistributedCache. We upload a file and then access it in a map function. To be sure we read the version
+ * served from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
+
+   final File tempFile = File.createTempFile("temp", null, new 
File(tempDir));
--- End diff --

use a random name instead, in case the tempDir isn't randomized.


---
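For reference, `File.createTempFile(prefix, suffix, dir)` already inserts a random component between prefix and suffix, so the reviewer's concern is about the directory, not the name. If a fully random file name is wanted regardless, a UUID-based sketch (class name is illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class RandomTempFile {
    /** Creates a file with a fully random name inside the given directory. */
    public static Path create(Path dir) throws IOException {
        // a UUID avoids collisions even when the same directory is reused across runs
        return Files.createFile(dir.resolve(UUID.randomUUID().toString()));
    }
}
```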


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179052173
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml ---
@@ -0,0 +1,106 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.6-SNAPSHOT
+   ..
+   
+
+   4.0.0
+
+   
flink-distributed-cache-via-blob-test_${scala.binary.version}
+   flink-distributed-cache-via-blob
+   jar
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   2.4
+
+   
+   
+   
+   ClassLoaderTestProgram
+   package
+   
+   jar
+   
+   
+   
DistributedCacheViaBlobTestProgram
+
+   
+   

+   
org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram
+   

+   
+
+   
+   
org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram*
+   
+   
+   
+   
+   
+
+   
+   
+   org.apache.maven.plugins
+   maven-antrun-plugin
--- End diff --

this can be removed if you add 
`DataSetAllroundTestProgram` to the `maven-jar-plugin` 
configuration, see the `flink-dataset-allround-test` module.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179050544
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -331,7 +332,6 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
CompletableFuture submissionFuture = 
jobUploadFuture.thenCompose(
(JobGraph jobGraphToSubmit) -> {
log.info("Submitting job graph.");
-
--- End diff --

revert


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179054489
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program verifying that files are distributed via the BlobServer and are later accessible through
+ * the DistributedCache. We upload a file and then access it in a map function. To be sure we read the version
+ * served from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
--- End diff --

I'm wondering whether we shouldn't just pass in a file instead.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179053801
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program verifying that files are distributed via the BlobServer and are later accessible through
+ * the DistributedCache. We upload a file and then access it in a map function. To be sure we read the version
+ * served from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
+
+   final File tempFile = File.createTempFile("temp", null, new 
File(tempDir));
+   Files.write(tempFile.toPath(), singletonList(fileContent));
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.registerCachedFile(tempFile.getPath(), "test_data", false);
+
+   env.fromElements(1)
+   .map(new TestMapFunction(tempFile.getAbsolutePath()))
+   .writeAsText(params.getRequired("output"), 
FileSystem.WriteMode.OVERWRITE);
+
+   env.execute("Distributed Cache Via Blob Test Program");
+   }
+
+   static class TestMapFunction extends RichMapFunction<Integer, String> {
+
+   private String initialPath;
+
+   public TestMapFunction(String initialPath) {
+   this.initialPath = initialPath;
+   }
+
+   @Override
+   public String map(Integer value) throws Exception {
+   Files.deleteIfExists(Paths.get(initialPath));
--- End diff --

this seems icky. could we not compare the paths instead to make sure we're 
accessing a different file?


---
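The alternative zentol proposes is to assert that the cache handed back a file at a different path than the one originally registered, rather than deleting the source. A hedged sketch of that comparison (class and method names are illustrative, not part of the PR):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class PathCheck {
    /** Returns true when the cached file is a distinct local copy of the registered one. */
    public static boolean isDistinctCopy(Path registeredFile, Path cachedFile) throws IOException {
        // toRealPath resolves symlinks and relative segments, so the same
        // physical file cannot masquerade as two different paths
        return !registeredFile.toRealPath().equals(cachedFile.toRealPath());
    }
}
```

This keeps the source file intact, at the cost of only proving the paths differ rather than proving the read went through the cache.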


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179060824
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -33,16 +38,16 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
- * Test delete process of {@link FileCache}. The local cache file should 
not be deleted why another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+public class FileCacheReadsFromBlobTest {
--- End diff --

am i missing something, or isn't this now a duplicate of 
`FileCacheDirectoryTest#testDirectoryDownloadedFromBlob`


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179057166
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
);
}
 
+   public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+   checkNotNull(jobId);
+
+   synchronized (lock) {
+   Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
+
+   if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
+   return;
+   }
+
+   jobRefCounter.remove(executionId);
+   if (jobRefCounter.isEmpty()) {
+   executorService.schedule(new 
DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
+   }
+   }
+   }
+
// 

//  background processes
// 

 
/**
-* Asynchronous file copy process.
+* Asynchronous file copy process from blob server.
 */
-   private static class CopyProcess implements Callable<Path> {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean isDirectory;
+   private final boolean isExecutable;
+   private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.isExecutable = e.isExecutable;
+   this.isDirectory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
+   final File file = blobService.getFile(jobID, blobKey);
+
+   if (isDirectory) {
+   try (ZipInputStream zis = new 
ZipInputStream(new FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != 
null) {
+   String fileName = 
entry.getName();
+   Path newFile = new Path(target, 
fileName);
+   if (entry.isDirectory()) {
+   
target.getFileSystem().mkdirs(newFile);
+   } else {
+   try (FSDataOutputStream 
fsDataOutputStream = target.getFileSystem()
+   
.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+   
IOUtils.copyBytes(zis, fsDataOutputStream, false);
+   }
+   //noinspection 
ResultOfMethodCallIgnored
+   new 
File(newFile.getPath()).setExecutable(isExecutable);
+   }
+  

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-04-04 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179052578
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
--- End diff --

undeclared dependency.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767320
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1831,4 +1831,5 @@ public void registerCachedFile(String filePath, 
String name) {
public void registerCachedFile(String filePath, String name, boolean 
executable) {
this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
}
+
--- End diff --

revert


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767248
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -91,6 +93,7 @@ public void after() throws Exception{
}
 
@Test
+   @Category(Flip6.class)
--- End diff --

revert?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176769129
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -143,30 +160,23 @@ public void shutdown() {
/**
 * If the file doesn't exists locally, it will copy the file to the 
temp directory.
 *
-* @param name  The name under which the file is registered.
 * @param entry The cache entry descriptor (path, executable flag)
 * @param jobID The ID of the job for which the file is copied.
 * @return The handle to the task that copies the file.
 */
-   public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
+   public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) {
synchronized (lock) {
-   Map>> 
jobEntries = entries.get(jobID);
-   if (jobEntries == null) {
-   jobEntries = new HashMap>>();
-   entries.put(jobID, jobEntries);
-   }
+   Map<String, Future<Path>> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>());
+   final Set<ExecutionAttemptID> refHolders = jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
+   refHolders.add(executionId);
 
// tuple is (ref-count, parent-temp-dir, 
cached-file-path, copy-process)
--- End diff --

outdated


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767271
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
 ---
@@ -40,6 +42,7 @@
 /**
  * Test the distributed cache.
  */
+@Category(Flip6.class)
--- End diff --

revert?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176768205
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
--- End diff --

that's an implementation detail though


---
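The test above iterates over the uploaded blob with a `ZipInputStream`, mirroring how the archive was produced on upload. The same round-trip pattern can be shown self-contained, entirely in memory (map-based helpers are a sketch, not the BlobClient API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipRoundTrip {
    /** Zips the given name -> content entries into a byte array. */
    public static byte[] zip(Map<String, String> files) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bos)) {
            for (Map.Entry<String, String> e : files.entrySet()) {
                zos.putNextEntry(new ZipEntry(e.getKey()));
                zos.write(e.getValue().getBytes(StandardCharsets.UTF_8));
                zos.closeEntry();
            }
        }
        return bos.toByteArray();
    }

    /** Reads all entries back, as in the test's while-loop over getNextEntry(). */
    public static Map<String, String> unzip(byte[] archive) throws IOException {
        Map<String, String> result = new HashMap<>();
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(archive))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                ByteArrayOutputStream content = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int read;
                while ((read = zis.read(buf)) != -1) {
                    content.write(buf, 0, read);
                }
                result.put(entry.getName(), content.toString(StandardCharsets.UTF_8.name()));
            }
        }
        return result;
    }
}
```

As zentol notes, asserting on the zip layout couples the test to an implementation detail of the upload path; iterating entries by name rather than by position at least keeps it order-independent.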


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176766216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
-   // abort the 
copy
-   
entry.f3.cancel(true);
-
-   // remove the 
file
-   File file = new 
File(entry.f2.toString());
-  

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176764920
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
-   // abort the 
copy
-   
entry.f3.cancel(true);
-
-   // remove the 
file
-   File file = new 
File(entry.f2.toString());
-  

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176747071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
-   // abort the 
copy
-   
entry.f3.cancel(true);
-
-   // remove the 
file
-   File file = new 
File(entry.f2.toString());
-

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176746645
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
+   String fileName = 
entry.getName().replaceAll("/", "");
--- End diff --

It is not sufficient. In this test case `entry.getName()` returns `/pre,,,suff`. Note the `/` at the beginning.

Though I've changed `replaceAll` to `replaceFirst`.


---
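The exchange above (a leading `/` in the zip entry name, `replaceAll` vs `replaceFirst`, and the suggested `Path#getName()`) can be illustrated with a small sketch. The entry name below is hypothetical, and `java.nio.file.Paths` stands in for Flink's own `Path#getName()`:

```java
public class ZipEntryNameSketch {
    public static void main(String[] args) {
        // Hypothetical entry name with a leading slash, as reported above.
        String entryName = "/sub/pre123suff";

        // replaceAll("/", "") also deletes interior slashes,
        // mangling "/sub/pre123suff" into "subpre123suff".
        String viaReplaceAll = entryName.replaceAll("/", "");

        // replaceFirst("/", "") only drops the leading slash.
        String viaReplaceFirst = entryName.replaceFirst("/", "");

        // A Path#getName()-style alternative keeps just the last component.
        String lastComponent = java.nio.file.Paths.get(entryName).getFileName().toString();

        System.out.println(viaReplaceAll);    // subpre123suff
        System.out.println(viaReplaceFirst);  // sub/pre123suff
        System.out.println(lastComponent);    // pre123suff
    }
}
```

For flat entry names like `/pre,,,suff`, `replaceFirst` and the last-component approach agree; they only diverge once entries live in subdirectories.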


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176745753
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
 *  Any additional configuration for the blob client
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param jars
-*  List of JAR files to upload
+* @param files
+*  List of files to upload
 *
 * @throws IOException
 *  if the upload fails
 */
-   public static List<PermanentBlobKey> uploadJarFiles(
-   InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
+   public static List<PermanentBlobKey> uploadFiles(
+   InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> files)
throws IOException {
 
checkNotNull(jobId);
 
-   if (jars.isEmpty()) {
+   if (files.isEmpty()) {
return Collections.emptyList();
} else {
List<PermanentBlobKey> blobKeys = new ArrayList<>();
 
try (BlobClient blobClient = new 
BlobClient(serverAddress, clientConfig)) {
-   for (final Path jar : jars) {
-   final FileSystem fs = 
jar.getFileSystem();
-   FSDataInputStream is = null;
-   try {
-   is = fs.open(jar);
-   final PermanentBlobKey key =
-   (PermanentBlobKey) 
blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
-   blobKeys.add(key);
-   } finally {
-   if (is != null) {
-   is.close();
-   }
-   }
+   for (final Path file : files) {
+   final PermanentBlobKey key = 
blobClient.uploadFile(jobId, file);
+   blobKeys.add(key);
}
}
 
return blobKeys;
}
}
 
-   // 

-   //  Miscellaneous
-   // 

-
-   private static Throwable readExceptionFromStream(InputStream in) throws 
IOException {
-   int len = readLength(in);
-   byte[] bytes = new byte[len];
-   readFully(in, bytes, 0, len, "Error message");
+   /**
+* Uploads a single file to the {@link PermanentBlobService} of the 
given {@link BlobServer}.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param file
+*  file to upload
+*
+* @throws IOException
+*  if the upload fails
+*/
+   public PermanentBlobKey uploadFile(JobID jobId, Path file) throws 
IOException {
+   final FileSystem fs = file.getFileSystem();
+   if (fs.getFileStatus(file).isDir()) {
+   return uploadDirectory(jobId, file, fs);
+   } else {
+   try (InputStream is = fs.open(file)) {
+   return (PermanentBlobKey) putInputStream(jobId, 
is, PERMANENT_BLOB);
+   }
+   }
+   }
 
-   try {
-   return (Throwable) 
InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+   private PermanentBlobKey uploadDirectory(JobID jobId, Path file, 
FileSystem fs) throws IOException {
+   try (BlobOutputStream blobOutputStream = new 
BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+   try (ZipOutputStream zipStream = new 
ZipOutputStream(blobOutputStream)) {
+   compressDirectoryToZipfile(fs, 
fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+   zipStream.finish();
+   return (PermanentBlobKey) 
blobOutputStream.finish();
+   }
}
-   catch (ClassNotFoundException e) {
-   // should never occur
-   throw new 

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176745183
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
--- End diff --

It does not need to. `getNextEntry` does it, at the very beginning.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176658024
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -306,18 +311,20 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
getDispatcherAddress(),
(BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   log.info("Uploading jar files.");
final int blobServerPort = response.port;
final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
final List<PermanentBlobKey> keys;
try {
-   keys = 
BlobClient.uploadJarFiles(address, flinkConfig, jobGraph.getJobID(), 
jobGraph.getUserJars());
+   log.info("Uploading jar files.");
+   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
+   log.info("Uploading custom files.");
--- End diff --

don't think this should be logged if there's nothing to upload (which 
generally will be the default)


---
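The point above (don't log "Uploading custom files." when the artifact list is empty, which is the common case) can be sketched with a guard before the log statement. The method name and return value below are illustrative, not Flink's actual client API:

```java
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;

public class GuardedLoggingSketch {
    private static final Logger LOG = Logger.getLogger("upload");

    // Hypothetical upload step: only announce the upload when there is
    // actually something to ship. Returns whether anything was uploaded.
    static boolean uploadCustomFiles(List<String> customFiles) {
        if (customFiles.isEmpty()) {
            return false; // nothing to upload, nothing to log
        }
        LOG.info("Uploading " + customFiles.size() + " custom file(s).");
        // ... upload each file here ...
        return true;
    }

    public static void main(String[] args) {
        uploadCustomFiles(Collections.emptyList());            // logs nothing
        uploadCustomFiles(Collections.singletonList("a.txt")); // logs once
    }
}
```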


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662757
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
--- End diff --

we have a `writeFileUtf8` method somewhere in `FileUtils`


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662599
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
--- End diff --

entry should be closed


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176663553
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -57,89 +65,59 @@
+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
-   + "deine Boten, Herr, verehren Das sanfte Wandeln deines 
Tags.\n";
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
 
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
private FileCache fileCache;
-   private File f;
 
-   @Before
-   public void setup() throws IOException {
-   String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
-   try {
-   fileCache = new FileCache(tmpDirectories);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("Cannot create FileCache: " + e.getMessage());
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   File f = temporaryFolder.newFile("cacheFile");
+   FileUtils.writeFileUtf8(f, testFileContent);
+   return f;
+   } else {
+   throw new IllegalArgumentException("This 
service contains only entry for " + permanentBlobKey);
+   }
}
 
-   f = temporaryFolder.newFile("cacheFile");
-   try {
-   Files.write(testFileContent, f, Charsets.UTF_8);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("Error initializing the test: " + e.getMessage());
+   @Override
+   public void close() throws IOException {
+
}
+   };
+
+   @Before
+   public void setup() throws Exception {
+   fileCache = new FileCache(new 
String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
}
 
@After
public void shutdown() {
-   try {
-   fileCache.shutdown();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("FileCache shutdown failed: " + e.getMessage());
-   }
+   fileCache.shutdown();
}
 
@Test
-   public void testFileReuseForNextTask() {
-   try {
-   final JobID jobID = new JobID();
-   final String fileName = "test_file";
-
-   final String filePath = f.toURI().toString();
-
-   // copy / create the file
-   Future<Path> copyResult = 
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), 
jobID);
-   copyResult.get();
-
-   // get another reference to the file
-   Future<Path> copyResult2 = 
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), 
jobID);
-
-   // this should be available immediately
-   assertTrue(copyResult2.isDone());
-
-   // delete the file
-   fileCache.deleteTmpFile(fileName, jobID);
-   // file should not yet be deleted
-   assertTrue(fileCache.holdsStillReference(fileName, 
jobID));
-
-   // delete the second reference
-   fileCache.deleteTmpFile(fileName, jobID);
-   // file should still not be deleted, but remain for a 
bit
-   assertTrue(fileCache.holdsStillReference(fileName, 
jobID));
-
-   fileCache.createTmpFile(fileName, new 
DistributedCacheEntry(filePath, false), jobID);
-   fileCache.deleteTmpFile(fileName, jobID);
-
-   // after a while, the file should disappear
-   long deadline = System.currentTimeMillis() + 2;
-   do {
-   Thread.sleep(5500);
-   }
-   while (fileCache.holdsStillR

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176658632
  
--- Diff: flink-end-to-end-tests/pom.xml ---
@@ -78,6 +78,27 @@ under the License.



+   <execution>
+   <id>DistributedCacheViaBlobTestProgram</id>
+   <phase>package</phase>
+   <goals>
+   <goal>jar</goal>
+   </goals>
+   <configuration>
+   <finalName>DistributedCacheViaBlobTestProgram</finalName>
+   <archive>
+   <manifestEntries>
+   <program-class>org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram</program-class>
+   </manifestEntries>
+   </archive>
+   <includes>
+   <include>org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.class</include>
--- End diff --

This can use wildcards, no? 
`org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram* `


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176660974
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
 *  Any additional configuration for the blob client
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param jars
-*  List of JAR files to upload
+* @param files
+*  List of files to upload
 *
 * @throws IOException
 *  if the upload fails
 */
-   public static List<PermanentBlobKey> uploadJarFiles(
-   InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
+   public static List<PermanentBlobKey> uploadFiles(
+   InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> files)
throws IOException {
 
checkNotNull(jobId);
 
-   if (jars.isEmpty()) {
+   if (files.isEmpty()) {
return Collections.emptyList();
} else {
List<PermanentBlobKey> blobKeys = new ArrayList<>();
 
try (BlobClient blobClient = new 
BlobClient(serverAddress, clientConfig)) {
-   for (final Path jar : jars) {
-   final FileSystem fs = 
jar.getFileSystem();
-   FSDataInputStream is = null;
-   try {
-   is = fs.open(jar);
-   final PermanentBlobKey key =
-   (PermanentBlobKey) 
blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
-   blobKeys.add(key);
-   } finally {
-   if (is != null) {
-   is.close();
-   }
-   }
+   for (final Path file : files) {
+   final PermanentBlobKey key = 
blobClient.uploadFile(jobId, file);
+   blobKeys.add(key);
}
}
 
return blobKeys;
}
}
 
-   // 

-   //  Miscellaneous
-   // 

-
-   private static Throwable readExceptionFromStream(InputStream in) throws 
IOException {
-   int len = readLength(in);
-   byte[] bytes = new byte[len];
-   readFully(in, bytes, 0, len, "Error message");
+   /**
+* Uploads a single file to the {@link PermanentBlobService} of the 
given {@link BlobServer}.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param file
+*  file to upload
+*
+* @throws IOException
+*  if the upload fails
+*/
+   public PermanentBlobKey uploadFile(JobID jobId, Path file) throws 
IOException {
+   final FileSystem fs = file.getFileSystem();
+   if (fs.getFileStatus(file).isDir()) {
+   return uploadDirectory(jobId, file, fs);
+   } else {
+   try (InputStream is = fs.open(file)) {
+   return (PermanentBlobKey) putInputStream(jobId, 
is, PERMANENT_BLOB);
+   }
+   }
+   }
 
-   try {
-   return (Throwable) 
InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+   private PermanentBlobKey uploadDirectory(JobID jobId, Path file, 
FileSystem fs) throws IOException {
+   try (BlobOutputStream blobOutputStream = new 
BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+   try (ZipOutputStream zipStream = new 
ZipOutputStream(blobOutputStream)) {
+   compressDirectoryToZipfile(fs, 
fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+   zipStream.finish();
+   return (PermanentBlobKey) 
blobOutputStream.finish();
+   }
}
-   catch (ClassNotFoundException e) {
-   // should never occur
-   throw new IO

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176658516
  
--- Diff: flink-end-to-end-tests/pom.xml ---
@@ -78,6 +78,27 @@ under the License.



+   
--- End diff --

this will need a rebase; each test now has its own module


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176659832
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -488,13 +493,36 @@ public void addJar(Path jar) {
return userJars;
}
 
+   /**
+* Adds the path of a JAR file required to run the job on a task 
manager.
--- End diff --

it may not be a jar file


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176659407
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -159,7 +162,7 @@ public ExecutionConfig getConfig() {
* Get the list of cached files that were registered for distribution 
among the task managers.
*/
public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
-   return cacheFile;
+   return cacheFile.entrySet().stream().map(e -> 
Tuple2.of(e.getKey(), e.getValue())).collect(Collectors.toList());
--- End diff --

if the mapping is never used there's no value in using a map in the first 
place.


---
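The trade-off discussed above — keeping the registered cache files in a map internally while the public accessor returns a list — hinges on the map deduplicating registrations by name. A small sketch, using `String` in place of `DistributedCacheEntry` and `SimpleEntry` in place of Flink's `Tuple2`:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CachedFilesSketch {
    public static void main(String[] args) {
        // Keying by name means a re-registration under the same name
        // overwrites the old path instead of duplicating the entry.
        Map<String, String> cacheFile = new LinkedHashMap<>();
        cacheFile.put("words", "/tmp/words-v1.txt");
        cacheFile.put("words", "/tmp/words-v2.txt"); // replaces the first entry
        cacheFile.put("model", "/tmp/model.bin");

        // The accessor converts to the list shape the public API exposes.
        List<SimpleEntry<String, String>> cachedFiles = cacheFile.entrySet().stream()
                .map(e -> new SimpleEntry<>(e.getKey(), e.getValue()))
                .collect(Collectors.toList());

        System.out.println(cachedFiles.size()); // 2
    }
}
```

If the name-based lookup is never exercised, as the comment notes, a plain list would serve just as well.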


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662637
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
+   String fileName = 
entry.getName().replaceAll("/", "");
--- End diff --

`Path#getName()`?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662188
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
--- End diff --

who's responsible for cleaning up the created local files?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176658879
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -75,15 +66,12 @@
  */
 @PublicEvolving
 public class PythonStreamExecutionEnvironment {
-   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
--- End diff --

I would leave it even if it isn't used


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176661729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
-   // abort the 
copy
-   
entry.f3.cancel(true);
-
-   // remove the 
file
-   File file = new 
File(entry.f2.toString());
-  
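The constructor above restores the cached entry's blob key from bytes via `InstantiationUtil.deserializeObject`, which is essentially standard Java serialization plus class-loader handling. A minimal stdlib sketch of the round-trip such a byte array goes through (the key value is a stand-in, not a real `PermanentBlobKey`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class BlobKeyRoundTrip {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // serialize a (stand-in) key object to bytes, as stored in the cache entry
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject("stand-in-blob-key");
        }
        byte[] serializedKey = bos.toByteArray();

        // deserialize it back, as the constructor does for the blob key
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(serializedKey))) {
            String key = (String) ois.readObject();
            System.out.println(key); // stand-in-blob-key
        }
    }
}
```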

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662946
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -19,30 +19,38 @@
 package org.apache.flink.runtime.filecache;
 
 import org.apache.flink.api.common.JobID;
-import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
 import org.apache.flink.shaded.guava18.com.google.common.io.Files;
 
+import org.apache.commons.lang3.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
- * Test delete process of {@link FileCache}. The local cache file should 
not be deleted why another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+@RunWith(MockitoJUnitRunner.class)
--- End diff --

Why is this needed now? I don't see any mocking being added.
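One way to drop the Mockito runner entirely, in line with this remark, is a hand-written test double of the blob service. A simplified sketch (the interface and signatures are trimmed-down stand-ins, not Flink's actual `PermanentBlobService`):

```java
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class StubBlobServiceDemo {

    // trimmed-down stand-in for a blob service interface
    interface BlobService {
        File getFile(String jobId, String key) throws IOException;
    }

    // a plain test double: no mocking framework needed
    static class MapBackedBlobService implements BlobService {
        private final Map<String, File> files = new HashMap<>();

        void put(String key, File file) {
            files.put(key, file);
        }

        @Override
        public File getFile(String jobId, String key) throws IOException {
            File f = files.get(key);
            if (f == null) {
                throw new IOException("unknown blob: " + key);
            }
            return f;
        }
    }

    public static void main(String[] args) throws IOException {
        MapBackedBlobService service = new MapBackedBlobService();
        service.put("key-1", new File("/tmp/blob-1"));
        System.out.println(service.getFile("job-1", "key-1").getName());
    }
}
```

Such a double keeps the test deterministic and makes the dependency's contract explicit, which is often why reviewers push back on `@RunWith(MockitoJUnitRunner.class)` when no `@Mock` behavior is stubbed.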


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176661496
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
--- End diff --

-> `isDirectory`


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176657746
  
--- Diff: 
flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
 ---
@@ -30,6 +33,7 @@
 /**
  * Tests for the PythonPlanBinder.
  */
+@Category(Flip6.class)
--- End diff --

why does this not run anymore against legacy clusters?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171184258
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, 
String name) {
 * @param executable flag indicating whether the file should be 
executable
 */
public void registerCachedFile(String filePath, String name, boolean 
executable) {
-   this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
+   registerCachedFile(filePath, name, executable, false);
}
+
+   /**
+* Registers a file at the distributed cache under the given name. The 
file will be accessible
+* from any user-defined function in the (distributed) runtime under a 
local path. If upload is true files will
+* be distributed via {@link BlobServer} otherwise Files should be 
local files (as long as all relevant workers
+* have access to it), or files in a distributed file system. The 
runtime will copy the files temporarily to a
+* local cache, if needed.
+*
+* The {@link org.apache.flink.api.common.functions.RuntimeContext} 
can be obtained inside UDFs via
+* {@link 
org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
provides access
+* {@link org.apache.flink.api.common.cache.DistributedCache} via
+* {@link 
org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+*
+* @param filePath The path of the file
+* @param name The name under which the file is registered.
+* @param executable flag indicating whether the file should be 
executable
+* @param upload flag indicating if the file should be distributed via 
BlobServer
+*/
+   public void registerCachedFile(String filePath, String name, boolean 
executable, boolean upload) {
--- End diff --

send everything through the blob service


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171181348
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, 
String name) {
 * @param executable flag indicating whether the file should be 
executable
 */
public void registerCachedFile(String filePath, String name, boolean 
executable) {
-   this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
+   registerCachedFile(filePath, name, executable, false);
}
+
+   /**
+* Registers a file at the distributed cache under the given name. The 
file will be accessible
+* from any user-defined function in the (distributed) runtime under a local path. If upload is true,
+* files will be distributed via the {@link BlobServer}; otherwise files should be local files (as long
+* as all relevant workers have access to them), or files in a distributed file system. The runtime
+* will copy the files temporarily to a local cache, if needed.
+*
+* The {@link org.apache.flink.api.common.functions.RuntimeContext} 
can be obtained inside UDFs via
+* {@link 
org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
provides access
+* {@link org.apache.flink.api.common.cache.DistributedCache} via
+* {@link 
org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+*
+* @param filePath The path of the file
+* @param name The name under which the file is registered.
+* @param executable flag indicating whether the file should be 
executable
+* @param upload flag indicating if the file should be distributed via 
BlobServer
+*/
+   public void registerCachedFile(String filePath, String name, boolean 
executable, boolean upload) {
--- End diff --

What behaviour do you suggest? Sending all local files through the BlobServer and serving files from DFS as before?



---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171175263
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
+import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class FileCacheReadsFromBlobTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher 
Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In 
Brudersphaeren Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. 
Ihr Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich 
umher der Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, 
schauervoller Nacht. Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der 
Felsen auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem 
Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   @Mock
+   private PermanentBlobService blobService;
+
+   @Before
+   public void setup() throws IOException {
+   try {
+   String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
+   fileCache = new FileCache(tmpDirectories, blobService);
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("Cannot create FileCache: " + e.getMessage());
+   }
+   }
+
+   @After
+   public void shutdown() {
+   try {
+   fileCache.shutdown();
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("FileCache shutdown failed: " + e.getMessage());
+   }
+   }
+
+   @Te

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171175037
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
+import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class FileCacheReadsFromBlobTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher 
Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In 
Brudersphaeren Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. 
Ihr Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich 
umher der Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, 
schauervoller Nacht. Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der 
Felsen auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem 
Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   @Mock
+   private PermanentBlobService blobService;
+
+   @Before
+   public void setup() throws IOException {
+   try {
+   String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
+   fileCache = new FileCache(tmpDirectories, blobService);
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("Cannot create FileCache: " + e.getMessage());
+   }
+   }
+
+   @After
+   public void shutdown() {
+   try {
+   fileCache.shutdown();
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("FileCache shutdown failed: " + e.getMessage());
+   }
+   }
+
+   @Te

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171174660
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
+import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class FileCacheReadsFromBlobTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher 
Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In 
Brudersphaeren Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. 
Ihr Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich 
umher der Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, 
schauervoller Nacht. Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der 
Felsen auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem 
Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   @Mock
+   private PermanentBlobService blobService;
+
+   @Before
+   public void setup() throws IOException {
+   try {
+   String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
+   fileCache = new FileCache(tmpDirectories, blobService);
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("Cannot create FileCache: " + e.getMessage());
+   }
+   }
+
+   @After
+   public void shutdown() {
+   try {
+   fileCache.shutdown();
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("FileCache shutdown failed: " + e.getMessage());
+   }
+   }
+
+   @Te

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171174187
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, 
String name) {
 * @param executable flag indicating whether the file should be 
executable
 */
public void registerCachedFile(String filePath, String name, boolean 
executable) {
-   this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
+   registerCachedFile(filePath, name, executable, false);
}
+
+   /**
+* Registers a file at the distributed cache under the given name. The 
file will be accessible
+* from any user-defined function in the (distributed) runtime under a local path. If upload is true,
+* files will be distributed via the {@link BlobServer}; otherwise files should be local files (as long
+* as all relevant workers have access to them), or files in a distributed file system. The runtime
+* will copy the files temporarily to a local cache, if needed.
+*
+* The {@link org.apache.flink.api.common.functions.RuntimeContext} 
can be obtained inside UDFs via
+* {@link 
org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
provides access
+* {@link org.apache.flink.api.common.cache.DistributedCache} via
+* {@link 
org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+*
+* @param filePath The path of the file
+* @param name The name under which the file is registered.
+* @param executable flag indicating whether the file should be 
executable
+* @param upload flag indicating if the file should be distributed via 
BlobServer
+*/
+   public void registerCachedFile(String filePath, String name, boolean 
executable, boolean upload) {
--- End diff --

I'm wondering whether we really need a new method. We could just modify the 
behavior of the existing method which should be transparent to the user.
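Keeping the change transparent usually means the existing overload delegates to the extended one with a default value for the new flag. A simplified, hypothetical sketch of that pattern (not Flink's actual implementation; the class and entry format are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class CachedFileRegistry {

    // registered entries, rendered as "name:path:exec=...:upload=..." strings for the demo
    private final List<String> entries = new ArrayList<>();

    // the existing public signature stays untouched...
    public void registerCachedFile(String filePath, String name, boolean executable) {
        // ...and simply picks a default for the new upload flag
        registerCachedFile(filePath, name, executable, false);
    }

    // extended variant carrying the new flag
    public void registerCachedFile(String filePath, String name, boolean executable, boolean upload) {
        entries.add(name + ":" + filePath + ":exec=" + executable + ":upload=" + upload);
    }

    public List<String> getEntries() {
        return entries;
    }

    public static void main(String[] args) {
        CachedFileRegistry env = new CachedFileRegistry();
        env.registerCachedFile("/data/lookup.csv", "lookup", false);
        System.out.println(env.getEntries()); // [lookup:/data/lookup.csv:exec=false:upload=false]
    }
}
```

Existing callers compile and behave as before, while new callers can opt into the extended behavior explicitly.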


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-28 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171172025
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 ---
@@ -106,9 +106,9 @@
  * are created for the plan nodes, on the way back up, the nodes connect 
their predecessors.
  */
 public class JobGraphGenerator implements Visitor<PlanNode> {
-   
+
public static final String MERGE_ITERATION_AUX_TASKS_KEY = 
"compiler.merge-iteration-aux";
-   
--- End diff --

Sorry for that. Forgot to disable stripping whitespaces on save. Will 
revert those changes.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-28 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171171252
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 ---
@@ -106,9 +106,9 @@
  * are created for the plan nodes, on the way back up, the nodes connect 
their predecessors.
  */
 public class JobGraphGenerator implements Visitor<PlanNode> {
-   
+
public static final String MERGE_ITERATION_AUX_TASKS_KEY = 
"compiler.merge-iteration-aux";
-   
--- End diff --

this would be a lot easier to review without the formatting changes :/


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-26 Thread ifndef-SleePy
Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r170804963
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -314,6 +315,9 @@ public static TaskExecutor startTaskManager(
 
TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
+   final FileCache fileCache = new 
FileCache(taskManagerServicesConfiguration.getTmpDirPaths(),
--- End diff --

It seems that the `fileCache` here is not used at all.


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-02-26 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/5580

[FLINK-8620] Enable shipping custom files to BlobStore and accessing …

…them through DistributedCache


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink distributed-cache

Alternatively you can