[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r134142721

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -108,11 +139,63 @@ public BlobCache(
                 this.numFetchRetries = 0;
             }
    +        // Initializing the clean up task
    +        this.cleanupTimer = new Timer(true);
    +
    +        cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +        this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
             // Add shutdown hook to delete storage directory
             shutdownHook = BlobUtils.addShutdownHook(this, LOG);
         }
         /**
    +     * Registers use of job-related BLOBs.
    +     *
    +     * Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
    +     * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
    +     *
    +     * @param jobId
    +     *         ID of the job this blob belongs to
    +     *
    +     * @see #releaseJob(JobID)
    +     */
    +    public void registerJob(JobID jobId) {
    +        synchronized (jobRefCounters) {
    +            RefCount ref = jobRefCounters.get(jobId);
    +            if (ref == null) {
    +                ref = new RefCount();
    +                jobRefCounters.put(jobId, ref);
    +            }
    +            ++ref.references;
    --- End diff --

    Should keepUntil be modified (in case the code at line 193 runs)?

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
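To make the question about `keepUntil` concrete, here is a minimal sketch of one way the interaction could look. It assumes that `RefCount` carries a `keepUntil` timestamp which `releaseJob` sets when the count drops to zero and which the periodic cleanup task compares against the current time; the diff shown above does not confirm this. `RefCountingSketch`, `isEligibleForCleanup` and the `nowMillis` parameters are hypothetical, and `String` stands in for `JobID`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the ref-counting part of BlobCache; not the PR's code. */
class RefCountingSketch {

    /** Mirrors the RefCount idea from the diff; keepUntil < 0 means "no cleanup scheduled". */
    static final class RefCount {
        int references = 0;
        long keepUntil = -1;
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMillis;

    RefCountingSketch(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    void registerJob(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.computeIfAbsent(jobId, k -> new RefCount());
            ++ref.references;
            // Assumption: clear any pending deadline so a re-registered job is kept.
            ref.keepUntil = -1;
        }
    }

    void releaseJob(String jobId, long nowMillis) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return; // unmatched release; real code would log a warning here
            }
            if (--ref.references == 0) {
                // Last user is gone: schedule the job's files for deferred cleanup.
                ref.keepUntil = nowMillis + cleanupIntervalMillis;
            }
        }
    }

    /** Returns true if a cleanup task would be allowed to delete the job's files. */
    boolean isEligibleForCleanup(String jobId, long nowMillis) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            return ref != null
                && ref.references <= 0
                && ref.keepUntil > 0
                && nowMillis >= ref.keepUntil;
        }
    }
}
```

In this sketch the cleanup check also tests `references`, so a stale `keepUntil` alone would not trigger deletion; resetting it in `registerJob` is a second line of defence in case a cleanup path looks only at the timestamp.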
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r134142624

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -108,11 +139,63 @@ public BlobCache(
                 this.numFetchRetries = 0;
             }
    +        // Initializing the clean up task
    +        this.cleanupTimer = new Timer(true);
    +
    +        cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
    +        this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
             // Add shutdown hook to delete storage directory
             shutdownHook = BlobUtils.addShutdownHook(this, LOG);
         }
         /**
    +     * Registers use of job-related BLOBs.
    +     *
    +     * Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
    +     * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
    +     *
    +     * @param jobId
    +     *         ID of the job this blob belongs to
    +     *
    +     * @see #releaseJob(JobID)
    +     */
    +    public void registerJob(JobID jobId) {
    +        synchronized (jobRefCounters) {
    +            RefCount ref = jobRefCounters.get(jobId);
    +            if (ref == null) {
    +                ref = new RefCount();
    +                jobRefCounters.put(jobId, ref);
    +            }
    +            ++ref.references;
    +        }
    +    }
    +
    +    /**
    +     * Unregisters use of job-related BLOBs and allow them to be released.
    +     *
    +     * @param jobId
    +     *         ID of the job this blob belongs to
    +     *
    +     * @see #registerJob(JobID)
    +     */
    +    public void releaseJob(JobID jobId) {
    +        synchronized (jobRefCounters) {
    +            RefCount ref = jobRefCounters.get(jobId);
    +
    +            if (ref == null) {
    +                LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
    --- End diff --

    Including jobId would help troubleshooting.
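A minimal sketch of what including the job ID in that warning could look like. It uses `java.util.logging` so that it is self-contained; the actual class logs through its own `LOG` field, whose exact API is not shown in the diff. `ReleaseWarningSketch` is a hypothetical name, `String` stands in for `JobID`, and the counter handling is simplified (entries are removed at zero instead of being kept for deferred cleanup).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for BlobCache#releaseJob; not the PR's code. */
class ReleaseWarningSketch {

    private static final Logger LOG = Logger.getLogger(ReleaseWarningSketch.class.getName());

    private final Map<String, Integer> jobRefCounters = new HashMap<>();

    void registerJob(String jobId) {
        synchronized (jobRefCounters) {
            jobRefCounters.merge(jobId, 1, Integer::sum);
        }
    }

    /** Returns the warning that was logged, or null if the release matched a registration. */
    String releaseJob(String jobId) {
        synchronized (jobRefCounters) {
            Integer refs = jobRefCounters.get(jobId);
            if (refs == null) {
                // Naming the job makes it possible to trace which caller is unbalanced.
                String msg = "improper use of releaseJob() without a matching number of "
                    + "registerJob() calls for jobId " + jobId;
                LOG.warning(msg);
                return msg;
            }
            if (refs == 1) {
                jobRefCounters.remove(jobId);
            } else {
                jobRefCounters.put(jobId, refs - 1);
            }
            return null;
        }
    }
}
```

Returning the message is only there to make the sketch easy to check; a real implementation would presumably just log it.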
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4238
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r133688568

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java ---
    @@ -0,0 +1,48 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.testtasks;
    +
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +
    +/**
    + * Task which blocks until the (static) {@link #unblock()} method is called and then fails with an
    + * exception.
    + */
    +public class FailingBlockingInvokable extends AbstractInvokable {
    +    private static boolean blocking = true;
    --- End diff --

    oh, sure
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133688433 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.FailingBlockingInvokable; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; + +import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed + * after job termination. 
+ */ +public class JobManagerCleanupITCase { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Specifies which test case to run in {@link #testBlobServerCleanup(TestCase)}. +*/ + private enum TestCase { + JOB_FINISHES_SUCESSFULLY, + JOB_IS_CANCELLED, + JOB_FAILS, + JOB_SUBMISSION_FAILS + } + + /** +* Test cleanup for a job that finishes ordinarily. +*/ + @Test + public void testBlobServerCleanupFinishedJob() throws IOException { + testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY); + } + + /** +* Test cleanup for a job which is cancelled after submission. +*/ + @Test + public void testBlobServerCleanupCancelledJob() throws IOException { + testBlobServerCleanup(TestCase.JOB_IS_CANCELLED); + } + + /** +* Test cleanup for a job that fails (first a task fails, then the job recovers, then the whole +* job fails due to a limited restart policy). +*/ + @Test + public void testBlobServerCleanupFailedJob() throws IOException { + testBlobServerCleanup(TestCase.JOB_FAILS); + } + + /** +
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133687826 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}. + */ +public class BlobCacheCleanupTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}. 
+*/ + @Test + public void testJobCleanup() throws IOException, InterruptedException { + + JobID jobId = new JobID(); + List keys = new ArrayList(); + BlobServer server = null; + BlobCache cache = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); + + keys.add(bc.put(jobId, buf)); + buf[0] += 1; + keys.add(bc.put(jobId, buf)); + + bc.close(); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + // register once + cache.registerJob(jobId); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + // register again (let's say, from another thread or so) + cache.registerJob(jobId); + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing once, nothing should change + cache.releaseJob(jobId); + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing the second time, the job is up for deferred cleanup + cache.releaseJob(jobId); + + // because we cannot
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r133685671

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -148,7 +149,10 @@
         /** Service to contend for and retrieve the leadership of JM and RM */
         private final HighAvailabilityServices highAvailabilityServices;
    -    /** Blob cache manager used across jobs */
    +    /** Blob server used across jobs */
    +    private final BlobServer blobServer;
    --- End diff --

    Actually, I'd rather let the services involved in the life cycle of the BLOBs know what they are dealing with than add empty `register/releaseJob` methods to the `BlobService`/`BlobServer`. Once we move the `BlobServer` out of the `JobMaster`, we can change the class of the parameters and also remove the `register/releaseJob` calls.
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133673650 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.FailingBlockingInvokable; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; + +import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed + * after job termination. 
+ */ +public class JobManagerCleanupITCase { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Specifies which test case to run in {@link #testBlobServerCleanup(TestCase)}. +*/ + private enum TestCase { + JOB_FINISHES_SUCESSFULLY, + JOB_IS_CANCELLED, + JOB_FAILS, + JOB_SUBMISSION_FAILS + } + + /** +* Test cleanup for a job that finishes ordinarily. +*/ + @Test + public void testBlobServerCleanupFinishedJob() throws IOException { + testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY); + } + + /** +* Test cleanup for a job which is cancelled after submission. +*/ + @Test + public void testBlobServerCleanupCancelledJob() throws IOException { + testBlobServerCleanup(TestCase.JOB_IS_CANCELLED); + } + + /** +* Test cleanup for a job that fails (first a task fails, then the job recovers, then the whole +* job fails due to a limited restart policy). +*/ + @Test + public void testBlobServerCleanupFailedJob() throws IOException { + testBlobServerCleanup(TestCase.JOB_FAILS); + } + + /**
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r133668912

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
    @@ -0,0 +1,324 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.junit.Ignore;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}.
    + */
    +public class BlobCacheCleanupTest {
    --- End diff --

    Let's extend from the `TestLogger` here.
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133670563 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}. + */ +public class BlobCacheCleanupTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}. 
+*/ + @Test + public void testJobCleanup() throws IOException, InterruptedException { + + JobID jobId = new JobID(); + List keys = new ArrayList(); + BlobServer server = null; + BlobCache cache = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); + + keys.add(bc.put(jobId, buf)); + buf[0] += 1; + keys.add(bc.put(jobId, buf)); + + bc.close(); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + // register once + cache.registerJob(jobId); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + // register again (let's say, from another thread or so) + cache.registerJob(jobId); + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing once, nothing should change + cache.releaseJob(jobId); + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing the second time, the job is up for deferred cleanup + cache.releaseJob(jobId); + + // because we
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r133669690

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
    @@ -0,0 +1,324 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.junit.Ignore;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}.
    + */
    +public class BlobCacheCleanupTest {
    +
    +    @Rule
    +    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +    /**
    +     * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}.
    +     */
    +    @Test
    +    public void testJobCleanup() throws IOException, InterruptedException {
    +
    +        JobID jobId = new JobID();
    +        List keys = new ArrayList();
    +        BlobServer server = null;
    +        BlobCache cache = null;
    +
    +        final byte[] buf = new byte[128];
    +
    +        try {
    +            Configuration config = new Configuration();
    +            config.setString(BlobServerOptions.STORAGE_DIRECTORY,
    +                temporaryFolder.newFolder().getAbsolutePath());
    +            config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
    +
    +            server = new BlobServer(config, new VoidBlobStore());
    +            InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
    +            BlobClient bc = new BlobClient(serverAddress, config);
    +            cache = new BlobCache(serverAddress, config, new VoidBlobStore());
    +
    +            keys.add(bc.put(jobId, buf));
    +            buf[0] += 1;
    +            keys.add(bc.put(jobId, buf));
    +
    +            bc.close();
    --- End diff --

    Maybe we could close the client in the finally block as well if something goes wrong.
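The suggestion above can be sketched as follows. `FakeClient` is a hypothetical stand-in for the BLOB client that only records whether it was closed; the point is that `close()` sits in a `finally` block, so it runs whether or not an upload throws. This is an illustration of the pattern, not the test code from the PR.

```java
import java.io.Closeable;
import java.io.IOException;

/** Illustrates closing a client in a finally block; all names here are hypothetical. */
class CloseInFinallySketch {

    /** Stand-in for a client that fails on empty input and remembers if it was closed. */
    static final class FakeClient implements Closeable {
        boolean closed = false;

        void put(byte[] data) throws IOException {
            if (data.length == 0) {
                throw new IOException("simulated upload failure");
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Uploads the data and closes the client even if the upload throws. */
    static boolean uploadAndClose(FakeClient client, byte[] data) {
        try {
            client.put(data);
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            // Runs on both the success and the failure path.
            client.close();
        }
    }
}
```

If the client type implements `Closeable`, a try-with-resources statement would achieve the same effect with less code.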
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133670196 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}. + */ +public class BlobCacheCleanupTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}. 
+*/ + @Test + public void testJobCleanup() throws IOException, InterruptedException { + + JobID jobId = new JobID(); + List keys = new ArrayList(); + BlobServer server = null; + BlobCache cache = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); + + keys.add(bc.put(jobId, buf)); + buf[0] += 1; + keys.add(bc.put(jobId, buf)); + + bc.close(); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + // register once + cache.registerJob(jobId); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + // register again (let's say, from another thread or so) + cache.registerJob(jobId); + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing once, nothing should change + cache.releaseJob(jobId); + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing the second time, the job is up for deferred cleanup + cache.releaseJob(jobId); + + // because we
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133674703 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testtasks; + +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + +/** + * Task which blocks until the (static) {@link #unblock()} method is called and then fails with an + * exception. + */ +public class FailingBlockingInvokable extends AbstractInvokable { + private static boolean blocking = true; --- End diff -- I think we should make `blocking` `volatile`. Otherwise we might miss changes and end up in a deadlock.
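The visibility concern in this comment can be illustrated with a minimal sketch. This is not the actual `FailingBlockingInvokable`; the class `BlockingFlagSketch` and its methods `reset()` and `awaitUnblocked()` are hypothetical names introduced here for illustration. Without `volatile`, the Java memory model gives no guarantee that the waiting thread ever observes the write made by `unblock()`.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a task that blocks on a static flag; not Flink code. */
class BlockingFlagSketch {

    // volatile: a write by the unblocking thread is guaranteed to become
    // visible to the thread that polls the flag in awaitUnblocked()
    private static volatile boolean blocking = true;

    /** Releases any thread currently waiting in awaitUnblocked(). */
    static void unblock() {
        blocking = false;
    }

    /** Re-arms the flag (assumed helper, useful between test runs). */
    static void reset() {
        blocking = true;
    }

    /**
     * Polls the flag until it is cleared or the timeout elapses.
     *
     * @return true if the flag was cleared, false on timeout or interruption
     */
    static boolean awaitUnblocked(long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (blocking) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // restore the interrupt status and give up waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

A timeout is included only so that the sketch cannot hang; the task under review may well wait indefinitely.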
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133671238 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java --- @@ -45,6 +42,18 @@ import java.util.Collections; import java.util.List; +import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob; +import static org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; + +/** + * Tests for {@link BlobLibraryCacheManager}. + */ public class BlobLibraryCacheManagerTest { --- End diff -- Let's extend from `TestLogger` here.
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133673260 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.FailingBlockingInvokable; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; + +import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed + * after job termination. 
+ */ +public class JobManagerCleanupITCase { --- End diff -- `TestLogger`
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133494977 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java --- @@ -73,4 +73,9 @@ public static final ConfigOption SSL_ENABLED = key("blob.service.ssl.enabled") .defaultValue(true); + + public static final ConfigOption CLEANUP_INTERVAL = + key("blob.service.cleanup.interval") --- End diff -- We should document this configuration parameter in the `config.md`
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133500735 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java --- @@ -63,21 +67,22 @@ private final PartitionProducerStateChecker partitionStateChecker; public JobManagerConnection( - JobID jobID, - ResourceID resourceID, - JobMasterGateway jobMasterGateway, - UUID leaderId, - TaskManagerActions taskManagerActions, - CheckpointResponder checkpointResponder, - LibraryCacheManager libraryCacheManager, - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, - PartitionProducerStateChecker partitionStateChecker) { + JobID jobID, + ResourceID resourceID, + JobMasterGateway jobMasterGateway, + UUID leaderId, + TaskManagerActions taskManagerActions, + CheckpointResponder checkpointResponder, + BlobCache blobCache, LibraryCacheManager libraryCacheManager, --- End diff -- Formatting
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133497146 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -341,8 +424,39 @@ public int getPort() { return serverAddress.getPort(); } + /** +* Cleans up BLOBs which are not referenced anymore. +*/ + @Override + public void run() { + synchronized (jobRefCounters) { + Iterator> entryIter = jobRefCounters.entrySet().iterator(); + + while (entryIter.hasNext()) { + Map.Entry entry = entryIter.next(); + RefCount ref = entry.getValue(); + + if (ref.references <= 0 && ref.keepUntil > 0 && System.currentTimeMillis() >= ref.keepUntil) { --- End diff -- We could store the current time so that we don't retrieve it for every entry.
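A minimal sketch of this suggestion, assuming a simplified `RefCount` with the two fields visible in the diff (`references`, `keepUntil`) and `String` keys in place of `JobID`. The class `CleanupPassSketch` and both `cleanup` overloads are hypothetical; the point is only that the clock is read once per pass and the same value is compared against every entry.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical cleanup pass; a simplified stand-in for the timer task in the diff. */
class CleanupPassSketch {

    /** Simplified reference counter with the two fields used by the expiry check. */
    static final class RefCount {
        int references;
        long keepUntil;

        RefCount(int references, long keepUntil) {
            this.references = references;
            this.keepUntil = keepUntil;
        }
    }

    /** Reads the clock once and delegates to the testable overload. */
    static List<String> cleanup(Map<String, RefCount> jobRefCounters) {
        return cleanup(jobRefCounters, System.currentTimeMillis());
    }

    /** Removes all unreferenced entries whose grace period ended at or before 'now'. */
    static List<String> cleanup(Map<String, RefCount> jobRefCounters, long now) {
        List<String> removed = new ArrayList<>();
        synchronized (jobRefCounters) {
            Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, RefCount> entry = it.next();
                RefCount ref = entry.getValue();
                // same condition as in the diff, with the hoisted timestamp
                if (ref.references <= 0 && ref.keepUntil > 0 && now >= ref.keepUntil) {
                    removed.add(entry.getKey());
                    it.remove();
                }
            }
        }
        return removed;
    }
}
```

Passing the timestamp in as a parameter also makes the expiry logic testable without sleeping.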
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133503488 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -148,7 +149,10 @@ /** Service to contend for and retrieve the leadership of JM and RM */ private final HighAvailabilityServices highAvailabilityServices; - /** Blob cache manager used across jobs */ + /** Blob server used across jobs */ + private final BlobServer blobServer; --- End diff -- I think the `JobMaster` should not depend on the `BlobServer` because the `BlobServer` might run on a different node (alongside the dispatcher, for example). Can we pass in the `BlobService` which can either be a `BlobServer` or a `BlobCache`? I think we have to move the `register/releaseJob` methods to the `BlobService` for that. Additionally, the `BlobServer` should simply do nothing when these methods are called. That way, we can also pass in a `BlobService` to the `Task` and the `TaskExecutor/TaskManager`
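The proposed shape could look roughly like the following sketch. All type names here (`BlobServiceSketch`, `ServerSideSketch`, `CacheSideSketch`, `JobMasterSketch`) are hypothetical, `String` stands in for `JobID`, and the cache side is reduced to a plain counter without the deferred `keepUntil` cleanup from the PR. It only shows the idea that callers depend on one interface, the server-side implementation ignores the two calls, and the cache-side implementation counts references.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical common interface carrying the two ref-counting calls. */
interface BlobServiceSketch {
    void registerJob(String jobId);

    void releaseJob(String jobId);
}

/** Server-side stand-in: both calls are deliberately no-ops. */
final class ServerSideSketch implements BlobServiceSketch {
    @Override
    public void registerJob(String jobId) {
        // nothing to do on the server side
    }

    @Override
    public void releaseJob(String jobId) {
        // nothing to do on the server side
    }
}

/** Cache-side stand-in: keeps a reference count per job. */
final class CacheSideSketch implements BlobServiceSketch {
    private final Map<String, Integer> refs = new HashMap<>();

    @Override
    public synchronized void registerJob(String jobId) {
        refs.merge(jobId, 1, Integer::sum);
    }

    @Override
    public synchronized void releaseJob(String jobId) {
        // drop the entry once the last reference is released
        refs.computeIfPresent(jobId, (id, n) -> n > 1 ? n - 1 : null);
    }

    synchronized int references(String jobId) {
        return refs.getOrDefault(jobId, 0);
    }
}

/** Caller that depends only on the interface, not on a concrete implementation. */
final class JobMasterSketch {
    private final BlobServiceSketch blobService;

    JobMasterSketch(BlobServiceSketch blobService) {
        this.blobService = blobService;
    }

    void start(String jobId) {
        blobService.registerJob(jobId);
    }

    void stop(String jobId) {
        blobService.releaseJob(jobId);
    }
}
```

With this arrangement the caller behaves the same whether it is handed the server-side or the cache-side implementation, which is the decoupling the comment asks for.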
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125827511 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java --- @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) { clientSocket.close(); } finally { - if (fos != null) { - try { - fos.close(); - } catch (Throwable t) { - LOG.warn("Cannot close stream to BLOB staging file", t); - } - } if (incomingFile != null) { - if (!incomingFile.delete()) { + if (!incomingFile.delete() && incomingFile.exists()) { LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath()); } } } } /** -* Handles an incoming DELETE request from a BLOB client. -* -* @param inputStream The input stream to read the request from. -* @param outputStream The output stream to write the response to. -* @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream. +* Reads a full file from inputStream into incomingFile returning its checksum. 
+* +* @param inputStream +* stream to read from +* @param incomingFile +* file to write to +* @param buf +* An auxiliary buffer for data serialization/deserialization +* +* @return the received file's content hash as a BLOB key +* +* @throws IOException +* thrown if an I/O error occurs while reading/writing data from/to the respective streams */ - private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { + private static BlobKey readFileFully( + final InputStream inputStream, final File incomingFile, final byte[] buf) + throws IOException { + MessageDigest md = BlobUtils.createMessageDigest(); + FileOutputStream fos = new FileOutputStream(incomingFile); try { - int type = inputStream.read(); - if (type < 0) { - throw new EOFException("Premature end of DELETE request"); - } - - if (type == CONTENT_ADDRESSABLE) { - BlobKey key = BlobKey.readFromInputStream(inputStream); - File blobFile = blobServer.getStorageLocation(key); - - writeLock.lock(); - - try { - // we should make the local and remote file deletion atomic, otherwise we might risk not - // removing the remote file in case of a concurrent put operation - if (blobFile.exists() && !blobFile.delete()) { - throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); - } - - blobStore.delete(key); - } finally { - writeLock.unlock(); + while (true) { + final int bytesExpected = readLength(inputStream); + if (bytesExpected == -1) { + // done + break; + } + if (bytesExpected > BUFFER_SIZE) { + throw new IOException( + "Unexpected number of incoming bytes: " + bytesExpected); } - } - else if (type == NAME_ADDRESSABLE) { - byte[] jidBytes = new byte[JobID.SIZE]; - readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); - JobID jobID = JobID.fromByteArray(jidBytes); - String key = readKey(buf, inputStream); + readFully(inputStream, buf, 0, bytesExpected, "buffer"); + fos.write(buf, 0, bytesExpected); -
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125577195 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java --- @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) { clientSocket.close(); } finally { - if (fos != null) { - try { - fos.close(); - } catch (Throwable t) { - LOG.warn("Cannot close stream to BLOB staging file", t); - } - } if (incomingFile != null) { - if (!incomingFile.delete()) { + if (!incomingFile.delete() && incomingFile.exists()) { LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath()); } } } } /** -* Handles an incoming DELETE request from a BLOB client. -* -* @param inputStream The input stream to read the request from. -* @param outputStream The output stream to write the response to. -* @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream. +* Reads a full file from inputStream into incomingFile returning its checksum. 
+* +* @param inputStream +* stream to read from +* @param incomingFile +* file to write to +* @param buf +* An auxiliary buffer for data serialization/deserialization +* +* @return the received file's content hash as a BLOB key +* +* @throws IOException +* thrown if an I/O error occurs while reading/writing data from/to the respective streams */ - private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { + private static BlobKey readFileFully( + final InputStream inputStream, final File incomingFile, final byte[] buf) + throws IOException { + MessageDigest md = BlobUtils.createMessageDigest(); + FileOutputStream fos = new FileOutputStream(incomingFile); try { - int type = inputStream.read(); - if (type < 0) { - throw new EOFException("Premature end of DELETE request"); - } - - if (type == CONTENT_ADDRESSABLE) { - BlobKey key = BlobKey.readFromInputStream(inputStream); - File blobFile = blobServer.getStorageLocation(key); - - writeLock.lock(); - - try { - // we should make the local and remote file deletion atomic, otherwise we might risk not - // removing the remote file in case of a concurrent put operation - if (blobFile.exists() && !blobFile.delete()) { - throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); - } - - blobStore.delete(key); - } finally { - writeLock.unlock(); + while (true) { + final int bytesExpected = readLength(inputStream); + if (bytesExpected == -1) { + // done + break; + } + if (bytesExpected > BUFFER_SIZE) { + throw new IOException( + "Unexpected number of incoming bytes: " + bytesExpected); } - } - else if (type == NAME_ADDRESSABLE) { - byte[] jidBytes = new byte[JobID.SIZE]; - readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); - JobID jobID = JobID.fromByteArray(jidBytes); - String key = readKey(buf, inputStream); + readFully(inputStream, buf, 0, bytesExpected, "buffer"); + fos.write(buf, 0, bytesExpected); -
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125430786 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java --- @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) { clientSocket.close(); } finally { - if (fos != null) { - try { - fos.close(); - } catch (Throwable t) { - LOG.warn("Cannot close stream to BLOB staging file", t); - } - } if (incomingFile != null) { - if (!incomingFile.delete()) { + if (!incomingFile.delete() && incomingFile.exists()) { LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath()); } } } } /** -* Handles an incoming DELETE request from a BLOB client. -* -* @param inputStream The input stream to read the request from. -* @param outputStream The output stream to write the response to. -* @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream. +* Reads a full file from inputStream into incomingFile returning its checksum. 
+* +* @param inputStream +* stream to read from +* @param incomingFile +* file to write to +* @param buf +* An auxiliary buffer for data serialization/deserialization +* +* @return the received file's content hash as a BLOB key +* +* @throws IOException +* thrown if an I/O error occurs while reading/writing data from/to the respective streams */ - private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { + private static BlobKey readFileFully( + final InputStream inputStream, final File incomingFile, final byte[] buf) + throws IOException { + MessageDigest md = BlobUtils.createMessageDigest(); + FileOutputStream fos = new FileOutputStream(incomingFile); try { - int type = inputStream.read(); - if (type < 0) { - throw new EOFException("Premature end of DELETE request"); - } - - if (type == CONTENT_ADDRESSABLE) { - BlobKey key = BlobKey.readFromInputStream(inputStream); - File blobFile = blobServer.getStorageLocation(key); - - writeLock.lock(); - - try { - // we should make the local and remote file deletion atomic, otherwise we might risk not - // removing the remote file in case of a concurrent put operation - if (blobFile.exists() && !blobFile.delete()) { - throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); - } - - blobStore.delete(key); - } finally { - writeLock.unlock(); + while (true) { + final int bytesExpected = readLength(inputStream); + if (bytesExpected == -1) { + // done + break; + } + if (bytesExpected > BUFFER_SIZE) { + throw new IOException( + "Unexpected number of incoming bytes: " + bytesExpected); } - } - else if (type == NAME_ADDRESSABLE) { - byte[] jidBytes = new byte[JobID.SIZE]; - readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); - JobID jobID = JobID.fromByteArray(jidBytes); - String key = readKey(buf, inputStream); + readFully(inputStream, buf, 0, bytesExpected, "buffer"); + fos.write(buf, 0, bytesExpected); -
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125430028 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java --- @@ -162,105 +164,116 @@ static File initStorageDirectory(String storageDirectory) throws } /** -* Returns the BLOB service's directory for incoming files. The directory is created if it did -* not exist so far. +* Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is +* created if it does not exist yet. +* +* @param storageDir +* storage directory used be the BLOB service * -* @return the BLOB server's directory for incoming files +* @return the BLOB service's directory for incoming files */ static File getIncomingDirectory(File storageDir) { final File incomingDir = new File(storageDir, "incoming"); - if (!incomingDir.mkdirs() && !incomingDir.exists()) { - throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath()); - } + mkdirTolerateExisting(incomingDir, "incoming"); return incomingDir; } /** -* Returns the BLOB service's directory for cached files. The directory is created if it did -* not exist so far. +* Makes sure a given directory exists by creating it if necessary. 
* -* @return the BLOB server's directory for cached files +* @param dir +* directory to create +* @param dirType +* the type of the directory (included in error message if something fails) */ - private static File getCacheDirectory(File storageDir) { - final File cacheDirectory = new File(storageDir, "cache"); - - if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) { - throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'."); + private static void mkdirTolerateExisting(final File dir, final String dirType) { + // note: thread-safe create should try to mkdir first and then ignore the case that the + // directory already existed + if (!dir.mkdirs() && !dir.exists()) { + throw new RuntimeException( + "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'."); } - - return cacheDirectory; } /** * Returns the (designated) physical storage location of the BLOB with the given key. * +* @param storageDir +* storage directory used be the BLOB service * @param key -*the key identifying the BLOB +* the key identifying the BLOB +* @param jobId +* ID of the job for the incoming files (or null if job-unrelated) +* * @return the (designated) physical storage location of the BLOB */ - static File getStorageLocation(File storageDir, BlobKey key) { - return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString()); - } + static File getStorageLocation( + @Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) { + File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key)); - /** -* Returns the (designated) physical storage location of the BLOB with the given job ID and key. 
-* -* @param jobID -*the ID of the job the BLOB belongs to -* @param key -*the key of the BLOB -* @return the (designated) physical storage location of the BLOB with the given job ID and key -*/ - static File getStorageLocation(File storageDir, JobID jobID, String key) { - return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key)); + mkdirTolerateExisting(file.getParentFile(), "cache"); --- End diff -- "cache" is actually not the directory name but part of an error message (if there is an error) - thinking a bit more about this though, I can remove that parameter.
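A minimal sketch of the helper with the second parameter removed, as the comment suggests. The class name `MkdirSketch` is hypothetical and the error message wording is an assumption; the check itself follows the diff: try `mkdirs()` first and only fail if the directory still does not exist afterwards, so that a concurrent creator winning the race is tolerated.

```java
import java.io.File;

/** Hypothetical holder for the directory helper discussed in the diff. */
class MkdirSketch {

    /**
     * Makes sure the given directory exists, creating it if necessary.
     *
     * mkdirs() returns false both on failure and when the directory already
     * exists, so the existence check afterwards distinguishes the two cases
     * without a check-then-create race.
     */
    static void mkdirTolerateExisting(final File dir) {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new RuntimeException(
                "Cannot create directory '" + dir.getAbsolutePath() + "'.");
        }
    }
}
```

Calling the helper twice on the same path is expected to succeed both times, since the second call finds the directory already present.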
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125406639 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobServerLibraryManager.java --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.execution.librarycache; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.BlobServer; + +import javax.annotation.Nonnull; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Specialisation of {@link BlobLibraryCacheManager} that works with a {@link BlobServer}. + */ +public final class BlobServerLibraryManager extends BlobLibraryCacheManager { --- End diff -- Oh yes, the API is not too clean here - let me give it another shot: I wanted to get rid of the `BlobService#registerJob()` and `BlobService#releaseJob()` calls and make them available only in the `BlobCache`. Let's extract these two calls from the `BlobLibraryCacheManager` since they should be called with the `FallbackLibraryCacheManager` as well...
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125404358 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -107,146 +133,268 @@ public BlobCache( this.numFetchRetries = 0; } + // Initializing the clean up task + this.cleanupTimer = new Timer(true); + + cleanupInterval = blobClientConfig.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval); + // Add shutdown hook to delete storage directory shutdownHook = BlobUtils.addShutdownHook(this, LOG); } + @Override + public void registerJob(JobID jobId) { + synchronized (lockObject) { + RefCount ref = jobRefCounters.get(jobId); + if (ref == null) { + ref = new RefCount(); + jobRefCounters.put(jobId, ref); + } + ++ref.references; + } + } + + @Override + public void releaseJob(JobID jobId) { + synchronized (lockObject) { + RefCount ref = jobRefCounters.get(jobId); + + if (ref == null) { + LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls"); + return; + } + + --ref.references; + if (ref.references == 0) { + ref.keepUntil = System.currentTimeMillis() + cleanupInterval; + } + } + } + + /** +* Returns local copy of the (job-unrelated) file for the BLOB with the given key. +* +* The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in +* the cache, the method will try to download it from this cache's BLOB server. +* +* @param key +* The key of the desired BLOB. +* +* @return file referring to the local storage location of the BLOB. +* +* @throws IOException +* Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+*/ + @Override + public File getFile(BlobKey key) throws IOException { + return getFileInternal(null, key); + } + /** -* Returns the URL for the BLOB with the given key. The method will first attempt to serve -* the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it -* from this cache's BLOB server. +* Returns local copy of the file for the BLOB with the given key. +* +* The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in +* the cache, the method will try to download it from this cache's BLOB server. * -* @param requiredBlob The key of the desired BLOB. -* @return URL referring to the local storage location of the BLOB. -* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. +* @param jobId +* ID of the job this blob belongs to +* @param key +* The key of the desired BLOB. +* +* @return file referring to the local storage location of the BLOB. +* +* @throws IOException +* Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. */ - public URL getURL(final BlobKey requiredBlob) throws IOException { + @Override + public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException { + checkNotNull(jobId); + return getFileInternal(jobId, key); + } + + /** +* Returns local copy of the file for the BLOB with the given key. +* +* The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in +* the cache, the method will try to download it from this cache's BLOB server. +* +* @param jobId +* ID of the job this blob belongs to (or null if job-unrelated) +* @param requiredBlob +* The key of the desired BLOB. +* +* @return file referring to the local storage location of the BLOB. +* +* @throws IOException +* Thrown if an I/O error occurs
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125247578 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java --- @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) { clientSocket.close(); } finally { - if (fos != null) { - try { - fos.close(); - } catch (Throwable t) { - LOG.warn("Cannot close stream to BLOB staging file", t); - } - } if (incomingFile != null) { - if (!incomingFile.delete()) { + if (!incomingFile.delete() && incomingFile.exists()) { LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath()); } } } } /** -* Handles an incoming DELETE request from a BLOB client. -* -* @param inputStream The input stream to read the request from. -* @param outputStream The output stream to write the response to. -* @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream. +* Reads a full file from inputStream into incomingFile returning its checksum. 
+* +* @param inputStream +* stream to read from +* @param incomingFile +* file to write to +* @param buf +* An auxiliary buffer for data serialization/deserialization +* +* @return the received file's content hash as a BLOB key +* +* @throws IOException +* thrown if an I/O error occurs while reading/writing data from/to the respective streams */ - private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { + private static BlobKey readFileFully( + final InputStream inputStream, final File incomingFile, final byte[] buf) + throws IOException { + MessageDigest md = BlobUtils.createMessageDigest(); + FileOutputStream fos = new FileOutputStream(incomingFile); try { - int type = inputStream.read(); - if (type < 0) { - throw new EOFException("Premature end of DELETE request"); - } - - if (type == CONTENT_ADDRESSABLE) { - BlobKey key = BlobKey.readFromInputStream(inputStream); - File blobFile = blobServer.getStorageLocation(key); - - writeLock.lock(); - - try { - // we should make the local and remote file deletion atomic, otherwise we might risk not - // removing the remote file in case of a concurrent put operation - if (blobFile.exists() && !blobFile.delete()) { - throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath()); - } - - blobStore.delete(key); - } finally { - writeLock.unlock(); + while (true) { + final int bytesExpected = readLength(inputStream); + if (bytesExpected == -1) { + // done + break; + } + if (bytesExpected > BUFFER_SIZE) { + throw new IOException( + "Unexpected number of incoming bytes: " + bytesExpected); } - } - else if (type == NAME_ADDRESSABLE) { - byte[] jidBytes = new byte[JobID.SIZE]; - readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); - JobID jobID = JobID.fromByteArray(jidBytes); - String key = readKey(buf, inputStream); + readFully(inputStream, buf, 0, bytesExpected, "buffer"); + fos.write(buf, 0, bytesExpected); -
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125218256 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -107,146 +133,268 @@ public BlobCache( this.numFetchRetries = 0; } + // Initializing the clean up task + this.cleanupTimer = new Timer(true); + + cleanupInterval = blobClientConfig.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval); + // Add shutdown hook to delete storage directory shutdownHook = BlobUtils.addShutdownHook(this, LOG); } + @Override + public void registerJob(JobID jobId) { + synchronized (lockObject) { + RefCount ref = jobRefCounters.get(jobId); + if (ref == null) { + ref = new RefCount(); + jobRefCounters.put(jobId, ref); + } + ++ref.references; + } + } + + @Override + public void releaseJob(JobID jobId) { + synchronized (lockObject) { + RefCount ref = jobRefCounters.get(jobId); + + if (ref == null) { + LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls"); + return; + } + + --ref.references; + if (ref.references == 0) { + ref.keepUntil = System.currentTimeMillis() + cleanupInterval; + } + } + } + + /** +* Returns local copy of the (job-unrelated) file for the BLOB with the given key. +* +* The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in +* the cache, the method will try to download it from this cache's BLOB server. +* +* @param key +* The key of the desired BLOB. +* +* @return file referring to the local storage location of the BLOB. +* +* @throws IOException +* Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+*/ + @Override + public File getFile(BlobKey key) throws IOException { + return getFileInternal(null, key); + } + /** -* Returns the URL for the BLOB with the given key. The method will first attempt to serve -* the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it -* from this cache's BLOB server. +* Returns local copy of the file for the BLOB with the given key. +* +* The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in +* the cache, the method will try to download it from this cache's BLOB server. * -* @param requiredBlob The key of the desired BLOB. -* @return URL referring to the local storage location of the BLOB. -* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. +* @param jobId +* ID of the job this blob belongs to +* @param key +* The key of the desired BLOB. +* +* @return file referring to the local storage location of the BLOB. +* +* @throws IOException +* Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. */ - public URL getURL(final BlobKey requiredBlob) throws IOException { + @Override + public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException { + checkNotNull(jobId); + return getFileInternal(jobId, key); + } + + /** +* Returns local copy of the file for the BLOB with the given key. +* +* The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in +* the cache, the method will try to download it from this cache's BLOB server. +* +* @param jobId +* ID of the job this blob belongs to (or null if job-unrelated) +* @param requiredBlob +* The key of the desired BLOB. +* +* @return file referring to the local storage location of the BLOB. +* +* @throws IOException +* Thrown if an I/O error
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125219915 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java --- @@ -269,166 +276,128 @@ else if (response != RETURN_OKAY) { // /** -* Uploads the data of the given byte array to the BLOB server in a content-addressable manner. +* Uploads the data of the given byte array for the given job to the BLOB server. * +* @param jobId +* the ID of the job the BLOB belongs to (or null if job-unrelated) * @param value -*the buffer to upload -* @return the computed BLOB key identifying the BLOB on the server -* @throws IOException -* thrown if an I/O error occurs while uploading the data to the BLOB server -*/ - public BlobKey put(byte[] value) throws IOException { - return put(value, 0, value.length); - } - - /** -* Uploads data from the given byte array to the BLOB server in a content-addressable manner. +* the buffer to upload * -* @param value -*the buffer to upload data from -* @param offset -*the read offset within the buffer -* @param len -*the number of bytes to upload from the buffer * @return the computed BLOB key identifying the BLOB on the server -* @throws IOException -* thrown if an I/O error occurs while uploading the data to the BLOB server -*/ - public BlobKey put(byte[] value, int offset, int len) throws IOException { - return putBuffer(null, null, value, offset, len); - } - - /** -* Uploads the data of the given byte array to the BLOB server and stores it under the given job ID and key. 
* -* @param jobId -*the job ID to identify the uploaded data -* @param key -*the key to identify the uploaded data -* @param value -*the buffer to upload * @throws IOException -* thrown if an I/O error occurs while uploading the data to the BLOB server +* thrown if an I/O error occurs while uploading the data to the BLOB server */ - public void put(JobID jobId, String key, byte[] value) throws IOException { - put(jobId, key, value, 0, value.length); + @VisibleForTesting + public BlobKey put(@Nullable JobID jobId, byte[] value) throws IOException { + return put(jobId, value, 0, value.length); } /** -* Uploads data from the given byte array to the BLOB server and stores it under the given job ID and key. +* Uploads data from the given byte array for the given job to the BLOB server. * * @param jobId -*the job ID to identify the uploaded data -* @param key -*the key to identify the uploaded data +* the ID of the job the BLOB belongs to (or null if job-unrelated) * @param value -*the buffer to upload data from +* the buffer to upload data from * @param offset -*the read offset within the buffer +* the read offset within the buffer * @param len -*the number of bytes to upload from the buffer +* the number of bytes to upload from the buffer +* +* @return the computed BLOB key identifying the BLOB on the server +* * @throws IOException -* thrown if an I/O error occurs while uploading the data to the BLOB server +* thrown if an I/O error occurs while uploading the data to the BLOB server */ - public void put(JobID jobId, String key, byte[] value, int offset, int len) throws IOException { - if (key.length() > MAX_KEY_LENGTH) { - throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH); - } - - putBuffer(jobId, key, value, offset, len); + @VisibleForTesting + public BlobKey put(@Nullable JobID jobId, byte[] value, int offset, int len) throws IOException { + return putBuffer(jobId, value, offset, len); } /** -* Uploads data from the given input 
stream to the BLOB server and stores it under the given job ID and key. +* Uploads the (job-unrelated) data from the given input stream to the BLOB server. * -* @param jobId -*the job ID to identify the uploaded data -* @param key
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125217930 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -107,146 +133,268 @@ public BlobCache( this.numFetchRetries = 0; } + // Initializing the clean up task + this.cleanupTimer = new Timer(true); + + cleanupInterval = blobClientConfig.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval); + // Add shutdown hook to delete storage directory shutdownHook = BlobUtils.addShutdownHook(this, LOG); } + @Override + public void registerJob(JobID jobId) { + synchronized (lockObject) { + RefCount ref = jobRefCounters.get(jobId); + if (ref == null) { + ref = new RefCount(); + jobRefCounters.put(jobId, ref); + } + ++ref.references; + } + } + + @Override + public void releaseJob(JobID jobId) { + synchronized (lockObject) { + RefCount ref = jobRefCounters.get(jobId); + + if (ref == null) { + LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls"); + return; + } + + --ref.references; + if (ref.references == 0) { + ref.keepUntil = System.currentTimeMillis() + cleanupInterval; + } + } + } + + /** +* Returns local copy of the (job-unrelated) file for the BLOB with the given key. +* +* The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in +* the cache, the method will try to download it from this cache's BLOB server. +* +* @param key +* The key of the desired BLOB. +* +* @return file referring to the local storage location of the BLOB. +* +* @throws IOException +* Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. 
+*/ + @Override + public File getFile(BlobKey key) throws IOException { + return getFileInternal(null, key); + } + /** -* Returns the URL for the BLOB with the given key. The method will first attempt to serve -* the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it -* from this cache's BLOB server. +* Returns local copy of the file for the BLOB with the given key. +* +* The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in +* the cache, the method will try to download it from this cache's BLOB server. * -* @param requiredBlob The key of the desired BLOB. -* @return URL referring to the local storage location of the BLOB. -* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. +* @param jobId +* ID of the job this blob belongs to +* @param key +* The key of the desired BLOB. +* +* @return file referring to the local storage location of the BLOB. +* +* @throws IOException +* Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. */ - public URL getURL(final BlobKey requiredBlob) throws IOException { + @Override + public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
--- End diff --

I think so far the convention is that fields without an annotation are considered `@Nonnull` and only fields which are annotated with `@Nullable` can be `null`. Otherwise `key` should also be marked as `@Nonnull`.
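The annotation convention described in this comment can be illustrated with a small sketch. It is not taken from the PR: `BlobLookup`, `describe`, and the returned path layout are hypothetical, job IDs are plain strings, and a locally declared `@Nullable` stands in for `javax.annotation.Nullable` so that the example compiles without extra dependencies.

```java
import java.lang.annotation.Documented;
import java.util.Objects;

/** Stand-in for javax.annotation.Nullable (assumption: declared locally to stay self-contained). */
@Documented
@interface Nullable {}

/** Hypothetical helper; only the annotation usage matters here. */
class BlobLookup {
    // Convention: parameters without an annotation must not be null,
    // so only jobId (explicitly marked @Nullable) is allowed to be null.
    static String describe(@Nullable String jobId, String key) {
        Objects.requireNonNull(key, "key");
        String jobPart = (jobId == null) ? "no_job" : "job_" + jobId;
        return jobPart + "/blob_" + key;
    }
}
```

Under this convention, annotating `jobId` with `@Nonnull` while leaving `key` bare would suggest a difference between the two parameters that does not exist.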
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125219368

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -255,8 +403,39 @@ public int getPort() {
             return serverAddress.getPort();
         }

    +    /**
    +     * Cleans up BLOBs which are not referenced anymore.
    +     */
    +    @Override
    +    public void run() {
    +        synchronized (lockObject) {
    +            Iterator<Map.Entry<JobID, RefCount>> entryIter = jobRefCounters.entrySet().iterator();
    +
    +            while (entryIter.hasNext()) {
    +                Map.Entry<JobID, RefCount> entry = entryIter.next();
    +                RefCount ref = entry.getValue();
    +
    +                if (ref.references <= 0 && System.currentTimeMillis() >= ref.keepUntil) {
    +                    JobID jobId = entry.getKey();
    +
    +                    final File localFile =
    +                        new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
    +                    try {
    +                        FileUtils.deleteDirectory(localFile);
    +                        // let's only remove this directory from cleanup if the cleanup was successful
    +                        entryIter.remove();
    +                    } catch (Throwable t) {
    +                        LOG.warn("Failed to locally delete job directory {}", localFile.getAbsolutePath());
    --- End diff --

    Swallowing of exception
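The point of this comment is that the caught `Throwable` never reaches the log. With SLF4J, passing the exception as the last argument of `LOG.warn(...)` is enough to have its stack trace logged. The sketch below shows the same idea with `java.util.logging` so that it runs without dependencies; `CleanupLogging` and `deleteQuietly` are hypothetical names, not part of the PR.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper showing a failure being logged together with its cause. */
class CleanupLogging {
    private static final Logger LOG = Logger.getLogger(CleanupLogging.class.getName());

    /** Runs the deletion and returns true on success; a failure is logged, not rethrown. */
    static boolean deleteQuietly(String path, Runnable deletion) {
        try {
            deletion.run();
            return true;
        } catch (RuntimeException e) {
            // Hand the exception to the logger so that the cause and stack trace are recorded.
            LOG.log(Level.WARNING, "Failed to locally delete job directory " + path, e);
            return false;
        }
    }
}
```

The cleanup still continues after a failed delete, but the log now explains why the directory could not be removed.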
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125249421 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java --- @@ -162,105 +164,116 @@ static File initStorageDirectory(String storageDirectory) throws } /** -* Returns the BLOB service's directory for incoming files. The directory is created if it did -* not exist so far. +* Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is +* created if it does not exist yet. +* +* @param storageDir +* storage directory used be the BLOB service * -* @return the BLOB server's directory for incoming files +* @return the BLOB service's directory for incoming files */ static File getIncomingDirectory(File storageDir) { final File incomingDir = new File(storageDir, "incoming"); - if (!incomingDir.mkdirs() && !incomingDir.exists()) { - throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath()); - } + mkdirTolerateExisting(incomingDir, "incoming"); return incomingDir; } /** -* Returns the BLOB service's directory for cached files. The directory is created if it did -* not exist so far. +* Makes sure a given directory exists by creating it if necessary. 
* -* @return the BLOB server's directory for cached files +* @param dir +* directory to create +* @param dirType +* the type of the directory (included in error message if something fails) */ - private static File getCacheDirectory(File storageDir) { - final File cacheDirectory = new File(storageDir, "cache"); - - if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) { - throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'."); + private static void mkdirTolerateExisting(final File dir, final String dirType) { + // note: thread-safe create should try to mkdir first and then ignore the case that the + // directory already existed + if (!dir.mkdirs() && !dir.exists()) { + throw new RuntimeException( + "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'."); } - - return cacheDirectory; } /** * Returns the (designated) physical storage location of the BLOB with the given key. * +* @param storageDir +* storage directory used be the BLOB service * @param key -*the key identifying the BLOB +* the key identifying the BLOB +* @param jobId +* ID of the job for the incoming files (or null if job-unrelated) +* * @return the (designated) physical storage location of the BLOB */ - static File getStorageLocation(File storageDir, BlobKey key) { - return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString()); - } + static File getStorageLocation( + @Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) { + File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key)); - /** -* Returns the (designated) physical storage location of the BLOB with the given job ID and key. 
-* -* @param jobID -*the ID of the job the BLOB belongs to -* @param key -*the key of the BLOB -* @return the (designated) physical storage location of the BLOB with the given job ID and key -*/ - static File getStorageLocation(File storageDir, JobID jobID, String key) { - return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key)); + mkdirTolerateExisting(file.getParentFile(), "cache");
--- End diff --

Why are we creating a `cache` directory here?
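For context, the helper quoted in the diff uses its second argument only in the error message, so passing `"cache"` for a job-specific parent directory affects the wording of a failure, not the directory that gets created. A minimal standalone sketch of the same create-then-check pattern follows; the class name `DirUtil` is an assumption, the method body mirrors the diff.

```java
import java.io.File;

/** Hypothetical holder class for the helper shown in the diff. */
class DirUtil {
    /**
     * Makes sure the directory exists, creating it (and missing parents) if necessary.
     * dirType is only used to build the error message.
     */
    static void mkdirTolerateExisting(File dir, String dirType) {
        // Try to create first and check for existence afterwards: a check-then-create
        // sequence could fail if another thread creates the directory in between.
        if (!dir.mkdirs() && !dir.exists()) {
            throw new RuntimeException(
                "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
        }
    }
}
```

Calling the method twice on the same path succeeds both times, because the second `mkdirs()` returns `false` while `exists()` returns `true`.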
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125249786

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobCacheLibraryManager.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.execution.librarycache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.blob.BlobCache;
    +
    +import javax.annotation.Nonnull;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Specialisation of {@link BlobLibraryCacheManager} that works with a {@link BlobCache}.
    + */
    +public class BlobCacheLibraryManager extends BlobLibraryCacheManager {
    --- End diff --

    Shall we give it a more distinct name? It might be a little bit confusing if a sub class is a permutation of the super class's name.
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r125275855 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.FailingBlockingInvokable; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.net.InetSocketAddress; + +import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed + * after job termination. 
+ */ +public class JobManagerCleanupITCase { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Test cleanup for a job that finishes ordinarily. +*/ + @Test + public void testBlobServerCleanupFinishedJob() throws IOException { + testBlobServerCleanup(ExecutionState.FINISHED); + } + + /** +* Test cleanup for a job which is cancelled after submission. +*/ + @Test + public void testBlobServerCleanupCancelledJob() throws IOException { + testBlobServerCleanup(ExecutionState.CANCELED); + } + + /** +* Test cleanup for a job that fails (first a task fails, then the job recovers, then the whole +* job fails due to a limited restart policy). +*/ + @Test + public void testBlobServerCleanupFailedJob() throws IOException { + testBlobServerCleanup(ExecutionState.FAILED); + } + + private void testBlobServerCleanup(final ExecutionState finalState) throws IOException { + final int num_tasks = 2; + final File blobBaseDir = tmpFolder.newFolder(); + + new JavaTestKit(system) {{ + new Within(duration("15 seconds")) { +
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125250794

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobServerLibraryManager.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.execution.librarycache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.blob.BlobServer;
    +
    +import javax.annotation.Nonnull;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Specialisation of {@link BlobLibraryCacheManager} that works with a {@link BlobServer}.
    + */
    +public final class BlobServerLibraryManager extends BlobLibraryCacheManager {
    --- End diff --

    Can't we use the same BlobLibraryCacheManager implementation for the `BlobServerLibraryManager` and the `BlobCacheLibraryManager` which works on a `BlobService`?
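One way to read this suggestion is a single manager class that depends only on an interface, with the server and the cache both implementing it. The sketch below is purely illustrative: `JobBlobService`, `LibraryCacheManagerSketch`, and `RecordingBlobService` are hypothetical names, job IDs are plain strings, and whether Flink's actual `BlobService` exposes `registerJob`/`releaseJob` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical minimal view of a BLOB service that does per-job ref-counting. */
interface JobBlobService {
    void registerJob(String jobId);
    void releaseJob(String jobId);
}

/** One manager implementation for any JobBlobService, instead of one subclass per service type. */
class LibraryCacheManagerSketch {
    private final JobBlobService blobService;

    LibraryCacheManagerSketch(JobBlobService blobService) {
        this.blobService = Objects.requireNonNull(blobService);
    }

    void registerJob(String jobId) {
        blobService.registerJob(jobId);
    }

    void unregisterJob(String jobId) {
        blobService.releaseJob(jobId);
    }
}

/** Test double that records the calls it receives. */
class RecordingBlobService implements JobBlobService {
    final List<String> calls = new ArrayList<>();

    @Override
    public void registerJob(String jobId) {
        calls.add("register:" + jobId);
    }

    @Override
    public void releaseJob(String jobId) {
        calls.add("release:" + jobId);
    }
}
```

With this shape, neither a `BlobServerLibraryManager` nor a `BlobCacheLibraryManager` subclass is needed; the service instance passed to the constructor decides the behaviour.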
[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...
GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4238

    [FLINK-7057][blob] move BLOB ref-counting from LibraryCacheManager to BlobCache

    Currently, the `LibraryCacheManager` is doing some ref-counting for JAR files managed by it. Instead, we want the `BlobCache` to do that itself for **all** job-related BLOBs. Also, we do not want to operate on a per-BlobKey level but rather per job. Job-unrelated BLOBs should be cleaned manually as done for the Web-UI logs. A future API change will reflect the different use cases in a better way. For now, we need to also adapt the cleanup appropriately.

    On the `BlobServer`, the JAR files should remain locally as well as in the HA store until the job enters a final state. Then they can be deleted.

    With this intermediate state, job-unrelated BLOBs will remain in the file system until deleted manually. This is the same as the previous API use when working with a `BlobService` directly instead of going through the `LibraryCacheManager`. The aforementioned API extension will include TTL fields for those BLOBs in order to have a proper cleanup, too.

    This PR is based upon #4237 in a series to implement FLINK-6916.
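The per-job ref-counting described above can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the PR's code: `JobRefCounter` and `isTracked` are hypothetical names, job IDs are plain strings, the current time is passed in explicitly so the behaviour is deterministic, and the guard against releasing an unreferenced job is an addition of this sketch. A real cache would delete the job's directory before dropping the entry.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical per-job reference counter with a grace period before cleanup. */
class JobRefCounter {
    /** Counter entry; keepUntil is set once the last reference is released. */
    private static final class RefCount {
        int references = 0;
        long keepUntil = Long.MAX_VALUE;
    }

    private final Map<String, RefCount> counters = new HashMap<>();
    private final long cleanupIntervalMillis;

    JobRefCounter(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    synchronized void registerJob(String jobId) {
        counters.computeIfAbsent(jobId, id -> new RefCount()).references++;
    }

    synchronized void releaseJob(String jobId, long nowMillis) {
        RefCount ref = counters.get(jobId);
        if (ref == null || ref.references == 0) {
            // Guard added in this sketch; the job ID is included to ease troubleshooting.
            System.err.println("releaseJob() without matching registerJob() for job " + jobId);
            return;
        }
        if (--ref.references == 0) {
            // Keep the job's BLOBs for one more cleanup interval after the last release.
            ref.keepUntil = nowMillis + cleanupIntervalMillis;
        }
    }

    /** Drops all entries that are unreferenced and past their grace period; returns how many. */
    synchronized int cleanup(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, RefCount>> it = counters.entrySet().iterator();
        while (it.hasNext()) {
            RefCount ref = it.next().getValue();
            if (ref.references <= 0 && nowMillis >= ref.keepUntil) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    synchronized boolean isTracked(String jobId) {
        return counters.containsKey(jobId);
    }
}
```

Because `cleanup` checks the reference count as well as `keepUntil`, a job that is registered again during its grace period is not removed, even though its old `keepUntil` value is still stored.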
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6916-7057

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4238.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4238

commit d54a316cfffd8243980df561fd4fcbd99934a40b
Author: Nico Kruber
Date: 2016-12-20T15:49:57Z

    [FLINK-6008][docs] minor improvements in the BlobService docs

commit b215515fa14d3f6af218e86b67bc2c27ae9d4f4f
Author: Nico Kruber
Date: 2016-12-20T17:27:13Z

    [FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit bbcde52b3105fcf379c852b568f3893cc6052ce6
Author: Nico Kruber
Date: 2016-12-21T15:23:29Z

    [FLINK-6008] do not fail the BlobServer if delete fails

    also extend the delete tests and remove one code duplication

commit dda1a12e40027724efb0e50005e5b57058a220f0
Author: Nico Kruber
Date: 2017-01-06T17:42:58Z

    [FLINK-6008][docs] update some config options to the new, non-deprecated ones

commit e12c2348b237207a50649d515a0fbbd19f92e6a0
Author: Nico Kruber
Date: 2017-03-09T17:14:02Z

    [FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 24060e01332c6df9fd01f1dc5f321c3fda9301c1
Author: Nico Kruber
Date: 2017-03-17T15:21:40Z

    [FLINK-6008] fix concurrent job directory creation

    also add according unit tests

commit 2e0d16ab8bf8a48a2d028602a3a7693fc4b76039
Author: Nico Kruber
Date: 2017-06-14T16:01:47Z

    [FLINK-6008] do not guard a delete() call with a check for existence

commit 7ba911d7ecb4861261dff8509996be0bd64d6d27
Author: Nico Kruber
Date: 2017-04-18T14:37:37Z

    [FLINK-6008] some comments about BlobLibraryCacheManager cleanup

commit d3f50d595f85356ae6ed0a85e1f8b8e8ac630bde
Author: Nico Kruber
Date: 2017-04-19T13:39:03Z

    [hotfix] minor typos

commit 79b6ce35a9e246b35415a388295f9ee2fc19a82e
Author: Nico Kruber
Date: 2017-04-19T14:10:16Z

    [FLINK-6008] further cleanup tests for BlobLibraryCacheManager

commit 23fb6ecd6c43c86d762503339c67953290236dca
Author: Nico Kruber
Date: 2017-06-30T14:03:16Z

    [FLINK-6008] address PR comments

commit 794764ceeed6b9bbbac08662f5754b218ff86c9c
Author: Nico Kruber
Date: 2017-06-16T08:51:04Z

    [FLINK-7052][blob] remove (unused) NAME_ADDRESSABLE mode

commit 774bafa85f242110a2ce7907c1150f8c62d73b3f
Author: Nico Kruber
Date: 2017-06-21T15:05:57Z

    [FLINK-7052][blob] remove further unused code due to the NAME_ADDRESSABLE removal

commit 4da3b3f6269e43bf1c66621099528824cad9373f
Author: Nico Kruber
Date: 2017-06-22T15:31:17Z

    [FLINK-7053][blob] remove code duplication in BlobClientSslTest

    This lets BlobClientSslTest extend BlobClientTest as most of its implementation came from there and was simply copied.

commit aa9cdc820f9ca1a38a19708bf45a2099e42eaf48
Author: Nico Kruber
Date: 2017-06-23T09:40:34Z

    [FLINK-7053][blob] verify some of the buffers returned by GET

commit c9b693a46053b55b3939ff471184796f12d36a72
Author: Nico Kruber
Date: 2017-06-23T10:04:10Z

    [FLINK-7053][blob] use TemporaryFolder for local BLOB dir in unit tests

    This replaces the use of some temporary directory where it is not guaranteed that it will