[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5580 ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r185517079 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java --- @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.filecache; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.blob.PermanentBlobService; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests that {@link FileCache} can read zipped directories from BlobServer and properly cleans them after. + */ +public class FileCacheDirectoriesTest { + + private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n" + + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n" + + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n" + + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. 
Ihr Anblick\n" + + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n" + + "hohen Werke Sind herrlich wie am ersten Tag.\n" + + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n" + + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n" + + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n" + + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n" + + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n" + + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n" + + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n" + + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags."; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private FileCache fileCache; + + private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey(); + + private final PermanentBlobService blobService = new PermanentBlobService() { + @Override + public File getFile(JobID jobId, PermanentBlobKey key) throws IOException { + if (key.equals(permanentBlobKey)) { + final File zipArchive = temporaryFolder.newFile("zipArchive"); + try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) { + + final ZipEntry zipEntry = new ZipEntry("cacheFile"); + zis.putNextEntry(zipEntry); + +
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r183700946 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java --- @@ -488,13 +494,36 @@ public void addJar(Path jar) { return userJars; } + /** +* Adds the path of a custom file required to run the job on a task manager. +* +* @param file --- End diff -- missing parameter docs for `name` ---
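zentol notes that the new Javadoc documents `file` but not `name`. The sketch below shows a method with both parameters documented. `CustomFileRegistry` and its `addCustomFile(String name, String filePath)` signature are hypothetical. The truncated diff does not show the real `JobGraph` method's parameter types.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for the job graph's custom-file bookkeeping (illustration only). */
final class CustomFileRegistry {

    private final Map<String, String> customFiles = new LinkedHashMap<>();

    /**
     * Adds the path of a custom file required to run the job on a task manager.
     *
     * @param name the name under which tasks look up the file at runtime
     * @param filePath the path of the file (or directory) to ship with the job
     */
    public void addCustomFile(String name, String filePath) {
        customFiles.put(Objects.requireNonNull(name, "name"), Objects.requireNonNull(filePath, "filePath"));
    }

    /** Returns an unmodifiable view of all registered files, keyed by name. */
    public Map<String, String> getCustomFiles() {
        return Collections.unmodifiableMap(customFiles);
    }
}
```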
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r183703858 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java --- @@ -0,0 +1,209 @@
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r183702885 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java --- @@ -0,0 +1,209 @@
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179123350 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java --- @@ -164,16 +165,13 @@ public void testDirectoryCleanUp() throws Exception { assertTrue(fs.exists(cacheFile)); fileCache.releaseJob(jobID, attemptID2); - // still should be available, file will be deleted after 5 seconds + // still should be available, file will be deleted after 200 milliseconds assertTrue(fileStatus.isDir()); assertTrue(fs.exists(cacheFile)); --- End diff -- what you _could_ also do is pass in a `ScheduledExecutorService`. You could then intercept the delete process, verify the interval arguments, and fire the process at your leisure. I'm worried that in its current form the test is either unstable or takes longer than it should. ---
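zentol suggests injecting the executor so the test controls time instead of sleeping. The sketch below shows one such test double, built only on `java.util.concurrent`. The class name `ManuallyTriggeredScheduledExecutor` and its `getPending`/`triggerAll` methods are hypothetical. Periodic scheduling is deliberately unsupported, and the class is meant for single-threaded test use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Records scheduled tasks instead of running them later; a test inspects delays and fires tasks on demand. */
final class ManuallyTriggeredScheduledExecutor extends AbstractExecutorService
        implements ScheduledExecutorService {

    /** A recorded task plus the delay it was scheduled with. */
    static final class RecordedTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
        private final long delay;
        private final TimeUnit unit;

        RecordedTask(Callable<V> callable, long delay, TimeUnit unit) {
            super(callable);
            this.delay = delay;
            this.unit = unit;
        }

        @Override
        public long getDelay(TimeUnit target) {
            return target.convert(delay, unit);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final List<RecordedTask<?>> pending = new ArrayList<>();
    private volatile boolean shutdown;

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        RecordedTask<V> task = new RecordedTask<>(callable, delay, unit);
        pending.add(task); // recorded only; nothing runs until triggerAll()
        return task;
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return schedule(Executors.callable(command), delay, unit);
    }

    /** Snapshot of tasks that were scheduled but not yet fired. */
    List<RecordedTask<?>> getPending() {
        return new ArrayList<>(pending);
    }

    /** Runs all pending tasks on the calling thread, ignoring their delays. */
    void triggerAll() {
        List<RecordedTask<?>> toRun = new ArrayList<>(pending);
        pending.clear();
        toRun.forEach(RecordedTask::run);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        throw new UnsupportedOperationException("not needed in this sketch");
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        throw new UnsupportedOperationException("not needed in this sketch");
    }

    @Override
    public void execute(Runnable command) {
        command.run(); // non-delayed work runs directly on the caller's thread
    }

    @Override public void shutdown() { shutdown = true; }
    @Override public List<Runnable> shutdownNow() { shutdown = true; return new ArrayList<Runnable>(pending); }
    @Override public boolean isShutdown() { return shutdown; }
    @Override public boolean isTerminated() { return shutdown; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }
}
```

A test would construct the class under test with this executor, call `releaseJob`, assert on the recorded delay, and then call `triggerAll()` instead of waiting 200 ms. The later 209-line version of the test imports a `DirectScheduledExecutorService`, `ScheduledFuture` and `TimeUnit`, which suggests the test moved in this direction.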
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179116502 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log ); } + public void releaseJob(JobID jobId, ExecutionAttemptID executionId) { + checkNotNull(jobId); + + synchronized (lock) { + Set jobRefCounter = jobRefHolders.get(jobId); + + if (jobRefCounter == null || jobRefCounter.isEmpty()) { + LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId); + return; + } + + jobRefCounter.remove(executionId); --- End diff -- You are right! I now remove it in the DeleteProcess. ---
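The reply suggests the job's entry in `jobRefHolders` is now dropped by the delayed `DeleteProcess` rather than in `releaseJob`. The sketch below illustrates that pattern under stated assumptions. `JobRefTracker` is hypothetical, and plain `String` ids stand in for `JobID` and `ExecutionAttemptID`. Recording the job in a set stands in for deleting its files. The delete step re-checks the reference set under the lock, because a new attempt may register between scheduling and firing.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical per-job reference tracking with delayed cleanup (illustration only). */
final class JobRefTracker {

    private final Object lock = new Object();
    private final Map<String, Set<String>> jobRefHolders = new HashMap<>();
    private final Set<String> cleanedUpJobs = new HashSet<>(); // stands in for deleted job directories
    private final ScheduledExecutorService executor;
    private final long cleanupIntervalMillis;

    JobRefTracker(ScheduledExecutorService executor, long cleanupIntervalMillis) {
        this.executor = executor;
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    void register(String jobId, String attemptId) {
        synchronized (lock) {
            jobRefHolders.computeIfAbsent(jobId, k -> new HashSet<>()).add(attemptId);
        }
    }

    void release(String jobId, String attemptId) {
        synchronized (lock) {
            Set<String> holders = jobRefHolders.get(jobId);
            if (holders == null || !holders.remove(attemptId)) {
                return; // release without a matching register: nothing to do
            }
            if (holders.isEmpty()) {
                executor.schedule(() -> deleteIfUnused(jobId), cleanupIntervalMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    /** The delayed delete step: removes the map entry itself, but only if no attempt re-registered. */
    void deleteIfUnused(String jobId) {
        synchronized (lock) {
            Set<String> holders = jobRefHolders.get(jobId);
            if (holders != null && holders.isEmpty()) {
                jobRefHolders.remove(jobId);
                cleanedUpJobs.add(jobId); // real code would delete the job's files here
            }
        }
    }

    boolean isTracked(String jobId) {
        synchronized (lock) { return jobRefHolders.containsKey(jobId); }
    }

    boolean isCleanedUp(String jobId) {
        synchronized (lock) { return cleanedUpJobs.contains(jobId); }
    }
}
```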
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179116344 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.filecache; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.blob.PermanentBlobService; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests that {@link FileCache} can read zipped directories from BlobServer and properly cleans them after. + */ +public class FileCacheDirectoriesTest { + + private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n" + + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n" + + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n" + + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n" + + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n" + + "hohen Werke Sind herrlich wie am ersten Tag.\n" + + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n" + + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n" + + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n" + + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n" + + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n" + + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n" + + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n" + + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags."; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private FileCache fileCache; + + private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey(); + + private final PermanentBlobService blobService = new PermanentBlobService() { + @Override + public File getFile(JobID jobId, PermanentBlobKey key) throws IOException { + if (key.equals(permanentBlobKey)) { + final File zipArchive = temporaryFolder.newFile("zipArchive"); + try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) { + + final ZipEntry zipEntry = new ZipEntry("cacheFile"); + zis.putNextEntry(zipEntry); + + IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false); + } + return zipArchive;
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179087145 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -33,16 +38,16 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Future; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds. + * Tests that {@link FileCache} can read files from {@link BlobServer}. */ -public class FileCacheDeleteValidationTest { +public class FileCacheReadsFromBlobTest { --- End diff -- ah yes, I was misreading the test... ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179079963 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -33,16 +38,16 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Future; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds. + * Tests that {@link FileCache} can read files from {@link BlobServer}. */ -public class FileCacheDeleteValidationTest { +public class FileCacheReadsFromBlobTest { --- End diff -- it does; unzipping is the `FileCache`'s responsibility. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179075409 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -33,16 +38,16 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Future; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds. + * Tests that {@link FileCache} can read files from {@link BlobServer}. */ -public class FileCacheDeleteValidationTest { +public class FileCacheReadsFromBlobTest { --- End diff -- as far as the filecache is concerned, it doesn't follow a different path though, right? ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179071066 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -33,16 +38,16 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Future; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds. + * Tests that {@link FileCache} can read files from {@link BlobServer}. */ -public class FileCacheDeleteValidationTest { +public class FileCacheReadsFromBlobTest { --- End diff -- It is different in that this test uploads a single file, so it follows a slightly different path: the file is not zipped and is shipped directly from the BlobCache. For directories, there is an unzipped copy whose lifecycle is managed by `FileCache`. If you think the test `FileCacheDirectoryTest#testDirectoryDownloadedFromBlob` is enough, though, I will remove it. ---
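dawidwys describes two paths. A single file is used as shipped from the BlobCache. A directory arrives zipped, and the `FileCache` unpacks it and owns the unpacked copy. The sketch below shows that split using only `java.util.zip` and `java.nio.file`. `BlobFileResolver` is a hypothetical helper, not Flink's API. The check that rejects `../` entries (zip slip) is an addition of this sketch.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical sketch: plain files are used in place, zipped directories are extracted. */
final class BlobFileResolver {

    /**
     * Returns the path a task should read.
     *
     * @param blobFile the file as fetched from the blob store
     * @param isZipped whether the blob is a zipped directory
     * @param targetDir where a zipped directory is extracted (the caller owns its lifecycle)
     */
    static Path resolve(File blobFile, boolean isZipped, Path targetDir) throws IOException {
        if (!isZipped) {
            return blobFile.toPath(); // single file: read the blob file directly
        }
        unzip(blobFile, targetDir);
        return targetDir;
    }

    static void unzip(File archive, Path targetDir) throws IOException {
        Path root = targetDir.toAbsolutePath().normalize();
        Files.createDirectories(root);
        try (ZipFile zip = new ZipFile(archive)) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                Path out = root.resolve(entry.getName()).normalize();
                if (!out.startsWith(root)) { // refuse entries that escape the target directory
                    throw new IOException("Entry outside target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                } else {
                    Files.createDirectories(out.getParent());
                    try (InputStream in = zip.getInputStream(entry)) {
                        Files.copy(in, out, StandardCopyOption.REPLACE_EXISTING);
                    }
                }
            }
        }
    }
}
```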
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179050736 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java --- @@ -37,18 +39,31 @@ */ @Public public class DistributedCache { - - public static class DistributedCacheEntry { - + + public static class DistributedCacheEntry implements Serializable { + public String filePath; public Boolean isExecutable; - - public DistributedCacheEntry(String filePath, Boolean isExecutable){ + public Boolean isZipped; --- End diff -- use a primitive instead? ---
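The motivation for a primitive shows up in the `CopyFromBlobProcess` constructor quoted further down. That constructor assigns `e.isExecutable` and `e.isZipped` to `boolean` fields, which throws a `NullPointerException` if either wrapper is `null`. The sketch below uses a hypothetical `CacheEntry` class, not Flink's `DistributedCacheEntry`, with primitive flags. It checks that the entry survives Java serialization and shows where a `Boolean` would fail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical cache entry using primitive flags instead of Boolean wrappers. */
final class CacheEntry implements Serializable {

    private static final long serialVersionUID = 1L;

    final String filePath;
    final boolean isExecutable; // primitive: can never be null
    final boolean isZipped;

    CacheEntry(String filePath, boolean isExecutable, boolean isZipped) {
        this.filePath = filePath;
        this.isExecutable = isExecutable;
        this.isZipped = isZipped;
    }

    /** Serializes and deserializes the entry, as shipping it with a job would. */
    static CacheEntry roundTrip(CacheEntry entry) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(entry);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CacheEntry) in.readObject();
        }
    }

    /** With a Boolean field, a null value only fails later, at the point where it is unboxed. */
    static boolean unbox(Boolean flag) {
        return flag; // auto-unboxing: throws NullPointerException if flag is null
    }
}
```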
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179060408 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java --- @@ -0,0 +1,182 @@
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179056079 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log ); } + public void releaseJob(JobID jobId, ExecutionAttemptID executionId) { + checkNotNull(jobId); + + synchronized (lock) { + Set jobRefCounter = jobRefHolders.get(jobId); + + if (jobRefCounter == null || jobRefCounter.isEmpty()) { + LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId); + return; + } + + jobRefCounter.remove(executionId); + if (jobRefCounter.isEmpty()) { + executorService.schedule(new DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS); + } + } + } + // // background processes // /** -* Asynchronous file copy process. +* Asynchronous file copy process from blob server. */ - private static class CopyProcess implements Callable { + private static class CopyFromBlobProcess implements Callable { - private final Path filePath; - private final Path cachedPath; - private boolean executable; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean isDirectory; + private final boolean isExecutable; + private final JobID jobID; + private final PermanentBlobService blobService; - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; + CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) { + try { + this.isExecutable = e.isExecutable; + this.isDirectory = e.isZipped; + this.jobID = jobID; + this.blobService = blobService; + this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader()); + this.target = target; + } catch (Exception 
ex) { --- End diff -- the constructor signature should be modified instead ---
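zentol suggests changing the constructor signature instead of catching the deserialization exception inside the `CopyFromBlobProcess` constructor. The resulting change is not visible here, so the sketch below shows two common options. `CopyTask` and its nested `BlobKey` are hypothetical stand-ins, not Flink classes. In the first option, the caller passes an already-deserialized key, so the constructor cannot fail. In the second, a factory method declares the checked exceptions so callers see them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical copy task whose constructor no longer swallows deserialization errors. */
final class CopyTask {

    /** Stand-in for a serializable blob key. */
    static final class BlobKey implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;

        BlobKey(String id) {
            this.id = id;
        }
    }

    final BlobKey blobKey;
    final boolean isDirectory;

    /** Option 1: receives already-deserialized values, so it cannot fail. */
    CopyTask(BlobKey blobKey, boolean isDirectory) {
        this.blobKey = blobKey;
        this.isDirectory = isDirectory;
    }

    /** Option 2: deserialization failures propagate as checked exceptions instead of being wrapped. */
    static CopyTask fromSerializedKey(byte[] serializedKey, boolean isDirectory)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedKey))) {
            return new CopyTask((BlobKey) in.readObject(), isDirectory);
        }
    }

    /** Helper for producing the serialized form. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }
}
```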
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179049139 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -231,6 +234,7 @@ public TaskExecutor( hardwareDescription = HardwareDescription.extractFromSystem( taskExecutorServices.getMemoryManager().getMemorySize()); + --- End diff -- revert ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179048742 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java --- @@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; --- End diff -- this import should now be unnecessary, or the change should be reverted. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179053003 --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static java.util.Collections.singletonList; + +/** + * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through + * DistribitutedCache. We verify that via uploading file and later on accessing it in map function. To be sure we read + * version read from cache, we delete the initial file. 
+ */ +public class DistributedCacheViaBlobTestProgram { + + public static void main(String[] args) throws Exception { + + final ParameterTool params = ParameterTool.fromArgs(args); + + final String fileContent = params.getRequired("content"); + final String tempDir = params.getRequired("tempDir"); + + final File tempFile = File.createTempFile("temp", null, new File(tempDir)); + Files.write(tempFile.toPath(), singletonList(fileContent)); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.registerCachedFile(tempFile.getPath(), "test_data", false); + + env.fromElements(1) + .map(new TestMapFunction(tempFile.getAbsolutePath())) + .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE); + + env.execute("Distributed Cache Via Blob Test Program"); + } + + static class TestMapFunction extends RichMapFunction { + + private String initialPath; + + public TestMapFunction(String initialPath) { + this.initialPath = initialPath; + } + + @Override + public String map(Integer value) throws Exception { + Files.deleteIfExists(Paths.get(initialPath)); + return StringUtils.join(Files + .readAllLines(getRuntimeContext().getDistributedCache().getFile("test_data").toPath()), "\n"); --- End diff -- this could be replaced with ``` return Files.readAllLines(getRuntimeContext().getDistributedCache().getFile("test_data").toPath()).stream() .collect(Collectors.joining("\n")); ``` ---
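For reference, the reviewer's replacement uses only the JDK. `String.join` gives the same result with less ceremony, and either form would also remove the need for `commons-lang3` that a later comment flags as an undeclared dependency. Here is a minimal sketch with hypothetical helper names (`CachedFileReader`, `readJoined`):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

final class CachedFileReader {

    private CachedFileReader() {}

    /** Reads all lines and joins them with '\n' via a stream, as in the reviewer's suggestion. */
    static String readJoined(Path file) throws IOException {
        return Files.readAllLines(file, StandardCharsets.UTF_8).stream()
                .collect(Collectors.joining("\n"));
    }

    /** Same result with String.join, avoiding the stream entirely. */
    static String readJoinedWithStringJoin(Path file) throws IOException {
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        return String.join("\n", lines);
    }
}
```

`Files.readAllLines` strips line terminators. So for the test program, which writes its content with `Files.write(path, singletonList(content))`, both variants return the original single-line content unchanged.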
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179057582 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -631,14 +631,14 @@ else if (current == ExecutionState.CANCELING) { DistributedCache.readFileInfoFromConfig(jobConfiguration)) { LOG.info("Obtaining local cache file for '{}'.", entry.getKey()); - Future cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId); + Future cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId); distributedCacheEntries.put(entry.getKey(), cp); } } catch (Exception e) { throw new Exception( - String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), --- End diff -- revert ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179053531 --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static java.util.Collections.singletonList; + +/** + * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through + * DistribitutedCache. We verify that via uploading file and later on accessing it in map function. To be sure we read + * version read from cache, we delete the initial file. 
+ */ +public class DistributedCacheViaBlobTestProgram { + + public static void main(String[] args) throws Exception { + + final ParameterTool params = ParameterTool.fromArgs(args); + + final String fileContent = params.getRequired("content"); + final String tempDir = params.getRequired("tempDir"); + + final File tempFile = File.createTempFile("temp", null, new File(tempDir)); + Files.write(tempFile.toPath(), singletonList(fileContent)); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); --- End diff -- explicitly set the parallelism to `1`? ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179056758 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log ); } + public void releaseJob(JobID jobId, ExecutionAttemptID executionId) { + checkNotNull(jobId); + + synchronized (lock) { + Set jobRefCounter = jobRefHolders.get(jobId); + + if (jobRefCounter == null || jobRefCounter.isEmpty()) { + LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId); + return; + } + + jobRefCounter.remove(executionId); --- End diff -- shouldn't we also remove the entries? Otherwise the `entries` map will keep growing until the TM either shuts down or crashes with an OOM error. ---
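One way to address this is to drop a job's entries together with its last reference holder, so the per-job map cannot outlive the job. The sketch below uses plain collections and hypothetical names (`JobRefRegistry`, `register`, `release`). In `FileCache` the removal would probably belong in the delayed `DeleteProcess` scheduled after `cleanupInterval`; this sketch removes the entries immediately to keep it short.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: per-job cache entries kept alive by a set of execution IDs holding references. */
final class JobRefRegistry<J, E> {

    private final Object lock = new Object();
    private final Map<J, Map<String, String>> entries = new HashMap<>();
    private final Map<J, Set<E>> refHolders = new HashMap<>();

    void register(J jobId, E executionId, String name, String localPath) {
        synchronized (lock) {
            entries.computeIfAbsent(jobId, k -> new HashMap<>()).put(name, localPath);
            refHolders.computeIfAbsent(jobId, k -> new HashSet<>()).add(executionId);
        }
    }

    /** Returns true if this call released the last reference and the job's entries were dropped. */
    boolean release(J jobId, E executionId) {
        synchronized (lock) {
            Set<E> holders = refHolders.get(jobId);
            if (holders == null || !holders.remove(executionId)) {
                return false; // unmatched release: nothing to clean up
            }
            if (holders.isEmpty()) {
                // Remove the job from BOTH maps; otherwise `entries` grows with every finished job.
                refHolders.remove(jobId);
                entries.remove(jobId);
                return true;
            }
            return false;
        }
    }

    int trackedJobs() {
        synchronized (lock) {
            return entries.size();
        }
    }
}
```

Only the release of the last holder clears the job. Earlier releases leave the entries in place so that other running executions of the same job can still use the cached files.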
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179054280 --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static java.util.Collections.singletonList; + +/** + * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through + * DistribitutedCache. We verify that via uploading file and later on accessing it in map function. To be sure we read + * version read from cache, we delete the initial file. 
+ */ +public class DistributedCacheViaBlobTestProgram { + + public static void main(String[] args) throws Exception { + + final ParameterTool params = ParameterTool.fromArgs(args); + + final String fileContent = params.getRequired("content"); + final String tempDir = params.getRequired("tempDir"); + + final File tempFile = File.createTempFile("temp", null, new File(tempDir)); --- End diff -- use a random name instead, in case the tempDir isn't randomized. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179052173 --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml --- @@ -0,0 +1,106 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + .. + + + 4.0.0 + + flink-distributed-cache-via-blob-test_${scala.binary.version} + flink-distributed-cache-via-blob + jar + + + + org.apache.flink + flink-core + ${project.version} + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + + ClassLoaderTestProgram + package + + jar + + + DistributedCacheViaBlobTestProgram + + + + org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram + + + + + org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram* + + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin --- End diff -- this can be removed if you add `DataSetAllroundTestProgram` to the `maven-jar-plugin` configuration, see the `flink-dataset-allround-test` module. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179050544 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -331,7 +332,6 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) CompletableFuture submissionFuture = jobUploadFuture.thenCompose( (JobGraph jobGraphToSubmit) -> { log.info("Submitting job graph."); - --- End diff -- revert ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179054489 --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static java.util.Collections.singletonList; + +/** + * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through + * DistribitutedCache. We verify that via uploading file and later on accessing it in map function. To be sure we read + * version read from cache, we delete the initial file. 
+ */ +public class DistributedCacheViaBlobTestProgram { + + public static void main(String[] args) throws Exception { + + final ParameterTool params = ParameterTool.fromArgs(args); + + final String fileContent = params.getRequired("content"); + final String tempDir = params.getRequired("tempDir"); --- End diff -- I'm wondering whether we shouldn't just pass in a file instead. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179053801 --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static java.util.Collections.singletonList; + +/** + * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through + * DistribitutedCache. We verify that via uploading file and later on accessing it in map function. To be sure we read + * version read from cache, we delete the initial file. 
+ */ +public class DistributedCacheViaBlobTestProgram { + + public static void main(String[] args) throws Exception { + + final ParameterTool params = ParameterTool.fromArgs(args); + + final String fileContent = params.getRequired("content"); + final String tempDir = params.getRequired("tempDir"); + + final File tempFile = File.createTempFile("temp", null, new File(tempDir)); + Files.write(tempFile.toPath(), singletonList(fileContent)); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.registerCachedFile(tempFile.getPath(), "test_data", false); + + env.fromElements(1) + .map(new TestMapFunction(tempFile.getAbsolutePath())) + .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE); + + env.execute("Distributed Cache Via Blob Test Program"); + } + + static class TestMapFunction extends RichMapFunction { + + private String initialPath; + + public TestMapFunction(String initialPath) { + this.initialPath = initialPath; + } + + @Override + public String map(Integer value) throws Exception { + Files.deleteIfExists(Paths.get(initialPath)); --- End diff -- this seems icky. could we not compare the paths instead to make sure we're accessing a different file? ---
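The reviewer's alternative is to compare paths instead of deleting the source file. It could be sketched as follows, assuming both paths are local. `CachedPathCheck.isDistinctCopy` is a hypothetical helper. Inside the map function, the cached path would come from `getRuntimeContext().getDistributedCache().getFile("test_data").toPath()`, and the original path from the `initialPath` field.

```java
import java.nio.file.Path;

final class CachedPathCheck {

    private CachedPathCheck() {}

    /**
     * Returns true if the cached file is at a different location than the originally
     * registered file, i.e. the task is reading a copy rather than the source.
     */
    static boolean isDistinctCopy(Path original, Path cached) {
        // Normalize so that "a/../a/f" and "a/f" compare as the same location.
        Path a = original.toAbsolutePath().normalize();
        Path b = cached.toAbsolutePath().normalize();
        return !a.equals(b);
    }
}
```

This comparison is purely lexical and does not resolve symbolic links. `Path.toRealPath()` would resolve them, but it requires both files to exist, which is a reasonable precondition in this test.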
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179060824 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -33,16 +38,16 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Future; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds. + * Tests that {@link FileCache} can read files from {@link BlobServer}. */ -public class FileCacheDeleteValidationTest { +public class FileCacheReadsFromBlobTest { --- End diff -- am I missing something, or isn't this now a duplicate of `FileCacheDirectoryTest#testDirectoryDownloadedFromBlob`? ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179057166 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log ); } + public void releaseJob(JobID jobId, ExecutionAttemptID executionId) { + checkNotNull(jobId); + + synchronized (lock) { + Set jobRefCounter = jobRefHolders.get(jobId); + + if (jobRefCounter == null || jobRefCounter.isEmpty()) { + LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId); + return; + } + + jobRefCounter.remove(executionId); + if (jobRefCounter.isEmpty()) { + executorService.schedule(new DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS); + } + } + } + // // background processes // /** -* Asynchronous file copy process. +* Asynchronous file copy process from blob server. */ - private static class CopyProcess implements Callable { + private static class CopyFromBlobProcess implements Callable { - private final Path filePath; - private final Path cachedPath; - private boolean executable; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean isDirectory; + private final boolean isExecutable; + private final JobID jobID; + private final PermanentBlobService blobService; - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; + CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) { + try { + this.isExecutable = e.isExecutable; + this.isDirectory = e.isZipped; + this.jobID = jobID; + this.blobService = blobService; + this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader()); + this.target = target; + } catch (Exception 
ex) { + throw new RuntimeException(ex); + } } @Override public Path call() throws IOException { - // let exceptions propagate. we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; + final File file = blobService.getFile(jobID, blobKey); + + if (isDirectory) { + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String fileName = entry.getName(); + Path newFile = new Path(target, fileName); + if (entry.isDirectory()) { + target.getFileSystem().mkdirs(newFile); + } else { + try (FSDataOutputStream fsDataOutputStream = target.getFileSystem() + .create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) { + IOUtils.copyBytes(zis, fsDataOutputStream, false); + } + //noinspection ResultOfMethodCallIgnored + new File(newFile.getPath()).setExecutable(isExecutable); + } +
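The directory branch above extracts a zipped blob one entry at a time. Below is a self-contained sketch of the same idea using `java.nio.file` rather than Flink's `FileSystem` abstraction. It differs from the quoted diff in two ways, both additions made for this sketch: it creates parent directories for file entries, and it rejects entries that would escape the target directory ("zip slip"). `ZipExtractor` is a hypothetical name.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

final class ZipExtractor {

    private ZipExtractor() {}

    /** Extracts every entry of the zip stream below {@code target}; returns the number of files written. */
    static int extract(InputStream zipped, Path target, boolean executable) throws IOException {
        Path root = target.toAbsolutePath().normalize();
        int files = 0;
        try (ZipInputStream zis = new ZipInputStream(zipped)) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                Path out = root.resolve(entry.getName()).normalize();
                if (!out.startsWith(root)) {
                    throw new IOException("Zip entry outside target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                } else {
                    // A file entry is not guaranteed to be preceded by its directory entry.
                    Files.createDirectories(out.getParent());
                    // Copies up to the end of the current entry; fails if the file already
                    // exists, similar to WriteMode.NO_OVERWRITE in the diff.
                    Files.copy(zis, out);
                    out.toFile().setExecutable(executable);
                    files++;
                }
            }
        }
        return files;
    }
}
```

`Files.copy(InputStream, Path)` does not close the input stream, so the `ZipInputStream` stays usable for the next `getNextEntry()` call. The diff achieves the same thing by passing `false` as the close flag of `IOUtils.copyBytes`.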
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179052578 --- Diff: flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; --- End diff -- undeclared dependency. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176767320 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -1831,4 +1831,5 @@ public void registerCachedFile(String filePath, String name) { public void registerCachedFile(String filePath, String name, boolean executable) { this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable))); } + --- End diff -- revert ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176767248 --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java --- @@ -91,6 +93,7 @@ public void after() throws Exception{ } @Test + @Category(Flip6.class) --- End diff -- revert? ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176769129 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -143,30 +160,23 @@ public void shutdown() { /** * If the file doesn't exists locally, it will copy the file to the temp directory. * -* @param name The name under which the file is registered. * @param entry The cache entry descriptor (path, executable flag) * @param jobID The ID of the job for which the file is copied. * @return The handle to the task that copies the file. */ - public Future createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) { + public Future createTmpFile(String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) { synchronized (lock) { - Map>> jobEntries = entries.get(jobID); - if (jobEntries == null) { - jobEntries = new HashMap>>(); - entries.put(jobID, jobEntries); - } + Map> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>()); + final Set refHolders = jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>()); + refHolders.add(executionId); // tuple is (ref-count, parent-temp-dir, cached-file-path, copy-process) --- End diff -- outdated ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176767271 --- Diff: flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java --- @@ -40,6 +42,7 @@ /** * Test the distributed cache. */ +@Category(Flip6.class) --- End diff -- revert? ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176768205 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java --- @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl } /** -* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper. +* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper. */ @Test public void testUploadJarFilesHelper() throws Exception { uploadJarFile(getBlobServer(), getBlobClientConfig()); } + @Test + public void testDirectoryUploading() throws IOException { + final File newFolder = temporaryFolder.newFolder(); + final File file1 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file1, "Test content"); + final File file2 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file2, "Test content 2"); + + final Map files = new HashMap<>(); + files.put(file1.getName(), file1); + files.put(file2.getName(), file2); + + BlobKey key; + final JobID jobId = new JobID(); + final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + try ( + BlobClient client = new BlobClient( + inetAddress, getBlobClientConfig())) { + + key = client.uploadFile(jobId, new Path(newFolder.getPath())); + } + + final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key); + + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { --- End diff -- that's an implementation detail though ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176766216 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log // /** -* Asynchronous file copy process. -*/ - private static class CopyProcess implements Callable { - - private final Path filePath; - private final Path cachedPath; - private boolean executable; - - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; - } - - @Override - public Path call() throws IOException { - // let exceptions propagate. we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; - } - } - - /** -* If no task is using this file after 5 seconds, clear it. +* Asynchronous file copy process from blob server. 
*/ - private static class DeleteProcess implements Runnable { + private static class CopyFromBlobProcess implements Callable { - private final Object lock; - private final Map>>> entries; - - private final String name; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean directory; + private final boolean executable; private final JobID jobID; + private final PermanentBlobService blobService; - public DeleteProcess(Object lock, Map>>> entries, - String name, JobID jobID) { - this.lock = lock; - this.entries = entries; - this.name = name; - this.jobID = jobID; + CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) { + try { + this.executable = e.isExecutable; + this.directory = e.isZipped; + this.jobID = jobID; + this.blobService = blobService; + this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader()); + this.target = target; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } @Override - public void run() { - try { - synchronized (lock) { - Map>> jobEntries = entries.get(jobID); - - if (jobEntries != null) { - Tuple4> entry = jobEntries.get(name); - - if (entry != null) { - int count = entry.f0; - if (count > 1) { - // multiple references still - entry.f0 = count - 1; - } - else { - // we remove the last reference - jobEntries.remove(name); - if (jobEntries.isEmpty()) { - entries.remove(jobID); - } - - // abort the copy - entry.f3.cancel(true); - - // remove the file - File file = new File(entry.f2.toString()); -
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176764920 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log // /** -* Asynchronous file copy process. -*/ - private static class CopyProcess implements Callable { - - private final Path filePath; - private final Path cachedPath; - private boolean executable; - - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; - } - - @Override - public Path call() throws IOException { - // let exceptions propagate. we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; - } - } - - /** -* If no task is using this file after 5 seconds, clear it. +* Asynchronous file copy process from blob server. 
*/ - private static class DeleteProcess implements Runnable { + private static class CopyFromBlobProcess implements Callable { - private final Object lock; - private final Map>>> entries; - - private final String name; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean directory; + private final boolean executable; private final JobID jobID; + private final PermanentBlobService blobService; - public DeleteProcess(Object lock, Map>>> entries, - String name, JobID jobID) { - this.lock = lock; - this.entries = entries; - this.name = name; - this.jobID = jobID; + CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) { + try { + this.executable = e.isExecutable; + this.directory = e.isZipped; + this.jobID = jobID; + this.blobService = blobService; + this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader()); + this.target = target; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } @Override - public void run() { - try { - synchronized (lock) { - Map>> jobEntries = entries.get(jobID); - - if (jobEntries != null) { - Tuple4> entry = jobEntries.get(name); - - if (entry != null) { - int count = entry.f0; - if (count > 1) { - // multiple references still - entry.f0 = count - 1; - } - else { - // we remove the last reference - jobEntries.remove(name); - if (jobEntries.isEmpty()) { - entries.remove(jobID); - } - - // abort the copy - entry.f3.cancel(true); - - // remove the file - File file = new File(entry.f2.toString()); -
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176747071 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log // /** -* Asynchronous file copy process. -*/ - private static class CopyProcess implements Callable { - - private final Path filePath; - private final Path cachedPath; - private boolean executable; - - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; - } - - @Override - public Path call() throws IOException { - // let exceptions propagate. we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; - } - } - - /** -* If no task is using this file after 5 seconds, clear it. +* Asynchronous file copy process from blob server. 
*/ - private static class DeleteProcess implements Runnable { + private static class CopyFromBlobProcess implements Callable { - private final Object lock; - private final Map>>> entries; - - private final String name; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean directory; + private final boolean executable; private final JobID jobID; + private final PermanentBlobService blobService; - public DeleteProcess(Object lock, Map>>> entries, - String name, JobID jobID) { - this.lock = lock; - this.entries = entries; - this.name = name; - this.jobID = jobID; + CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) { + try { + this.executable = e.isExecutable; + this.directory = e.isZipped; + this.jobID = jobID; + this.blobService = blobService; + this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader()); + this.target = target; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } @Override - public void run() { - try { - synchronized (lock) { - Map>> jobEntries = entries.get(jobID); - - if (jobEntries != null) { - Tuple4> entry = jobEntries.get(name); - - if (entry != null) { - int count = entry.f0; - if (count > 1) { - // multiple references still - entry.f0 = count - 1; - } - else { - // we remove the last reference - jobEntries.remove(name); - if (jobEntries.isEmpty()) { - entries.remove(jobID); - } - - // abort the copy - entry.f3.cancel(true); - - // remove the file - File file = new File(entry.f2.toString()); -
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176746645 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java --- @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl } /** -* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper. +* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper. */ @Test public void testUploadJarFilesHelper() throws Exception { uploadJarFile(getBlobServer(), getBlobClientConfig()); } + @Test + public void testDirectoryUploading() throws IOException { + final File newFolder = temporaryFolder.newFolder(); + final File file1 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file1, "Test content"); + final File file2 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file2, "Test content 2"); + + final Map files = new HashMap<>(); + files.put(file1.getName(), file1); + files.put(file2.getName(), file2); + + BlobKey key; + final JobID jobId = new JobID(); + final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + try ( + BlobClient client = new BlobClient( + inetAddress, getBlobClientConfig())) { + + key = client.uploadFile(jobId, new Path(newFolder.getPath())); + } + + final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key); + + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String fileName = entry.getName().replaceAll("/", ""); --- End diff -- It is not sufficient. In this test case `entry.getName()` returns `/pre...suff`; note the leading `/`. I have, however, changed `replaceAll` to `replaceFirst`. ---
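The options discussed in this thread (`replaceAll`, `replaceFirst`, and the `Path#getName()` suggestion) differ only once entry names are nested. Below is a small comparison, assuming entry names use `/` as the separator, as the zip format specifies. The helper names are hypothetical, and `lastComponent` only imitates "final path component" semantics without calling Flink's `Path`:

```java
/** Hypothetical helpers comparing ways to turn a zip entry name into a file name. */
final class EntryNames {

    private EntryNames() {}

    /** Removes every '/', which also merges nested names: "/dir/f" becomes "dirf". */
    static String removeAllSlashes(String entryName) {
        return entryName.replaceAll("/", "");
    }

    /** Removes the first '/' wherever it occurs, even when it is not at the start. */
    static String removeFirstSlash(String entryName) {
        return entryName.replaceFirst("/", "");
    }

    /** Removes only leading slashes: "/dir/f" becomes "dir/f". */
    static String stripLeadingSlashes(String entryName) {
        return entryName.replaceFirst("^/+", "");
    }

    /** Keeps only the last component: "/dir/f" becomes "f". */
    static String lastComponent(String entryName) {
        return entryName.substring(entryName.lastIndexOf('/') + 1);
    }
}
```

For the flat temporary folder in `testDirectoryUploading` all four give the same name. With subdirectories, only the anchored `^/+` variant keeps the relative path intact, while `lastComponent` can map two different entries to the same name.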
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176745753 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) { * Any additional configuration for the blob client * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) -* @param jars -* List of JAR files to upload +* @param files +* List of files to upload * * @throws IOException * if the upload fails */ - public static List uploadJarFiles( - InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List jars) + public static List uploadFiles( + InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List files) throws IOException { checkNotNull(jobId); - if (jars.isEmpty()) { + if (files.isEmpty()) { return Collections.emptyList(); } else { List blobKeys = new ArrayList<>(); try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) { - for (final Path jar : jars) { - final FileSystem fs = jar.getFileSystem(); - FSDataInputStream is = null; - try { - is = fs.open(jar); - final PermanentBlobKey key = - (PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB); - blobKeys.add(key); - } finally { - if (is != null) { - is.close(); - } - } + for (final Path file : files) { + final PermanentBlobKey key = blobClient.uploadFile(jobId, file); + blobKeys.add(key); } } return blobKeys; } } - // - // Miscellaneous - // - - private static Throwable readExceptionFromStream(InputStream in) throws IOException { - int len = readLength(in); - byte[] bytes = new byte[len]; - readFully(in, bytes, 0, len, "Error message"); + /** +* Uploads a single file to the {@link PermanentBlobService} of the given {@link BlobServer}. 
+* +* @param jobId +* ID of the job this blob belongs to (or null if job-unrelated) +* @param file +* file to upload +* +* @throws IOException +* if the upload fails +*/ + public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException { + final FileSystem fs = file.getFileSystem(); + if (fs.getFileStatus(file).isDir()) { + return uploadDirectory(jobId, file, fs); + } else { + try (InputStream is = fs.open(file)) { + return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB); + } + } + } - try { - return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader()); + private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException { + try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) { + try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) { + compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream); + zipStream.finish(); + return (PermanentBlobKey) blobOutputStream.finish(); + } } - catch (ClassNotFoundException e) { - // should never occur - throw new
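The shape of the entry names depends on how the directory is walked. Here is a sketch of the compression step, assuming `java.nio` paths instead of Flink's `FileSystem`, so it is not `compressDirectoryToZipfile` itself (`DirectoryZipper` is hypothetical). Relativizing each file against the root yields names such as `sub/b.txt` without the leading `/` reported in `BlobClientTest`. `finish()` completes the archive without closing the underlying stream:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Hypothetical sketch: zips every regular file below a directory with root-relative names. */
final class DirectoryZipper {

    private DirectoryZipper() {}

    static void zipDirectory(Path dir, OutputStream out) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(dir)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        ZipOutputStream zos = new ZipOutputStream(out);
        for (Path file : files) {
            // relativize() gives "sub/b.txt", not "/sub/b.txt"; zip names always use '/'
            String name = dir.relativize(file).toString()
                .replace(file.getFileSystem().getSeparator(), "/");
            zos.putNextEntry(new ZipEntry(name));
            Files.copy(file, zos);
            zos.closeEntry();
        }
        // writes the central directory but leaves 'out' open, so the caller can still
        // finalize it (the diff likewise calls zipStream.finish() before blobOutputStream.finish())
        zos.finish();
    }
}
```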
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176745183 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java --- @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl } /** -* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper. +* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper. */ @Test public void testUploadJarFilesHelper() throws Exception { uploadJarFile(getBlobServer(), getBlobClientConfig()); } + @Test + public void testDirectoryUploading() throws IOException { + final File newFolder = temporaryFolder.newFolder(); + final File file1 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file1, "Test content"); + final File file2 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file2, "Test content 2"); + + final Map files = new HashMap<>(); + files.put(file1.getName(), file1); + files.put(file2.getName(), file2); + + BlobKey key; + final JobID jobId = new JobID(); + final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + try ( + BlobClient client = new BlobClient( + inetAddress, getBlobClientConfig())) { + + key = client.uploadFile(jobId, new Path(newFolder.getPath())); + } + + final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key); + + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { --- End diff -- It does not need to be closed explicitly: `getNextEntry` closes the current entry at the very beginning. ---
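This is easy to check in isolation: `java.util.zip.ZipInputStream#getNextEntry()` calls `closeEntry()` on the current entry before reading the next header. The sketch below (hypothetical class name) reads only one byte of each entry and never calls `closeEntry()`, yet still lands on the correct data of the following entry. The `ZipInputStream` itself is still closed by try-with-resources:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/** Hypothetical demo: getNextEntry() skips whatever is left of the current entry. */
final class NextEntryDemo {

    private NextEntryDemo() {}

    /** Builds an in-memory zip from name -> content pairs. */
    static byte[] zip(Map<String, String> contents) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bos)) {
            for (Map.Entry<String, String> e : contents.entrySet()) {
                zos.putNextEntry(new ZipEntry(e.getKey()));
                zos.write(e.getValue().getBytes(StandardCharsets.UTF_8));
                zos.closeEntry();
            }
        }
        return bos.toByteArray();
    }

    /** Reads just the first byte of each entry and never calls closeEntry(). */
    static Map<String, Character> firstBytes(byte[] zipped) throws IOException {
        Map<String, Character> result = new LinkedHashMap<>();
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipped))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                result.put(entry.getName(), (char) zis.read());
            }
        }
        return result;
    }
}
```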
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176658024 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -306,18 +311,20 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) CompletableFuture jobUploadFuture = portFuture.thenCombine( getDispatcherAddress(), (BlobServerPortResponseBody response, String dispatcherAddress) -> { - log.info("Uploading jar files."); final int blobServerPort = response.port; final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); final List keys; try { - keys = BlobClient.uploadJarFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); + log.info("Uploading jar files."); + keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); + log.info("Uploading custom files."); --- End diff -- I don't think this should be logged if there's nothing to upload (which will generally be the case by default). ---
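A minimal sketch of the suggestion, assuming the upload step can be wrapped in a small helper: log and upload only when the list is non-empty. The name `uploadIfPresent` and its parameters are hypothetical. The real code logs through `log.info` and calls `BlobClient.uploadFiles`, which throws `IOException`, so a plain `java.util.function.Function` would not fit it directly:

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch: announce an upload only when there is something to upload. */
final class UploadLogging {

    private UploadLogging() {}

    static <T, K> List<K> uploadIfPresent(
            String what, List<T> items, Function<List<T>, List<K>> uploader, Consumer<String> log) {
        if (items.isEmpty()) {
            // custom files are usually absent, so skip both the log line and the upload
            return Collections.emptyList();
        }
        log.accept("Uploading " + what + ".");
        return uploader.apply(items);
    }
}
```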
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176662757 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java --- @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl } /** -* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper. +* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper. */ @Test public void testUploadJarFilesHelper() throws Exception { uploadJarFile(getBlobServer(), getBlobClientConfig()); } + @Test + public void testDirectoryUploading() throws IOException { + final File newFolder = temporaryFolder.newFolder(); + final File file1 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file1, "Test content"); --- End diff -- we already have a `writeFileUtf8` method in `org.apache.flink.util.FileUtils` ---
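The helper referred to here is the one called as `FileUtils.writeFileUtf8(f, testFileContent)` in `FileCacheReadsFromBlobTest`. The benefit of a helper with a fixed charset is that the bytes written do not depend on the JVM's default encoding. Here is a JDK-only sketch of the same idea; `Utf8Files` is a hypothetical class, not Flink's implementation:

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/** Hypothetical JDK-only helpers with an explicit UTF-8 charset. */
final class Utf8Files {

    private Utf8Files() {}

    /** Writes contents as UTF-8, independent of the platform default charset. */
    static void writeUtf8(File file, String contents) throws IOException {
        Files.write(file.toPath(), contents.getBytes(StandardCharsets.UTF_8));
    }

    /** Reads the whole file back, decoding it as UTF-8. */
    static String readUtf8(File file) throws IOException {
        return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
    }
}
```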
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176662599 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java --- @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl } /** -* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper. +* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper. */ @Test public void testUploadJarFilesHelper() throws Exception { uploadJarFile(getBlobServer(), getBlobClientConfig()); } + @Test + public void testDirectoryUploading() throws IOException { + final File newFolder = temporaryFolder.newFolder(); + final File file1 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file1, "Test content"); + final File file2 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file2, "Test content 2"); + + final Map files = new HashMap<>(); + files.put(file1.getName(), file1); + files.put(file2.getName(), file2); + + BlobKey key; + final JobID jobId = new JobID(); + final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + try ( + BlobClient client = new BlobClient( + inetAddress, getBlobClientConfig())) { + + key = client.uploadFile(jobId, new Path(newFolder.getPath())); + } + + final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key); + + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { --- End diff -- entry should be closed ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176663553 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -57,89 +65,59 @@ + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n" + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n" + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n" - + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.\n"; + + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags."; @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); private FileCache fileCache; - private File f; - @Before - public void setup() throws IOException { - String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()}; - try { - fileCache = new FileCache(tmpDirectories); - } - catch (Exception e) { - e.printStackTrace(); - fail("Cannot create FileCache: " + e.getMessage()); + private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey(); + + private final PermanentBlobService blobService = new PermanentBlobService() { + @Override + public File getFile(JobID jobId, PermanentBlobKey key) throws IOException { + if (key.equals(permanentBlobKey)) { + File f = temporaryFolder.newFile("cacheFile"); + FileUtils.writeFileUtf8(f, testFileContent); + return f; + } else { + throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey); + } } - f = temporaryFolder.newFile("cacheFile"); - try { - Files.write(testFileContent, f, Charsets.UTF_8); - } - catch (Exception e) { - e.printStackTrace(); - fail("Error initializing the test: " + e.getMessage()); + @Override + public void close() throws IOException { + } + }; + + @Before + public void setup() throws Exception { + fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService); } @After 
public void shutdown() { - try { - fileCache.shutdown(); - } - catch (Exception e) { - e.printStackTrace(); - fail("FileCache shutdown failed: " + e.getMessage()); - } + fileCache.shutdown(); } @Test - public void testFileReuseForNextTask() { - try { - final JobID jobID = new JobID(); - final String fileName = "test_file"; - - final String filePath = f.toURI().toString(); - - // copy / create the file - Future copyResult = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID); - copyResult.get(); - - // get another reference to the file - Future copyResult2 = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID); - - // this should be available immediately - assertTrue(copyResult2.isDone()); - - // delete the file - fileCache.deleteTmpFile(fileName, jobID); - // file should not yet be deleted - assertTrue(fileCache.holdsStillReference(fileName, jobID)); - - // delete the second reference - fileCache.deleteTmpFile(fileName, jobID); - // file should still not be deleted, but remain for a bit - assertTrue(fileCache.holdsStillReference(fileName, jobID)); - - fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID); - fileCache.deleteTmpFile(fileName, jobID); - - // after a while, the file should disappear - long deadline = System.currentTimeMillis() + 2; - do { - Thread.sleep(5500); - } - while (fileCache.holdsStillR
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176658632 --- Diff: flink-end-to-end-tests/pom.xml --- @@ -78,6 +78,27 @@ under the License. + + DistributedCacheViaBlobTestProgram + package + + jar + + + DistributedCacheViaBlobTestProgram + + + + org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram + + + + + org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.class --- End diff -- This can use wildcards, no? `org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram*` ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176660974 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) { * Any additional configuration for the blob client * @param jobId * ID of the job this blob belongs to (or null if job-unrelated) -* @param jars -* List of JAR files to upload +* @param files +* List of files to upload * * @throws IOException * if the upload fails */ - public static List uploadJarFiles( - InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List jars) + public static List uploadFiles( + InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List files) throws IOException { checkNotNull(jobId); - if (jars.isEmpty()) { + if (files.isEmpty()) { return Collections.emptyList(); } else { List blobKeys = new ArrayList<>(); try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) { - for (final Path jar : jars) { - final FileSystem fs = jar.getFileSystem(); - FSDataInputStream is = null; - try { - is = fs.open(jar); - final PermanentBlobKey key = - (PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB); - blobKeys.add(key); - } finally { - if (is != null) { - is.close(); - } - } + for (final Path file : files) { + final PermanentBlobKey key = blobClient.uploadFile(jobId, file); + blobKeys.add(key); } } return blobKeys; } } - // - // Miscellaneous - // - - private static Throwable readExceptionFromStream(InputStream in) throws IOException { - int len = readLength(in); - byte[] bytes = new byte[len]; - readFully(in, bytes, 0, len, "Error message"); + /** +* Uploads a single file to the {@link PermanentBlobService} of the given {@link BlobServer}. 
+* +* @param jobId +* ID of the job this blob belongs to (or null if job-unrelated) +* @param file +* file to upload +* +* @throws IOException +* if the upload fails +*/ + public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException { + final FileSystem fs = file.getFileSystem(); + if (fs.getFileStatus(file).isDir()) { + return uploadDirectory(jobId, file, fs); + } else { + try (InputStream is = fs.open(file)) { + return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB); + } + } + } - try { - return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader()); + private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException { + try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) { + try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) { + compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream); + zipStream.finish(); + return (PermanentBlobKey) blobOutputStream.finish(); + } } - catch (ClassNotFoundException e) { - // should never occur - throw new IO
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176658516 --- Diff: flink-end-to-end-tests/pom.xml --- @@ -78,6 +78,27 @@ under the License. + --- End diff -- this will need a rebase; each test now has its own module ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176659832 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java --- @@ -488,13 +493,36 @@ public void addJar(Path jar) { return userJars; } + /** +* Adds the path of a JAR file required to run the job on a task manager. --- End diff -- it may not be a jar file ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176659407 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -159,7 +162,7 @@ public ExecutionConfig getConfig() { * Get the list of cached files that were registered for distribution among the task managers. */ public List> getCachedFiles() { - return cacheFile; + return cacheFile.entrySet().stream().map(e -> Tuple2.of(e.getKey(), e.getValue())).collect(Collectors.toList()); --- End diff -- if the mapping is never used there's no value in using a map in the first place. ---
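This sketch shows the alternative the comment implies, assuming the registrations are only ever consumed as an ordered list: store them as name/path pairs from the start, so `getCachedFiles()` needs no map-to-list conversion. `CachedFiles` is a hypothetical stand-in, and `Map.Entry` replaces Flink's `Tuple2` only to keep the example self-contained. If unique names were the reason for the map (an assumption; the diff does not say), that check can be done at registration time:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical registry of cached files, stored in the form callers consume. */
final class CachedFiles {

    private final List<Map.Entry<String, String>> entries = new ArrayList<>();

    /** Registers a file; rejecting duplicate names is all a map would have added. */
    void register(String name, String path) {
        for (Map.Entry<String, String> e : entries) {
            if (e.getKey().equals(name)) {
                throw new IllegalArgumentException("cache file " + name + " already exists");
            }
        }
        entries.add(new SimpleImmutableEntry<>(name, path));
    }

    /** Returns the registrations in insertion order as a read-only view. */
    List<Map.Entry<String, String>> getCachedFiles() {
        return Collections.unmodifiableList(entries);
    }
}
```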
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176662637 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java --- @@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl } /** -* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper. +* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper. */ @Test public void testUploadJarFilesHelper() throws Exception { uploadJarFile(getBlobServer(), getBlobClientConfig()); } + @Test + public void testDirectoryUploading() throws IOException { + final File newFolder = temporaryFolder.newFolder(); + final File file1 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file1, "Test content"); + final File file2 = File.createTempFile("pre", "suff", newFolder); + FileUtils.writeStringToFile(file2, "Test content 2"); + + final Map files = new HashMap<>(); + files.put(file1.getName(), file1); + files.put(file2.getName(), file2); + + BlobKey key; + final JobID jobId = new JobID(); + final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + try ( + BlobClient client = new BlobClient( + inetAddress, getBlobClientConfig())) { + + key = client.uploadFile(jobId, new Path(newFolder.getPath())); + } + + final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key); + + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String fileName = entry.getName().replaceAll("/", ""); --- End diff -- `Path#getName()`? ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176662188 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log // /** -* Asynchronous file copy process. -*/ - private static class CopyProcess implements Callable { - - private final Path filePath; - private final Path cachedPath; - private boolean executable; - - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; - } - - @Override - public Path call() throws IOException { - // let exceptions propagate. we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; - } - } - - /** -* If no task is using this file after 5 seconds, clear it. +* Asynchronous file copy process from blob server. */ - private static class DeleteProcess implements Runnable { + private static class CopyFromBlobProcess implements Callable { - private final Object lock; - private final Map>>> entries; - - private final String name; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean directory; + private final boolean executable; private final JobID jobID; + private final PermanentBlobService blobService; - public DeleteProcess(Object lock, Map>>> entries, --- End diff -- who's responsible for cleaning up the created local files? ---
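One possible answer, sketched under assumptions: keep a per-job reference count for the locally materialized copies and delete the job's directory when the last user releases it. This resembles the bookkeeping of the removed `DeleteProcess`, which decremented a count and deleted the file on the last reference. The difference is that this sketch deletes immediately, rather than after the 5-second grace period described in that class's Javadoc. `LocalCacheCleaner`, `acquire` and `release` are hypothetical names, not part of the PR:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

/** Hypothetical sketch: the last release() for a job deletes that job's local cache directory. */
final class LocalCacheCleaner {

    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Map<String, Path> jobDirs = new HashMap<>();

    synchronized void acquire(String jobId, Path jobDir) {
        refCounts.merge(jobId, 1, Integer::sum);
        jobDirs.putIfAbsent(jobId, jobDir);
    }

    /** Returns true if this call dropped the job's last reference and deleted its files. */
    synchronized boolean release(String jobId) throws IOException {
        Integer count = refCounts.get(jobId);
        if (count == null) {
            return false; // unknown job, or already cleaned up
        }
        if (count > 1) {
            refCounts.put(jobId, count - 1);
            return false;
        }
        refCounts.remove(jobId);
        deleteRecursively(jobDirs.remove(jobId));
        return true;
    }

    private static void deleteRecursively(Path root) throws IOException {
        if (root == null || !Files.exists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            // reverse order visits children before their parent directories
            for (Path p : (Iterable<Path>) walk.sorted(Comparator.reverseOrder())::iterator) {
                Files.delete(p);
            }
        }
    }
}
```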
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176658879 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java --- @@ -75,15 +66,12 @@ */ @PublicEvolving public class PythonStreamExecutionEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class); --- End diff -- I would leave it even if it isn't used ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176661729 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log // /** -* Asynchronous file copy process. -*/ - private static class CopyProcess implements Callable { - - private final Path filePath; - private final Path cachedPath; - private boolean executable; - - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; - } - - @Override - public Path call() throws IOException { - // let exceptions propagate. we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; - } - } - - /** -* If no task is using this file after 5 seconds, clear it. +* Asynchronous file copy process from blob server. 
*/ - private static class DeleteProcess implements Runnable { + private static class CopyFromBlobProcess implements Callable { - private final Object lock; - private final Map>>> entries; - - private final String name; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean directory; + private final boolean executable; private final JobID jobID; + private final PermanentBlobService blobService; - public DeleteProcess(Object lock, Map>>> entries, - String name, JobID jobID) { - this.lock = lock; - this.entries = entries; - this.name = name; - this.jobID = jobID; + CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) { + try { + this.executable = e.isExecutable; + this.directory = e.isZipped; + this.jobID = jobID; + this.blobService = blobService; + this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader()); + this.target = target; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } @Override - public void run() { - try { - synchronized (lock) { - Map>> jobEntries = entries.get(jobID); - - if (jobEntries != null) { - Tuple4> entry = jobEntries.get(name); - - if (entry != null) { - int count = entry.f0; - if (count > 1) { - // multiple references still - entry.f0 = count - 1; - } - else { - // we remove the last reference - jobEntries.remove(name); - if (jobEntries.isEmpty()) { - entries.remove(jobID); - } - - // abort the copy - entry.f3.cancel(true); - - // remove the file - File file = new File(entry.f2.toString()); -
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176662946 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -19,30 +19,38 @@ package org.apache.flink.runtime.filecache; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.blob.PermanentBlobService; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.shaded.guava18.com.google.common.base.Charsets; import org.apache.flink.shaded.guava18.com.google.common.io.Files; +import org.apache.commons.lang3.StringUtils; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Future; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** - * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds. + * Tests that {@link FileCache} can read files from {@link BlobServer}. */ -public class FileCacheDeleteValidationTest { +@RunWith(MockitoJUnitRunner.class) --- End diff -- Why is this needed now? I don't see any mocking being added. ---
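zentol's question relies on the fact that `MockitoJUnitRunner` mainly serves to initialize `@Mock` fields. Without them it adds nothing. Here is a minimal stdlib-only sketch of the runner-free alternative. A single-method interface stands in for the blob service, and a lambda stubs it, so neither a runner nor a mocking framework is needed. `BlobLookup`, `CachedFileReader` and `StubExample` are hypothetical stand-ins, not Flink's `PermanentBlobService` API.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;

/** Hypothetical single-method stand-in for the blob service used by the cache. */
interface BlobLookup {
    File getFile(String jobId, String blobKey) throws IOException;
}

/** Hypothetical consumer that reads a cached file through the lookup. */
class CachedFileReader {
    private final BlobLookup lookup;

    CachedFileReader(BlobLookup lookup) {
        this.lookup = lookup;
    }

    String read(String jobId, String blobKey) throws IOException {
        File file = lookup.getFile(jobId, blobKey);
        return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
    }
}

class StubExample {
    /** Builds a lambda stub that serves one prepared file and records the keys it was asked for. */
    static BlobLookup stubServing(File file, List<String> requestedKeys) {
        return (jobId, blobKey) -> {
            requestedKeys.add(blobKey);
            return file;
        };
    }
}
```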
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176661496 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log // /** -* Asynchronous file copy process. -*/ - private static class CopyProcess implements Callable { - - private final Path filePath; - private final Path cachedPath; - private boolean executable; - - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; - } - - @Override - public Path call() throws IOException { - // let exceptions propagate. we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; - } - } - - /** -* If no task is using this file after 5 seconds, clear it. +* Asynchronous file copy process from blob server. */ - private static class DeleteProcess implements Runnable { + private static class CopyFromBlobProcess implements Callable { - private final Object lock; - private final Map>>> entries; - - private final String name; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean directory; --- End diff -- -> `isDirectory` ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r176657746 --- Diff: flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java --- @@ -30,6 +33,7 @@ /** * Tests for the PythonPlanBinder. */ +@Category(Flip6.class) --- End diff -- Why does this no longer run against legacy clusters? ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r171184258 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, String name) { * @param executable flag indicating whether the file should be executable */ public void registerCachedFile(String filePath, String name, boolean executable) { - this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable))); + registerCachedFile(filePath, name, executable, false); } + + /** +* Registers a file at the distributed cache under the given name. The file will be accessible +* from any user-defined function in the (distributed) runtime under a local path. If upload is true files will +* be distributed via {@link BlobServer} otherwise Files should be local files (as long as all relevant workers +* have access to it), or files in a distributed file system. The runtime will copy the files temporarily to a +* local cache, if needed. +* +* The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via +* {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access +* {@link org.apache.flink.api.common.cache.DistributedCache} via +* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. +* +* @param filePath The path of the file +* @param name The name under which the file is registered. +* @param executable flag indicating whether the file should be executable +* @param upload flag indicating if the file should be distributed via BlobServer +*/ + public void registerCachedFile(String filePath, String name, boolean executable, boolean upload) { --- End diff -- send everything through the blob service ---
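One reading of "send everything through the blob service", combined with the earlier question of whether a new overload is needed at all, can be sketched as follows. The existing `registerCachedFile(filePath, name, executable)` signature stays unchanged and only records the entry. A submission step then uploads every registered entry, with no per-entry `upload` flag, and stores the returned blob key. `CacheRegistry`, `Uploader` and `Entry` are hypothetical names. In Flink the upload would go through the blob server.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical uploader; in Flink this role would be played by the blob server client. */
interface Uploader {
    String upload(String filePath) throws IOException; // returns a blob key
}

/** Hypothetical registry mirroring the existing registerCachedFile(...) signatures. */
class CacheRegistry {

    /** A registered entry; blobKey stays null until the entry has been uploaded. */
    static final class Entry {
        final String filePath;
        final boolean executable;
        final String blobKey;

        Entry(String filePath, boolean executable, String blobKey) {
            this.filePath = filePath;
            this.executable = executable;
            this.blobKey = blobKey;
        }
    }

    private final Map<String, Entry> entries = new LinkedHashMap<>();

    public void registerCachedFile(String filePath, String name) {
        registerCachedFile(filePath, name, false);
    }

    // Signature unchanged: callers do not need to know how the file is shipped.
    public void registerCachedFile(String filePath, String name, boolean executable) {
        entries.put(name, new Entry(filePath, executable, null));
    }

    /** At submission time, every entry goes through the uploader. */
    public List<String> uploadAll(Uploader uploader) throws IOException {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Entry> e : entries.entrySet()) {
            Entry old = e.getValue();
            String key = uploader.upload(old.filePath);
            e.setValue(new Entry(old.filePath, old.executable, key)); // not a structural change
            keys.add(key);
        }
        return keys;
    }

    Entry get(String name) {
        return entries.get(name);
    }
}
```

Keeping the decision inside the submission step means the public API gains no new method. That is the transparency the reviewers discuss.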
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r171181348 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, String name) { * @param executable flag indicating whether the file should be executable */ public void registerCachedFile(String filePath, String name, boolean executable) { - this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable))); + registerCachedFile(filePath, name, executable, false); } + + /** +* Registers a file at the distributed cache under the given name. The file will be accessible +* from any user-defined function in the (distributed) runtime under a local path. If upload is true files will +* be distributed via {@link BlobServer} otherwise Files should be local files (as long as all relevant workers +* have access to it), or files in a distributed file system. The runtime will copy the files temporarily to a +* local cache, if needed. +* +* The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via +* {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access +* {@link org.apache.flink.api.common.cache.DistributedCache} via +* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. +* +* @param filePath The path of the file +* @param name The name under which the file is registered. +* @param executable flag indicating whether the file should be executable +* @param upload flag indicating if the file should be distributed via BlobServer +*/ + public void registerCachedFile(String filePath, String name, boolean executable, boolean upload) { --- End diff -- What behaviour do you suggest? Sending all local files through the BlobServer and serving files from DFS as before? ---
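The alternative dawidwys describes (upload local files, keep serving DFS files directly) depends on deciding whether a registered path is local. Here is a minimal sketch under stated assumptions. It expects Unix-style paths and treats a missing scheme or the `file` scheme as local. `PathRouting` is a hypothetical helper. In real code, Flink's own `Path`/`FileSystem` classes would be the more natural place for this check.

```java
import java.net.URI;

/** Hypothetical helper: decides how a registered cache entry would be shipped. */
final class PathRouting {

    enum Route { UPLOAD_VIA_BLOB, READ_FROM_DFS }

    private PathRouting() {}

    /**
     * Treats paths without a scheme (e.g. "/tmp/data.txt") or with the "file" scheme as local.
     * Assumes Unix-style paths; URI.create throws IllegalArgumentException for strings that are
     * not valid URIs (e.g. containing spaces or Windows backslashes).
     */
    static Route routeFor(String filePath) {
        String scheme = URI.create(filePath).getScheme();
        boolean local = scheme == null || "file".equalsIgnoreCase(scheme);
        return local ? Route.UPLOAD_VIA_BLOB : Route.READ_FROM_DFS;
    }
}
```

Under this rule, `file:` URIs and bare paths take the upload path. Anything with another scheme (`hdfs`, `s3`, ...) is left for the workers to read from their own file system, as before.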
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r171175263 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.filecache; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.blob.PermanentBlobService; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.flink.shaded.guava18.com.google.common.base.Charsets; +import org.apache.flink.shaded.guava18.com.google.common.io.Files; + +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +/** + * Tests that {@link FileCache} can read files from {@link BlobServer}. + */ +@RunWith(MockitoJUnitRunner.class) +public class FileCacheReadsFromBlobTest { + + private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n" + + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n" + + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n" + + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n" + + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n" + + "hohen Werke Sind herrlich wie am ersten Tag.\n" + + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n" + + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n" + + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n" + + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n" + + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n" + + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n" + + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n" + + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags."; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private FileCache fileCache; + + @Mock + private PermanentBlobService blobService; + + @Before + public void setup() throws IOException { + try { + String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()}; + fileCache = new FileCache(tmpDirectories, blobService); + } catch (Exception e) { + e.printStackTrace(); + fail("Cannot create FileCache: " + e.getMessage()); + } + } + + @After + public void shutdown() { + try { + fileCache.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + fail("FileCache shutdown failed: " + e.getMessage()); + } + } + + @Te
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r171175037 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.filecache; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.blob.PermanentBlobService; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.flink.shaded.guava18.com.google.common.base.Charsets; +import org.apache.flink.shaded.guava18.com.google.common.io.Files; + +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +/** + * Tests that {@link FileCache} can read files from {@link BlobServer}. + */ +@RunWith(MockitoJUnitRunner.class) +public class FileCacheReadsFromBlobTest { + + private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n" + + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n" + + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n" + + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n" + + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n" + + "hohen Werke Sind herrlich wie am ersten Tag.\n" + + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n" + + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n" + + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n" + + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n" + + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n" + + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n" + + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n" + + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags."; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private FileCache fileCache; + + @Mock + private PermanentBlobService blobService; + + @Before + public void setup() throws IOException { + try { + String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()}; + fileCache = new FileCache(tmpDirectories, blobService); + } catch (Exception e) { + e.printStackTrace(); + fail("Cannot create FileCache: " + e.getMessage()); + } + } + + @After + public void shutdown() { + try { + fileCache.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + fail("FileCache shutdown failed: " + e.getMessage()); + } + } + + @Te
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r171174660 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheReadsFromBlobTest.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.filecache; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.blob.PermanentBlobService; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.flink.shaded.guava18.com.google.common.base.Charsets; +import org.apache.flink.shaded.guava18.com.google.common.io.Files; + +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +/** + * Tests that {@link FileCache} can read files from {@link BlobServer}. + */ +@RunWith(MockitoJUnitRunner.class) +public class FileCacheReadsFromBlobTest { + + private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n" + + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n" + + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n" + + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n" + + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n" + + "hohen Werke Sind herrlich wie am ersten Tag.\n" + + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n" + + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. 
Es\n" + + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n" + + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n" + + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n" + + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n" + + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n" + + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags."; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private FileCache fileCache; + + @Mock + private PermanentBlobService blobService; + + @Before + public void setup() throws IOException { + try { + String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()}; + fileCache = new FileCache(tmpDirectories, blobService); + } catch (Exception e) { + e.printStackTrace(); + fail("Cannot create FileCache: " + e.getMessage()); + } + } + + @After + public void shutdown() { + try { + fileCache.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + fail("FileCache shutdown failed: " + e.getMessage()); + } + } + + @Te
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r171174187 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -1829,6 +1830,28 @@ public void registerCachedFile(String filePath, String name) { * @param executable flag indicating whether the file should be executable */ public void registerCachedFile(String filePath, String name, boolean executable) { - this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable))); + registerCachedFile(filePath, name, executable, false); } + + /** +* Registers a file at the distributed cache under the given name. The file will be accessible +* from any user-defined function in the (distributed) runtime under a local path. If upload is true files will +* be distributed via {@link BlobServer} otherwise Files should be local files (as long as all relevant workers +* have access to it), or files in a distributed file system. The runtime will copy the files temporarily to a +* local cache, if needed. +* +* The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via +* {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access +* {@link org.apache.flink.api.common.cache.DistributedCache} via +* {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}. +* +* @param filePath The path of the file +* @param name The name under which the file is registered. +* @param executable flag indicating whether the file should be executable +* @param upload flag indicating if the file should be distributed via BlobServer +*/ + public void registerCachedFile(String filePath, String name, boolean executable, boolean upload) { --- End diff -- I'm wondering whether we really need a new method. 
We could just modify the behavior of the existing method, which should be transparent to the user. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r171172025 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java --- @@ -106,9 +106,9 @@ * are created for the plan nodes, on the way back up, the nodes connect their predecessors. */ public class JobGraphGenerator implements Visitor { - + public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux"; - --- End diff -- Sorry about that. I forgot to disable whitespace stripping on save. I will revert those changes. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r171171252 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java --- @@ -106,9 +106,9 @@ * are created for the plan nodes, on the way back up, the nodes connect their predecessors. */ public class JobGraphGenerator implements Visitor { - + public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux"; - --- End diff -- this would be a lot easier to review without the formatting changes :/ ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Github user ifndef-SleePy commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r170804963 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java --- @@ -314,6 +315,9 @@ public static TaskExecutor startTaskManager( TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths(), --- End diff -- It seems that the `fileCache` here is not used at all. ---
[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/5580 [FLINK-8620] Enable shipping custom files to BlobStore and accessing them through DistributedCache *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes.
You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4-node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink distributed-cache Alternatively you can