[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463522#comment-16463522
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5580


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461254#comment-16461254
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
I've addressed @zentol's last comments and rebased to fix conflicts. Will merge after Travis gives green.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-05-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461101#comment-16461101
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r185517079
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from 
BlobServer and properly cleans them after.
+ */
+public class FileCacheDirectoriesTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die 
drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren 
Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr 
Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der 
Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen 
auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom 
Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings 
umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. 
Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   final File zipArchive = 
temporaryFolder.newFile("zipArchive");
+   try (ZipOutputStream zis = new 

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449720#comment-16449720
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r183703858
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from 
BlobServer and properly cleans them after.
+ */
+public class FileCacheDirectoriesTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die 
drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren 
Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr 
Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der 
Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen 
auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom 
Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings 
umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. 
Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   final File zipArchive = 
temporaryFolder.newFile("zipArchive");
+   try (ZipOutputStream zis = new 

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449721#comment-16449721
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r183702885
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from 
BlobServer and properly cleans them after.
+ */
+public class FileCacheDirectoriesTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die 
drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren 
Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr 
Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der 
Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen 
auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom 
Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings 
umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. 
Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   final File zipArchive = 
temporaryFolder.newFile("zipArchive");
+   try (ZipOutputStream zis = new 

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449719#comment-16449719
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r183700946
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -488,13 +494,36 @@ public void addJar(Path jar) {
return userJars;
}
 
+   /**
+* Adds the path of a custom file required to run the job on a task 
manager.
+*
+* @param file
--- End diff --

missing parameter docs for `name`


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425439#comment-16425439
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179123350
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -164,16 +165,13 @@ public void testDirectoryCleanUp() throws Exception {
assertTrue(fs.exists(cacheFile));
 
fileCache.releaseJob(jobID, attemptID2);
-   // still should be available, file will be deleted after 5 
seconds
+   // still should be available, file will be deleted after 200 
milliseconds
assertTrue(fileStatus.isDir());
assertTrue(fs.exists(cacheFile));
--- End diff --

what you _could_ also do is pass in a `ScheduledExecutorService`. You could then intercept the delete process, verify the interval arguments and fire the process at your leisure.

I'm worried that in its current form the test is either not stable or takes longer than it should.
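
A rough sketch of the kind of interception described above; the class below and the assumption that `FileCache` can be handed such an executor are illustrative only (later revisions of the PR quoted in this thread import a `DirectScheduledExecutorService` test utility for this purpose):

```
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: records the scheduled cleanup task instead of running it. */
class CapturingScheduledExecutor extends ScheduledThreadPoolExecutor {

	Runnable capturedTask;
	long capturedDelay;
	TimeUnit capturedUnit;

	CapturingScheduledExecutor() {
		super(1);
	}

	@Override
	public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
		// intercept: remember the delete task and the requested interval, do not run it yet
		this.capturedTask = command;
		this.capturedDelay = delay;
		this.capturedUnit = unit;
		return super.schedule(() -> { }, 0, unit);
	}

	/** Fires the captured cleanup task deterministically from the test. */
	void triggerScheduledTask() {
		if (capturedTask != null) {
			capturedTask.run();
		}
	}
}
```

A test could hand this executor to the `FileCache` under test, call `releaseJob(...)`, assert on `capturedDelay`/`capturedUnit`, and then call `triggerScheduledTask()` to exercise the deletion without any sleeps.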


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425398#comment-16425398
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179116502
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
);
}
 
+   public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+   checkNotNull(jobId);
+
+   synchronized (lock) {
+   Set jobRefCounter = 
jobRefHolders.get(jobId);
+
+   if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
+   return;
+   }
+
+   jobRefCounter.remove(executionId);
--- End diff --

You are right! I added removing it in the DeleteProcess.
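
For context, a hypothetical sketch of what removing the entries in the delete task could look like; the field and map types below are assumptions based on the snippets quoted in this thread, and the actual merged code may differ (in the PR this is an inner class of `FileCache` with direct access to its fields):

```
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

/** Hypothetical standalone version of the scheduled cleanup task. */
class DeleteProcessSketch implements Runnable {

	private final JobID jobID;
	private final Object lock;
	private final Map<JobID, Set<ExecutionAttemptID>> jobRefHolders;
	private final Map<JobID, Map<String, Future<Path>>> entries;

	DeleteProcessSketch(
			JobID jobID,
			Object lock,
			Map<JobID, Set<ExecutionAttemptID>> jobRefHolders,
			Map<JobID, Map<String, Future<Path>>> entries) {
		this.jobID = jobID;
		this.lock = lock;
		this.jobRefHolders = jobRefHolders;
		this.entries = entries;
	}

	@Override
	public void run() {
		synchronized (lock) {
			Set<ExecutionAttemptID> refs = jobRefHolders.get(jobID);
			if (refs == null || refs.isEmpty()) {
				// last reference released: drop the bookkeeping so the maps cannot grow unboundedly
				jobRefHolders.remove(jobID);
				entries.remove(jobID);
				// the job-local temp directory would be deleted here as well
			}
		}
	}
}
```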


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425395#comment-16425395
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179116344
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from 
BlobServer and properly cleans them after.
+ */
+public class FileCacheDirectoriesTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die 
drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren 
Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr 
Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der 
Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen 
auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom 
Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings 
umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. 
Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   final File zipArchive = 
temporaryFolder.newFile("zipArchive");
+   try (ZipOutputStream zis = new 
ZipOutputStream(new FileOutputStream(zipArchive))) {
+
+   final ZipEntry zipEntry = new 
ZipEntry("cacheFile");
+   zis.putNextEntry(zipEntry);
   

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425276#comment-16425276
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179087145
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -33,16 +38,16 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
- * Test delete process of {@link FileCache}. The local cache file should 
not be deleted why another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+public class FileCacheReadsFromBlobTest {
--- End diff --

ah yes, I was misreading the test...


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425247#comment-16425247
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179079963
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -33,16 +38,16 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
- * Test delete process of {@link FileCache}. The local cache file should 
not be deleted why another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+public class FileCacheReadsFromBlobTest {
--- End diff --

it does, it is `FileCache`'s responsibility to unzip.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425233#comment-16425233
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179075409
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -33,16 +38,16 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
- * Test delete process of {@link FileCache}. The local cache file should 
not be deleted why another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+public class FileCacheReadsFromBlobTest {
--- End diff --

as far as the filecache is concerned it doesn't follow a different path though, right?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425217#comment-16425217
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179071066
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -33,16 +38,16 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
- * Test delete process of {@link FileCache}. The local cache file should 
not be deleted why another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+public class FileCacheReadsFromBlobTest {
--- End diff --

It is different in the way that this test uploads a single file. Therefore it follows a slightly different path: the file is not zipped and is shipped directly from the BlobCache. In the case of directories there is an unzipped version whose lifecycle is managed by `FileCache`.

If you think, though, that the test `FileCacheDirectoryTest#testDirectoryDownloadedFromBlob` is enough, I will remove it.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425163#comment-16425163
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179049139
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -231,6 +234,7 @@ public TaskExecutor(
 
hardwareDescription = HardwareDescription.extractFromSystem(

taskExecutorServices.getMemoryManager().getMemorySize());
+
--- End diff --

revert


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425178#comment-16425178
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179056758
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
);
}
 
+   public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+   checkNotNull(jobId);
+
+   synchronized (lock) {
+   Set jobRefCounter = 
jobRefHolders.get(jobId);
+
+   if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
+   return;
+   }
+
+   jobRefCounter.remove(executionId);
--- End diff --

shouldn't we also remove the entries? Otherwise the `entries` map will continue to grow until the TM either shuts down or crashes with an OOM error.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425168#comment-16425168
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179050544
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -331,7 +332,6 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
CompletableFuture submissionFuture = 
jobUploadFuture.thenCompose(
(JobGraph jobGraphToSubmit) -> {
log.info("Submitting job graph.");
-
--- End diff --

revert


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425177#comment-16425177
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179057166
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
);
}
 
+   public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+   checkNotNull(jobId);
+
+   synchronized (lock) {
+   Set jobRefCounter = 
jobRefHolders.get(jobId);
+
+   if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
+   return;
+   }
+
+   jobRefCounter.remove(executionId);
+   if (jobRefCounter.isEmpty()) {
+   executorService.schedule(new 
DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
+   }
+   }
+   }
+
// 

//  background processes
// 

 
/**
-* Asynchronous file copy process.
+* Asynchronous file copy process from blob server.
 */
-   private static class CopyProcess implements Callable {
+   private static class CopyFromBlobProcess implements Callable {
 
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean isDirectory;
+   private final boolean isExecutable;
+   private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.isExecutable = e.isExecutable;
+   this.isDirectory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
+   final File file = blobService.getFile(jobID, blobKey);
+
+   if (isDirectory) {
+   try (ZipInputStream zis = new 
ZipInputStream(new FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != 
null) {
+   String fileName = 
entry.getName();
+   Path newFile = new Path(target, 
fileName);
+   if (entry.isDirectory()) {
+   
target.getFileSystem().mkdirs(newFile);
+   } else {
+   try (FSDataOutputStream 
fsDataOutputStream = target.getFileSystem()
+   
.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+   
IOUtils.copyBytes(zis, fsDataOutputStream, false);
+   }
+ 

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425176#comment-16425176
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179053003
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program for verifying that files are distributed via 
BlobServer and later accessible through
+ * DistribitutedCache. We verify that via uploading file and later on 
accessing it in map function. To be sure we read
+ * version read from cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
+
+   final File tempFile = File.createTempFile("temp", null, new 
File(tempDir));
+   Files.write(tempFile.toPath(), singletonList(fileContent));
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.registerCachedFile(tempFile.getPath(), "test_data", false);
+
+   env.fromElements(1)
+   .map(new TestMapFunction(tempFile.getAbsolutePath()))
+   .writeAsText(params.getRequired("output"), 
FileSystem.WriteMode.OVERWRITE);
+
+   env.execute("Distributed Cache Via Blob Test Program");
+   }
+
+   static class TestMapFunction extends RichMapFunction {
+
+   private String initialPath;
+
+   public TestMapFunction(String initialPath) {
+   this.initialPath = initialPath;
+   }
+
+   @Override
+   public String map(Integer value) throws Exception {
+   Files.deleteIfExists(Paths.get(initialPath));
+   return StringUtils.join(Files
+   
.readAllLines(getRuntimeContext().getDistributedCache().getFile("test_data").toPath()),
 "\n");
--- End diff --

this could be replaced with
```
return Files.readAllLines(getRuntimeContext().getDistributedCache().getFile("test_data").toPath()).stream()
	.collect(Collectors.joining("\n"));
```


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425166#comment-16425166
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179053531
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program for verifying that files are distributed via 
BlobServer and later accessible through
+ * DistribitutedCache. We verify that via uploading file and later on 
accessing it in map function. To be sure we read
+ * version read from cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
+
+   final File tempFile = File.createTempFile("temp", null, new 
File(tempDir));
+   Files.write(tempFile.toPath(), singletonList(fileContent));
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
--- End diff --

explicitly set the parallelism to `1`?
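
Concretely, in the quoted test program this would be a single extra line after obtaining the environment (illustrative only):

```
env.setParallelism(1); // a single task instance, so exactly one copy of the cached file is read
```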


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425171#comment-16425171
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179060408
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from 
BlobServer and properly cleans them after.
+ */
+public class FileCacheDirectoriesTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die 
drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren 
Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr 
Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der 
Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen 
auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom 
Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings 
umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. 
Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   final File zipArchive = 
temporaryFolder.newFile("zipArchive");
+   try (ZipOutputStream zis = new 
ZipOutputStream(new FileOutputStream(zipArchive))) {
+
+   final ZipEntry zipEntry = new 
ZipEntry("cacheFile");
+   zis.putNextEntry(zipEntry);

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425167#comment-16425167
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179050736
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
 ---
@@ -37,18 +39,31 @@
  */
 @Public
 public class DistributedCache {
-   
-   public static class DistributedCacheEntry {
-   
+
+   public static class DistributedCacheEntry implements Serializable {
+
public String filePath;
public Boolean isExecutable;
-   
-   public DistributedCacheEntry(String filePath, Boolean 
isExecutable){
+   public Boolean isZipped;
--- End diff --

use a primitive instead?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425170#comment-16425170
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179053801
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program for verifying that files are distributed via 
BlobServer and later accessible through
+ * DistributedCache. We verify this by uploading a file and later 
accessing it in a map function. To be sure we read
+ * the version from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
+
+   final File tempFile = File.createTempFile("temp", null, new 
File(tempDir));
+   Files.write(tempFile.toPath(), singletonList(fileContent));
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.registerCachedFile(tempFile.getPath(), "test_data", false);
+
+   env.fromElements(1)
+   .map(new TestMapFunction(tempFile.getAbsolutePath()))
+   .writeAsText(params.getRequired("output"), 
FileSystem.WriteMode.OVERWRITE);
+
+   env.execute("Distributed Cache Via Blob Test Program");
+   }
+
+   static class TestMapFunction extends RichMapFunction<Integer, String> {
+
+   private String initialPath;
+
+   public TestMapFunction(String initialPath) {
+   this.initialPath = initialPath;
+   }
+
+   @Override
+   public String map(Integer value) throws Exception {
+   Files.deleteIfExists(Paths.get(initialPath));
--- End diff --

this seems icky. could we not compare the paths instead to make sure we're 
accessing a different file?
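
A sketch of that alternative, assuming the cached copy is read via `getRuntimeContext().getDistributedCache().getFile("test_data")` as registered in the program above; instead of deleting the original, the function checks that it is reading a different (cached) path. Names here are illustrative, not the PR code:

```java
import org.apache.flink.api.common.functions.RichMapFunction;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustrative map function: verify we read a locally cached copy, not the original file.
class PathComparingMapFunction extends RichMapFunction<Integer, String> {

	private final String initialPath;

	PathComparingMapFunction(String initialPath) {
		this.initialPath = initialPath;
	}

	@Override
	public String map(Integer value) throws Exception {
		// Path of the file materialized by the DistributedCache on the TaskManager.
		Path cachePath = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();

		if (cachePath.toAbsolutePath().equals(Paths.get(initialPath).toAbsolutePath())) {
			throw new IllegalStateException(
				"Expected a cached copy but got the original file: " + cachePath);
		}
		return String.join("\n", Files.readAllLines(cachePath));
	}
}
```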


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425174#comment-16425174
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179056079
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -262,106 +216,120 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
);
}
 
+   public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+   checkNotNull(jobId);
+
+   synchronized (lock) {
+   Set<ExecutionAttemptID> jobRefCounter = 
jobRefHolders.get(jobId);
+
+   if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
+   return;
+   }
+
+   jobRefCounter.remove(executionId);
+   if (jobRefCounter.isEmpty()) {
+   executorService.schedule(new 
DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
+   }
+   }
+   }
+
// 

//  background processes
// 

 
/**
-* Asynchronous file copy process.
+* Asynchronous file copy process from blob server.
 */
-   private static class CopyProcess implements Callable<Path> {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean isDirectory;
+   private final boolean isExecutable;
+   private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.isExecutable = e.isExecutable;
+   this.isDirectory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
--- End diff --

the constructor signature should be modified instead


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425169#comment-16425169
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179052173
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml ---
@@ -0,0 +1,106 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.6-SNAPSHOT
+   ..
+   
+
+   4.0.0
+
+   
flink-distributed-cache-via-blob-test_${scala.binary.version}
+   flink-distributed-cache-via-blob
+   jar
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   2.4
+
+   
+   
+   
+   ClassLoaderTestProgram
+   package
+   
+   jar
+   
+   
+   
DistributedCacheViaBlobTestProgram
+
+   
+   

+   
org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram
+   

+   
+
+   
+   
org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram*
+   
+   
+   
+   
+   
+
+   
+   
+   org.apache.maven.plugins
+   maven-antrun-plugin
--- End diff --

this can be removed if you add 
`DataSetAllroundTestProgram` to the `maven-jar-plugin` 
configuration, see the `flink-dataset-allround-test` module.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425172#comment-16425172
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179057582
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -631,14 +631,14 @@ else if (current == ExecutionState.CANCELING) {

DistributedCache.readFileInfoFromConfig(jobConfiguration))
{
LOG.info("Obtaining local cache file 
for '{}'.", entry.getKey());
-   Future<Path> cp = 
fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
+   Future<Path> cp = 
fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);

distributedCacheEntries.put(entry.getKey(), cp);
}
}
catch (Exception e) {
throw new Exception(
-   String.format("Exception while adding 
files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
--- End diff --

revert


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425175#comment-16425175
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179054280
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program for verifying that files are distributed via 
BlobServer and later accessible through
+ * DistributedCache. We verify this by uploading a file and later 
accessing it in a map function. To be sure we read
+ * the version from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
+
+   final File tempFile = File.createTempFile("temp", null, new 
File(tempDir));
--- End diff --

use a random name instead, in case the tempDir isn't randomized.
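
One possible reading of that suggestion, sketched below. The helper and its name are made up; the point is only that the file name carries its own randomness independently of `tempDir`:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.UUID;

// Hypothetical helper: the UUID in the file name avoids collisions with stale files
// from earlier runs even when tempDir itself is not randomized.
final class RandomNamedTempFile {
	static Path create(String tempDir, String content) throws IOException {
		Path file = Paths.get(tempDir, "cache-input-" + UUID.randomUUID());
		return Files.write(file, Collections.singletonList(content));
	}
}
```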


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425164#comment-16425164
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179048742
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
@@ -26,6 +26,7 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.cache.DistributedCache;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
--- End diff --

this import should now be unnecessary. or the change should be reverted


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425173#comment-16425173
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179054489
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * End-to-end test program for verifying that files are distributed via 
BlobServer and later accessible through
+ * DistributedCache. We verify this by uploading a file and later 
accessing it in a map function. To be sure we read
+ * the version from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool params = ParameterTool.fromArgs(args);
+
+   final String fileContent = params.getRequired("content");
+   final String tempDir = params.getRequired("tempDir");
--- End diff --

I'm wondering whether we shouldn't just pass in a file instead.
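
For illustration, a sketch of that variant, reusing the program shown in the diff (imports and `TestMapFunction` as above). Here the test harness is assumed to create the input file and only its path is passed in; the parameter name `inputFile` is made up:

```java
public static void main(String[] args) throws Exception {
	final ParameterTool params = ParameterTool.fromArgs(args);
	// Hypothetical parameter: the harness creates the file, the program only consumes it.
	final String inputFile = params.getRequired("inputFile");

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.registerCachedFile(inputFile, "test_data", false);

	env.fromElements(1)
		.map(new TestMapFunction(inputFile))
		.writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);

	env.execute("Distributed Cache Via Blob Test Program");
}
```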


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425165#comment-16425165
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179052578
  
--- Diff: 
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.commons.lang3.StringUtils;
--- End diff --

undeclared dependency.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-04-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425179#comment-16425179
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r179060824
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -33,16 +38,16 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
- * Test delete process of {@link FileCache}. The local cache file should 
not be deleted while another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+public class FileCacheReadsFromBlobTest {
--- End diff --

am i missing something, or isn't this now a duplicate of 
`FileCacheDirectoryTest#testDirectoryDownloadedFromBlob`


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415990#comment-16415990
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol I've rebased this PR. Feel free to review whenever you find a moment.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411565#comment-16411565
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767248
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -91,6 +93,7 @@ public void after() throws Exception{
}
 
@Test
+   @Category(Flip6.class)
--- End diff --

revert?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411564#comment-16411564
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767271
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
 ---
@@ -40,6 +42,7 @@
 /**
  * Test the distributed cache.
  */
+@Category(Flip6.class)
--- End diff --

revert?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411567#comment-16411567
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176769129
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -143,30 +160,23 @@ public void shutdown() {
/**
 * If the file doesn't exists locally, it will copy the file to the 
temp directory.
 *
-* @param name  The name under which the file is registered.
 * @param entry The cache entry descriptor (path, executable flag)
 * @param jobID The ID of the job for which the file is copied.
 * @return The handle to the task that copies the file.
 */
-   public Future<Path> createTmpFile(String name, DistributedCacheEntry 
entry, JobID jobID) {
+   public Future<Path> createTmpFile(String name, DistributedCacheEntry 
entry, JobID jobID, ExecutionAttemptID executionId) {
synchronized (lock) {
-   Map> 
jobEntries = entries.get(jobID);
-   if (jobEntries == null) {
-   jobEntries = new HashMap>();
-   entries.put(jobID, jobEntries);
-   }
+   Map<String, Future<Path>> jobEntries = 
entries.computeIfAbsent(jobID, k -> new HashMap<>());
+   final Set<ExecutionAttemptID> refHolders = 
jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
+   refHolders.add(executionId);
 
// tuple is (ref-count, parent-temp-dir, 
cached-file-path, copy-process)
--- End diff --

outdated


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411568#comment-16411568
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767320
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1831,4 +1831,5 @@ public void registerCachedFile(String filePath, 
String name) {
public void registerCachedFile(String filePath, String name, boolean 
executable) {
this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
}
+
--- End diff --

revert


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411566#comment-16411566
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176768205
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
--- End diff --

that's an implementation detail though


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411539#comment-16411539
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176766216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4 entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
  

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411528#comment-16411528
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176764920
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4 entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
  

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411452#comment-16411452
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
Thanks @zentol for the review. 

You are right, not supporting it in the old cluster mode would be a regression. 
When I first discussed this, there were some doubts whether it could work in the old mode 
(some hypothetical problems with blobs timing out during submission). 
Therefore I started with the RestClusterClient, but having had another look today, I 
see no problem with doing it for the old cluster mode as well.

I've also reverted the cleanup process for `FileCache`.

Please have a look at the updated PR, if you have time. I will also rebase 
it shortly.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411448#comment-16411448
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176747071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4 entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411445#comment-16411445
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176746645
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
+   String fileName = 
entry.getName().replaceAll("/", "");
--- End diff --

It is not sufficient. In this testcase `entry.getName` returns 
`/pre,,,suff`. Note the `/` in the beginning.

Though I've changed `replaceAll` to `replaceFirst`.
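
For reference, a small sketch of the normalization being discussed: strip only a leading separator from the zip entry name rather than every `/`, so nested entries keep their relative paths. The class and method names are mine, not from the PR:

```java
final class ZipEntryNames {
	// Strip only a leading '/'; nested paths such as "dir/file" stay intact.
	static String stripLeadingSlash(String entryName) {
		return entryName.startsWith("/") ? entryName.substring(1) : entryName;
	}
}
```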


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411435#comment-16411435
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176745753
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
 *  Any additional configuration for the blob client
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param jars
-*  List of JAR files to upload
+* @param files
+*  List of files to upload
 *
 * @throws IOException
 *  if the upload fails
 */
-   public static List<PermanentBlobKey> uploadJarFiles(
-   InetSocketAddress serverAddress, Configuration 
clientConfig, JobID jobId, List<Path> jars)
+   public static List<PermanentBlobKey> uploadFiles(
+   InetSocketAddress serverAddress, Configuration 
clientConfig, JobID jobId, List<Path> files)
throws IOException {
 
checkNotNull(jobId);
 
-   if (jars.isEmpty()) {
+   if (files.isEmpty()) {
return Collections.emptyList();
} else {
List<PermanentBlobKey> blobKeys = new ArrayList<>();
 
try (BlobClient blobClient = new 
BlobClient(serverAddress, clientConfig)) {
-   for (final Path jar : jars) {
-   final FileSystem fs = 
jar.getFileSystem();
-   FSDataInputStream is = null;
-   try {
-   is = fs.open(jar);
-   final PermanentBlobKey key =
-   (PermanentBlobKey) 
blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
-   blobKeys.add(key);
-   } finally {
-   if (is != null) {
-   is.close();
-   }
-   }
+   for (final Path file : files) {
+   final PermanentBlobKey key = 
blobClient.uploadFile(jobId, file);
+   blobKeys.add(key);
}
}
 
return blobKeys;
}
}
 
-   // 

-   //  Miscellaneous
-   // 

-
-   private static Throwable readExceptionFromStream(InputStream in) throws 
IOException {
-   int len = readLength(in);
-   byte[] bytes = new byte[len];
-   readFully(in, bytes, 0, len, "Error message");
+   /**
+* Uploads a single file to the {@link PermanentBlobService} of the 
given {@link BlobServer}.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param file
+*  file to upload
+*
+* @throws IOException
+*  if the upload fails
+*/
+   public PermanentBlobKey uploadFile(JobID jobId, Path file) throws 
IOException {
+   final FileSystem fs = file.getFileSystem();
+   if (fs.getFileStatus(file).isDir()) {
+   return uploadDirectory(jobId, file, fs);
+   } else {
+   try (InputStream is = fs.open(file)) {
+   return (PermanentBlobKey) putInputStream(jobId, 
is, PERMANENT_BLOB);
+   }
+   }
+   }
 
-   try {
-   return (Throwable) 
InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+   private PermanentBlobKey uploadDirectory(JobID jobId, Path file, 
FileSystem fs) throws IOException {
+   try (BlobOutputStream blobOutputStream = new 
BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+   try (ZipOutputStream zipStream = new 
ZipOutputStream(blobOutputStream)) {
+   compressDirectoryToZipfile(fs, 
fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+   zipStream.finish();
+ 

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411432#comment-16411432
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176745183
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
--- End diff --

It does not need to. `getNextEntry` does it, at the very beginning.
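
In other words, the read loop can rely on `ZipInputStream` itself; a minimal sketch of such a loop (illustrative only):

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Minimal sketch: getNextEntry() implicitly finishes the previous entry,
// so no explicit closeEntry() call is needed inside the loop.
class ZipReadLoop {
	static void readAll(String zipFile) throws IOException {
		try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFile))) {
			ZipEntry entry;
			while ((entry = zis.getNextEntry()) != null) {
				// consume zis here for the current entry, e.g. copy it to a file
				System.out.println("entry: " + entry.getName());
			}
		}
	}
}
```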


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410988#comment-16410988
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662946
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -19,30 +19,38 @@
 package org.apache.flink.runtime.filecache;
 
 import org.apache.flink.api.common.JobID;
-import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
 import org.apache.flink.shaded.guava18.com.google.common.io.Files;
 
+import org.apache.commons.lang3.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
- * Test delete process of {@link FileCache}. The local cache file should 
not be deleted while another task comes in 5 seconds.
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
  */
-public class FileCacheDeleteValidationTest {
+@RunWith(MockitoJUnitRunner.class)
--- End diff --

Why is this needed now? I don't see any mocking being added.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410991#comment-16410991
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662757
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
--- End diff --

we have somewhere in a `FileUtils` a writeUtf8 method
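
Assuming the method meant here is `org.apache.flink.util.FileUtils#writeFileUtf8`, the test setup could drop the commons-io call along these lines (sketch only):

```java
import org.apache.flink.util.FileUtils;

import java.io.File;
import java.io.IOException;

// Sketch: write test content through Flink's own utility, pinned to UTF-8,
// instead of org.apache.commons.io.FileUtils.writeStringToFile.
class TestFileWriter {
	static void write(File file, String content) throws IOException {
		FileUtils.writeFileUtf8(file, content);
	}
}
```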


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410981#comment-16410981
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176659407
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -159,7 +162,7 @@ public ExecutionConfig getConfig() {
* Get the list of cached files that were registered for distribution 
among the task managers.
*/
public List> 
getCachedFiles() {
-   return cacheFile;
+   return cacheFile.entrySet().stream().map(e -> 
Tuple2.of(e.getKey(), e.getValue())).collect(Collectors.toList());
--- End diff --

if the mapping is never used, there's no value in using a map in the first 
place.
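
A minimal sketch of the alternative hinted at here, keeping the registry as a plain list so `getCachedFiles()` can hand it out directly (names mirror the snippet above; this is not necessarily the code that was merged):

    private final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<>();

    public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
        return cacheFile;
    }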


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410979#comment-16410979
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176658024
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -306,18 +311,20 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
CompletableFuture jobUploadFuture = 
portFuture.thenCombine(
getDispatcherAddress(),
(BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-   log.info("Uploading jar files.");
final int blobServerPort = response.port;
final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
final List keys;
try {
-   keys = 
BlobClient.uploadJarFiles(address, flinkConfig, jobGraph.getJobID(), 
jobGraph.getUserJars());
+   log.info("Uploading jar files.");
+   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
+   log.info("Uploading custom files.");
--- End diff --

I don't think this should be logged if there's nothing to upload (which 
will generally be the default)
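
One way to address this, sketched under the assumption that the custom artifacts are exposed as a collection on the job graph (the accessor name below is illustrative, not necessarily the one used in the PR):

    // only announce the upload when there actually is something to ship
    if (!jobGraph.getUserArtifacts().isEmpty()) {
        log.info("Uploading custom files.");
        // ... upload the custom artifacts ...
    }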


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410984#comment-16410984
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662188
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
--- End diff --

who's responsible for cleaning up the created local files?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410983#comment-16410983
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176659832
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -488,13 +493,36 @@ public void addJar(Path jar) {
return userJars;
}
 
+   /**
+* Adds the path of a JAR file required to run the job on a task 
manager.
--- End diff --

it may not be a jar file


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410990#comment-16410990
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176663553
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -57,89 +65,59 @@
+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
-   + "deine Boten, Herr, verehren Das sanfte Wandeln deines 
Tags.\n";
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
 
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
private FileCache fileCache;
-   private File f;
 
-   @Before
-   public void setup() throws IOException {
-   String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
-   try {
-   fileCache = new FileCache(tmpDirectories);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("Cannot create FileCache: " + e.getMessage());
+   private final PermanentBlobKey permanentBlobKey = new 
PermanentBlobKey();
+
+   private final PermanentBlobService blobService = new 
PermanentBlobService() {
+   @Override
+   public File getFile(JobID jobId, PermanentBlobKey key) throws 
IOException {
+   if (key.equals(permanentBlobKey)) {
+   File f = temporaryFolder.newFile("cacheFile");
+   FileUtils.writeFileUtf8(f, testFileContent);
+   return f;
+   } else {
+   throw new IllegalArgumentException("This 
service contains only entry for " + permanentBlobKey);
+   }
}
 
-   f = temporaryFolder.newFile("cacheFile");
-   try {
-   Files.write(testFileContent, f, Charsets.UTF_8);
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("Error initializing the test: " + e.getMessage());
+   @Override
+   public void close() throws IOException {
+
}
+   };
+
+   @Before
+   public void setup() throws Exception {
+   fileCache = new FileCache(new 
String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
}
 
@After
public void shutdown() {
-   try {
-   fileCache.shutdown();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail("FileCache shutdown failed: " + e.getMessage());
-   }
+   fileCache.shutdown();
}
 
@Test
-   public void testFileReuseForNextTask() {
-   try {
-   final JobID jobID = new JobID();
-   final String fileName = "test_file";
-
-   final String filePath = f.toURI().toString();
-
-   // copy / create the file
-   Future copyResult = 
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), 
jobID);
-   copyResult.get();
-
-   // get another reference to the file
-   Future copyResult2 = 
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), 
jobID);
-
-   // this should be available immediately
-   assertTrue(copyResult2.isDone());
-
-   // delete the file
-   fileCache.deleteTmpFile(fileName, jobID);
-   // file should not yet be deleted
-   assertTrue(fileCache.holdsStillReference(fileName, 
jobID));
-
-   // delete the second reference
-   fileCache.deleteTmpFile(fileName, jobID);
-   // file should still not be deleted, but remain for a 
bit
-   assertTrue(fileCache.holdsStillReference(fileName, 
jobID));
-
-   fileCache.createTmpFile(fileName, new 
DistributedCacheEntry(filePath, false), jobID);
-   fileCache.deleteTmpFile(fileName, jobID);
-
-   // after a while, the file should 

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410986#comment-16410986
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176661729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4 entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
  

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410992#comment-16410992
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662599
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
--- End diff --

entry should be closed
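
A minimal sketch of the fix, assuming the point is to close each entry via `java.util.zip.ZipInputStream#closeEntry` before advancing to the next one:

    try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
        ZipEntry entry;
        while ((entry = zis.getNextEntry()) != null) {
            // ... read and verify the entry content ...
            zis.closeEntry(); // close the current entry before fetching the next one
        }
    }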


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410978#comment-16410978
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176658516
  
--- Diff: flink-end-to-end-tests/pom.xml ---
@@ -78,6 +78,27 @@ under the License.



+   
--- End diff --

this will need a rebase; each test now has its own module


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410987#comment-16410987
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176662637
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
+   String fileName = 
entry.getName().replaceAll("/", "");
--- End diff --

`Path#getName()`?
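
A sketch of what that would look like, assuming `org.apache.flink.core.fs.Path#getName()` (which returns the final path component) can replace the manual separator stripping:

    String fileName = new Path(entry.getName()).getName();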


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410980#comment-16410980
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176658879
  
--- Diff: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
 ---
@@ -75,15 +66,12 @@
  */
 @PublicEvolving
 public class PythonStreamExecutionEnvironment {
-   private static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
--- End diff --

I would leave it even if it isn't used


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410985#comment-16410985
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176660974
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
 *  Any additional configuration for the blob client
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param jars
-*  List of JAR files to upload
+* @param files
+*  List of files to upload
 *
 * @throws IOException
 *  if the upload fails
 */
-   public static List uploadJarFiles(
-   InetSocketAddress serverAddress, Configuration 
clientConfig, JobID jobId, List jars)
+   public static List uploadFiles(
+   InetSocketAddress serverAddress, Configuration 
clientConfig, JobID jobId, List files)
throws IOException {
 
checkNotNull(jobId);
 
-   if (jars.isEmpty()) {
+   if (files.isEmpty()) {
return Collections.emptyList();
} else {
List blobKeys = new ArrayList<>();
 
try (BlobClient blobClient = new 
BlobClient(serverAddress, clientConfig)) {
-   for (final Path jar : jars) {
-   final FileSystem fs = 
jar.getFileSystem();
-   FSDataInputStream is = null;
-   try {
-   is = fs.open(jar);
-   final PermanentBlobKey key =
-   (PermanentBlobKey) 
blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
-   blobKeys.add(key);
-   } finally {
-   if (is != null) {
-   is.close();
-   }
-   }
+   for (final Path file : files) {
+   final PermanentBlobKey key = 
blobClient.uploadFile(jobId, file);
+   blobKeys.add(key);
}
}
 
return blobKeys;
}
}
 
-   // 

-   //  Miscellaneous
-   // 

-
-   private static Throwable readExceptionFromStream(InputStream in) throws 
IOException {
-   int len = readLength(in);
-   byte[] bytes = new byte[len];
-   readFully(in, bytes, 0, len, "Error message");
+   /**
+* Uploads a single file to the {@link PermanentBlobService} of the 
given {@link BlobServer}.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param file
+*  file to upload
+*
+* @throws IOException
+*  if the upload fails
+*/
+   public PermanentBlobKey uploadFile(JobID jobId, Path file) throws 
IOException {
+   final FileSystem fs = file.getFileSystem();
+   if (fs.getFileStatus(file).isDir()) {
+   return uploadDirectory(jobId, file, fs);
+   } else {
+   try (InputStream is = fs.open(file)) {
+   return (PermanentBlobKey) putInputStream(jobId, 
is, PERMANENT_BLOB);
+   }
+   }
+   }
 
-   try {
-   return (Throwable) 
InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+   private PermanentBlobKey uploadDirectory(JobID jobId, Path file, 
FileSystem fs) throws IOException {
+   try (BlobOutputStream blobOutputStream = new 
BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+   try (ZipOutputStream zipStream = new 
ZipOutputStream(blobOutputStream)) {
+   compressDirectoryToZipfile(fs, 
fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+   zipStream.finish();
+   

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410982#comment-16410982
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176661496
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
--- End diff --

-> `isDirectory`


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410989#comment-16410989
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176658632
  
--- Diff: flink-end-to-end-tests/pom.xml ---
@@ -78,6 +78,27 @@ under the License.



+   
+   
DistributedCacheViaBlobTestProgram
+   package
+   
+   jar
+   
+   
+   
DistributedCacheViaBlobTestProgram
+
+   
+   

+   
org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram
+   

+   
+
+   
+   
org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.class
--- End diff --

This can use wildcards, no? 
`org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram* `


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410933#comment-16410933
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176657746
  
--- Diff: 
flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
 ---
@@ -30,6 +33,7 @@
 /**
  * Tests for the PythonPlanBinder.
  */
+@Category(Flip6.class)
--- End diff --

why does this not run anymore against legacy clusters?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410665#comment-16410665
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol What do you think is still needed now? I think this makes things 
easier for users. And we also need this for the work on the Beam Flink Runner.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16408258#comment-16408258
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5580
  
I think that's a very good solution in the end, one that even simplifies things. 
What do you think, @zentol?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407980#comment-16407980
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
@aljoscha In the end I've implemented uploading directories as zip 
archives, as @zentol suggested. The end-to-end Python tests pass. Let me know 
what you both think.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406935#comment-16406935
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5580
  
Ok, to unblock this, I would suggest adding the new functionality under a 
different name, supporting only a single file (and also verifying that the 
path passed in is a single file). @dawidwys would that be straightforward to do?
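
A minimal sketch of that verification, reusing the `FileSystem#getFileStatus(...).isDir()` check that appears elsewhere in this PR (variable names and the error message are illustrative):

    // reject directories when registering the cached file
    final FileSystem fs = path.getFileSystem();
    if (fs.getFileStatus(path).isDir()) {
        throw new IllegalArgumentException(
            "Only single files are supported, but a directory was given: " + path);
    }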


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406073#comment-16406073
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
I am not suggesting dropping support for uploading directories, but rather gradually 
switching to the blob server. Leave the possibility to distribute files via DFS (and 
leave Python using it), and only drop it completely in 1.6.0.

But of course I am open to suggestions.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406055#comment-16406055
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5580
  
Not supporting directories would be a regression regardless of whether the 
Python API needs it or not.

We either have to add a new method with different behavior, or zip 
directories underneath before uploading them.
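
For the zipping option, a self-contained sketch of compressing a local directory into a single archive before handing it to the blob client (class and method names are illustrative; the variant the PR eventually added appears as `BlobClient#uploadDirectory` in the diff quoted earlier in this thread):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipOutputStream;

    final class ZipDirectorySketch {

        static void zipDirectory(File sourceDir, File targetZip) throws IOException {
            try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(targetZip))) {
                addRecursively(sourceDir, sourceDir, zos);
            }
        }

        private static void addRecursively(File root, File current, ZipOutputStream zos) throws IOException {
            final File[] children = current.listFiles();
            if (children == null) {
                return;
            }
            for (File child : children) {
                if (child.isDirectory()) {
                    addRecursively(root, child, zos);
                } else {
                    // store entries relative to the directory being shipped, with '/' separators
                    final String entryName = root.toPath().relativize(child.toPath()).toString()
                        .replace(File.separatorChar, '/');
                    zos.putNextEntry(new ZipEntry(entryName));
                    Files.copy(child.toPath(), zos);
                    zos.closeEntry();
                }
            }
        }
    }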


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405958#comment-16405958
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
@aljoscha Yes, that's right. The latest commit does not work because the Python 
API expects complete directories to be uploaded. I think this is quite important, 
because this way the uploaded Python code can be modularized and Python libs can 
be used. Without directories, only single-file scripts could be executed, if I 
understood it correctly.

The version from commit a774fb664deabb6fdb437546038fc9477d291eb1 works, but 
the Python API still uses DFS underneath the DistributedCache.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405778#comment-16405778
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5580
  
So the current state of this PR doesn't work with the Python API because it 
uploads complete directories? Is that really used/needed by the Python API?

I would like to have this feature in 1.5, if possible. This PR also adds an 
end-to-end test, which should make it very robust against future changes.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380439#comment-16380439
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol I've tried changing the behaviour of `registerCachedFile` to always 
distribute files via the BlobServer, but I've stumbled across a problem with the 
Python API. Right now whole directories can be distributed via DFS. Unfortunately, 
we can upload only single files under a single key to the BlobServer. I wonder 
what would be the better solution for it:

1. Extend BlobClient to enable uploading whole directories under a single key
2. Upload Python directories as plain files (with appropriately adjusted 
keys, e.g. path/to/filename) and later on restore the structure on access

What do you think?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380036#comment-16380036
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171184258
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, 
String name) {
 * @param executable flag indicating whether the file should be 
executable
 */
public void registerCachedFile(String filePath, String name, boolean 
executable) {
-   this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
+   registerCachedFile(filePath, name, executable, false);
}
+
+   /**
+* Registers a file at the distributed cache under the given name. The 
file will be accessible
+* from any user-defined function in the (distributed) runtime under a 
local path. If upload is true files will
+* be distributed via {@link BlobServer} otherwise Files should be 
local files (as long as all relevant workers
+* have access to it), or files in a distributed file system. The 
runtime will copy the files temporarily to a
+* local cache, if needed.
+*
+* The {@link org.apache.flink.api.common.functions.RuntimeContext} 
can be obtained inside UDFs via
+* {@link 
org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
provides access
+* {@link org.apache.flink.api.common.cache.DistributedCache} via
+* {@link 
org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+*
+* @param filePath The path of the file
+* @param name The name under which the file is registered.
+* @param executable flag indicating whether the file should be 
executable
+* @param upload flag indicating if the file should be distributed via 
BlobServer
+*/
+   public void registerCachedFile(String filePath, String name, boolean 
executable, boolean upload) {
--- End diff --

send everything through the blob service


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380017#comment-16380017
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171181348
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, 
String name) {
 * @param executable flag indicating whether the file should be 
executable
 */
public void registerCachedFile(String filePath, String name, boolean 
executable) {
-   this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
+   registerCachedFile(filePath, name, executable, false);
}
+
+   /**
+* Registers a file at the distributed cache under the given name. The 
file will be accessible
+* from any user-defined function in the (distributed) runtime under a 
local path. If upload is true files will
+* be distributed via {@link BlobServer} otherwise Files should be 
local files (as long as all relevant workers
+* have access to it), or files in a distributed file system. The 
runtime will copy the files temporarily to a
+* local cache, if needed.
+*
+* The {@link org.apache.flink.api.common.functions.RuntimeContext} 
can be obtained inside UDFs via
+* {@link 
org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
provides access
+* {@link org.apache.flink.api.common.cache.DistributedCache} via
+* {@link 
org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+*
+* @param filePath The path of the file
+* @param name The name under which the file is registered.
+* @param executable flag indicating whether the file should be 
executable
+* @param upload flag indicating if the file should be distributed via 
BlobServer
+*/
+   public void registerCachedFile(String filePath, String name, boolean 
executable, boolean upload) {
--- End diff --

What behaviour do you suggest: sending all local files through the Blob server and 
serving files from DFS as before?



> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379969#comment-16379969
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171175263
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
+import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class FileCacheReadsFromBlobTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher 
Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In 
Brudersphaeren Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. 
Ihr Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich 
umher der Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, 
schauervoller Nacht. Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der 
Felsen auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem 
Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   @Mock
+   private PermanentBlobService blobService;
+
+   @Before
+   public void setup() throws IOException {
+   try {
+   String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
+   fileCache = new FileCache(tmpDirectories, blobService);
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("Cannot create FileCache: " + e.getMessage());
+   }
+   }
+
+   @After
+   public void shutdown() {
+   try {
+  

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379968#comment-16379968
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171175037
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
+import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class FileCacheReadsFromBlobTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der 
Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher 
Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In 
Brudersphaeren Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. 
Ihr Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die 
unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich 
umher der Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, 
schauervoller Nacht. Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der 
Felsen auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem 
Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs 
Land, vom Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten 
Wirkung rings umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des 
Donnerschlags. Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   @Mock
+   private PermanentBlobService blobService;
+
+   @Before
+   public void setup() throws IOException {
+   try {
+   String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
+   fileCache = new FileCache(tmpDirectories, blobService);
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("Cannot create FileCache: " + e.getMessage());
+   }
+   }
+
+   @After
+   public void shutdown() {
+   try {
+  

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379965#comment-16379965
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171174660
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java
 ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Charsets;
+import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link FileCache} can read files from {@link BlobServer}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class FileCacheReadsFromBlobTest {
+
+   private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+   + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+   + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
+   + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
+   + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
+   + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+   + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
+   + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
+   + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
+   + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+   + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
+   + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
+   + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
+   + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private FileCache fileCache;
+
+   @Mock
+   private PermanentBlobService blobService;
+
+   @Before
+   public void setup() throws IOException {
+   try {
+   String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()};
+   fileCache = new FileCache(tmpDirectories, blobService);
+   } catch (Exception e) {
+   e.printStackTrace();
+   fail("Cannot create FileCache: " + e.getMessage());
+   }
+   }
+
+   @After
+   public void shutdown() {
+   try {
+  
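
The quoted test is cut off above; below is a minimal sketch of how such a blob-backed read is typically exercised against the mocked `PermanentBlobService`. The blob-key-carrying `DistributedCache.DistributedCacheEntry` constructor and the `createTmpFile` signature are assumptions based on this PR and may differ in the merged code:

    @Test
    public void testFileIsCopiedFromBlobService() throws Exception {
        final JobID jobId = new JobID();
        final PermanentBlobKey blobKey = new PermanentBlobKey();

        // Back the mocked blob service with a real temporary file holding the test content.
        final File blobFile = temporaryFolder.newFile();
        Files.write(testFileContent.getBytes(StandardCharsets.UTF_8), blobFile);
        when(blobService.getFile(jobId, blobKey)).thenReturn(blobFile);

        // Assumed: the cache entry carries the serialized blob key introduced by this PR.
        final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
            "test_file", false, InstantiationUtil.serializeObject(blobKey));

        // Assumed signature; later versions may also take an execution attempt id.
        final Future<Path> copyTask = fileCache.createTmpFile("test_file", entry, jobId);
        final Path localPath = copyTask.get();

        final String actual = Files.toString(new File(localPath.toUri().getPath()), StandardCharsets.UTF_8);
        assertEquals(testFileContent, actual);
    }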

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379959#comment-16379959
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171174187
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, String name) {
 * @param executable flag indicating whether the file should be executable
 */
public void registerCachedFile(String filePath, String name, boolean executable) {
-   this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
+   registerCachedFile(filePath, name, executable, false);
}
+
+   /**
+* Registers a file at the distributed cache under the given name. The file will be accessible
+* from any user-defined function in the (distributed) runtime under a local path. If {@code upload} is true,
+* the file will be distributed via the {@link BlobServer}; otherwise the file should be a local file (as long
+* as all relevant workers have access to it) or a file in a distributed file system. The runtime will copy
+* the files temporarily to a local cache, if needed.
+*
+* The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+* {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access to the
+* {@link org.apache.flink.api.common.cache.DistributedCache} via
+* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
+*
+* @param filePath The path of the file
+* @param name The name under which the file is registered.
+* @param executable flag indicating whether the file should be executable
+* @param upload flag indicating whether the file should be distributed via the BlobServer
+*/
+   public void registerCachedFile(String filePath, String name, boolean executable, boolean upload) {
--- End diff --

I'm wondering whether we really need a new method. We could just modify the 
behavior of the existing method, which should be transparent to the user.
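
For reference, a minimal sketch of how the proposed overload would be used from a user program. The four-argument `registerCachedFile` is the variant introduced in this diff and may still change; the path, name, and job name below are placeholders:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.io.File;

    public class CachedFileExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Register a placeholder local file; with upload = true the artifact would be
            // shipped through the BlobServer instead of being read from a shared file system.
            env.registerCachedFile("/tmp/lookup.txt", "lookupFile", false, true);

            env.fromElements("a", "b", "c")
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        // The TaskManager materializes the artifact locally and exposes it
                        // through the DistributedCache under the registered name.
                        File lookup = getRuntimeContext().getDistributedCache().getFile("lookupFile");
                        return value + " -> " + lookup.getAbsolutePath();
                    }
                })
                .print();

            env.execute("distributed-cache-example");
        }
    }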


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379942#comment-16379942
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171172025
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 ---
@@ -106,9 +106,9 @@
  * are created for the plan nodes, on the way back up, the nodes connect their predecessors.
  */
 public class JobGraphGenerator implements Visitor {
-   
+
public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
-   
--- End diff --

Sorry for that. I forgot to disable stripping whitespace on save. Will 
revert those changes.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379938#comment-16379938
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r171171252
  
--- Diff: 
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 ---
@@ -106,9 +106,9 @@
  * are created for the plan nodes, on the way back up, the nodes connect their predecessors.
  */
 public class JobGraphGenerator implements Visitor {
-   
+
public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
-   
--- End diff --

this would be a lot easier to review without the formatting changes :/


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379673#comment-16379673
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol All right, got your point. That's a problem indeed. 


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378659#comment-16378659
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
@zentol I agree it will need to be changed in the end (though I wouldn't say 
rewritten completely) once the submission process is finalized. 

I still think this change is important from the perspective of integration 
with the portability layer in Beam. I would be glad to hear @aljoscha's opinion on 
that.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378576#comment-16378576
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5580
  
I'm aware that the `RestClusterClient` currently submits jars directly to 
the blob service. If this PR had been opened half a year ago I would have 
welcomed it with open arms.

My point still stands, though: every line that is touched here would have to 
be rewritten once we do that, which may very well happen before 1.6.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378535#comment-16378535
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5580
  
If some day the behavior of `RestClusterClient` changes, e.g. to uploading 
the jars via the REST API, we could just upload the user-defined files via the 
REST API as well. That's not a big problem, right?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378526#comment-16378526
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user ifndef-SleePy commented on the issue:

https://github.com/apache/flink/pull/5580
  
Hi @zentol 

In FLIP-6 the client communicates with the cluster via the REST API, that's true. 
However, this does not include blobs. Currently, in the `RestClusterClient.submit()` 
method, the client uses the blob service to upload the user's jars. So this PR does 
not break that rule.

I think this feature is quite good. Spark, and even MapReduce, support uploading 
user-defined jars/files/archives, so I think we should support it as well. Users 
would be able to migrate to Flink more smoothly with this feature.

What do you think?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378497#comment-16378497
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5580
  
As far as I know, one of the end goals of FLIP-6 is to have all 
client-cluster communication go through the REST API. This implies that the 
client cannot submit things directly to the blob service.

I'm wondering whether it really makes sense to change the current behavior, 
as any change made would effectively be deprecated on arrival.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377993#comment-16377993
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user ifndef-SleePy commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r170804963
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -314,6 +315,9 @@ public static TaskExecutor startTaskManager(
 
TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 
+   final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths(),
--- End diff --

It seems that the `fileCache` here is not used at all.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376898#comment-16376898
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
R: @aljoscha 


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376893#comment-16376893
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/5580

[FLINK-8620] Enable shipping custom files to BlobStore and accessing …

…them through DistributedCache


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is