[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-20 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r134142721
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -108,11 +139,63 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
/**
+* Registers use of job-related BLOBs.
+* 
+* Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
+* to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #releaseJob(JobID)
+*/
+   public void registerJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
--- End diff --

Should `keepUntil` be modified here (in case the code at line 193 runs)?
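
One way to read the question above: if a job is registered again after its last release has already scheduled a deferred cleanup, the pending deadline might need to be cleared. The sketch below is not the PR's code. It borrows the names `RefCount`, `references`, `keepUntil` and `jobRefCounters` from the diff, but the `-1` sentinel, the `cleanup` method, the explicit `nowMillis` parameter, and the use of a `String` in place of `JobID` are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Illustrative stand-in for the ref-counting in BlobCache; not the PR's code. */
final class RefCountSketch {

    /** Field names follow the diff; their exact semantics here are assumed. */
    static final class RefCount {
        int references = 0;
        long keepUntil = -1; // assumed sentinel: -1 means "no cleanup scheduled"
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMillis;

    RefCountSketch(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    void registerJob(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null) {
                ref = new RefCount();
                jobRefCounters.put(jobId, ref);
            } else {
                // The job is in use again: clear any pending cleanup deadline.
                ref.keepUntil = -1;
            }
            ++ref.references;
        }
    }

    void releaseJob(String jobId, long nowMillis) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return; // unmatched release; the diff logs a warning when ref is null
            }
            if (--ref.references == 0) {
                // Last user is gone: schedule the deferred cleanup.
                ref.keepUntil = nowMillis + cleanupIntervalMillis;
            }
        }
    }

    /** Removes entries whose grace period has expired; returns how many were removed. */
    int cleanup(long nowMillis) {
        int removed = 0;
        synchronized (jobRefCounters) {
            Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
            while (it.hasNext()) {
                RefCount ref = it.next().getValue();
                if (ref.references <= 0 && ref.keepUntil > 0 && nowMillis >= ref.keepUntil) {
                    it.remove();
                    ++removed;
                }
            }
        }
        return removed;
    }

    boolean isTracked(String jobId) {
        synchronized (jobRefCounters) {
            return jobRefCounters.containsKey(jobId);
        }
    }
}
```

Because `cleanup` in this sketch also checks `references`, the reset in `registerJob` is defensive; it would matter more if a cleanup task looked at `keepUntil` alone.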


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-20 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r134142624
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -108,11 +139,63 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
/**
+* Registers use of job-related BLOBs.
+* 
+* Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls
+* to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #releaseJob(JobID)
+*/
+   public void registerJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
+   }
+   }
+
+   /**
+* Unregisters use of job-related BLOBs and allow them to be released.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+*
+* @see #registerJob(JobID)
+*/
+   public void releaseJob(JobID jobId) {
+   synchronized (jobRefCounters) {
+   RefCount ref = jobRefCounters.get(jobId);
+
+   if (ref == null) {
+   LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
--- End diff --

Including the `jobId` in this message would help with troubleshooting.
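
A minimal sketch of what such a message could look like. The helper class and method below are hypothetical and use a plain `String` for the job ID; if `LOG` is an SLF4J logger (an assumption, the diff does not show its declaration), the same effect is usually achieved with a `{}` placeholder, e.g. `LOG.warn("... for jobId {}", jobId)`.

```java
import java.util.logging.Logger;

/** Hypothetical helper that shows the job ID being included in the warning text. */
final class ReleaseJobWarning {

    /** Builds the warning from the diff's message plus the offending job ID. */
    static String message(String jobId) {
        return "improper use of releaseJob() without a matching number of "
                + "registerJob() calls for jobId " + jobId;
    }

    public static void main(String[] args) {
        // java.util.logging is used only to keep the sketch free of dependencies.
        Logger.getLogger("BlobCache").warning(message("a1b2c3"));
    }
}
```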




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4238




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133688568
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is called and then fails with an
+ * exception.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+   private static boolean blocking = true;
--- End diff --

oh, sure




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133688433
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
+
+   @Rule
+   public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Specifies which test case to run in {@link 
#testBlobServerCleanup(TestCase)}.
+*/
+   private enum TestCase {
+   JOB_FINISHES_SUCESSFULLY,
+   JOB_IS_CANCELLED,
+   JOB_FAILS,
+   JOB_SUBMISSION_FAILS
+   }
+
+   /**
+* Test cleanup for a job that finishes ordinarily.
+*/
+   @Test
+   public void testBlobServerCleanupFinishedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
+   }
+
+   /**
+* Test cleanup for a job which is cancelled after submission.
+*/
+   @Test
+   public void testBlobServerCleanupCancelledJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+   }
+
+   /**
+* Test cleanup for a job that fails (first a task fails, then the job 
recovers, then the whole
+* job fails due to a limited restart policy).
+*/
+   @Test
+   public void testBlobServerCleanupFailedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FAILS);
+   }
+
+   /**
+

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133687826
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List keys = new ArrayList();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing the second time, the job is up for 
deferred cleanup
+   cache.releaseJob(jobId);
+
+   // because we cannot 

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133685671
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -148,7 +149,10 @@
/** Service to contend for and retrieve the leadership of JM and RM */
private final HighAvailabilityServices highAvailabilityServices;
 
-   /** Blob cache manager used across jobs */
+   /** Blob server used across jobs */
+   private final BlobServer blobServer;
--- End diff --

Actually, I'd rather let the services involved in the life cycle of the
BLOBs know what they are dealing with than add empty `register/releaseJob`
methods to the `BlobService`/`BlobServer`. Once we move the `BlobServer` out of
the `JobMaster`, we can change the class of the parameters and also remove the
`register/releaseJob` calls.




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133673650
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
+
+   @Rule
+   public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Specifies which test case to run in {@link 
#testBlobServerCleanup(TestCase)}.
+*/
+   private enum TestCase {
+   JOB_FINISHES_SUCESSFULLY,
+   JOB_IS_CANCELLED,
+   JOB_FAILS,
+   JOB_SUBMISSION_FAILS
+   }
+
+   /**
+* Test cleanup for a job that finishes ordinarily.
+*/
+   @Test
+   public void testBlobServerCleanupFinishedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
+   }
+
+   /**
+* Test cleanup for a job which is cancelled after submission.
+*/
+   @Test
+   public void testBlobServerCleanupCancelledJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+   }
+
+   /**
+* Test cleanup for a job that fails (first a task fails, then the job 
recovers, then the whole
+* job fails due to a limited restart policy).
+*/
+   @Test
+   public void testBlobServerCleanupFailedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FAILS);
+   }
+
+   /**
  

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133668912
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
--- End diff --

Let's extend from the `TestLogger` here.




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133670563
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List keys = new ArrayList();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing the second time, the job is up for 
deferred cleanup
+   cache.releaseJob(jobId);
+
+   // because we 

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133669690
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List keys = new ArrayList();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
--- End diff --

Maybe we could also close the client in the `finally` block, so that it is
released if something goes wrong.
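
The pattern being suggested can be sketched as follows. `FakeClient` is a hypothetical stand-in for the test's `BlobClient`, and the simulated failure is made up for illustration; the point is only that `close()` runs on both the success and the failure path.

```java
import java.io.Closeable;
import java.io.IOException;

/** Illustrative only: closes a client in finally, whether or not the upload fails. */
final class CloseInFinallySketch {

    /** Hypothetical stand-in for a client that holds a connection. */
    static final class FakeClient implements Closeable {
        boolean closed = false;

        void put(byte[] data) throws IOException {
            if (data.length == 0) {
                throw new IOException("simulated upload failure");
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Uploads data and closes the client even if the upload throws. */
    static FakeClient upload(byte[] data) {
        FakeClient client = null;
        try {
            client = new FakeClient();
            client.put(data);
        } catch (IOException e) {
            // Swallowed only to keep the sketch short; a test would let it propagate.
        } finally {
            if (client != null) {
                client.close();
            }
        }
        return client;
    }
}
```

If the client type implements `Closeable`, a try-with-resources statement would give the same guarantee with less code.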




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133670196
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing the second time, the job is up for 
deferred cleanup
+   cache.releaseJob(jobId);
+
+   // because we 

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133674703
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is 
called and then fails with an
+ * exception.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+   private static boolean blocking = true;
--- End diff --

I think we should make `blocking` `volatile`. Otherwise the blocked thread 
might miss the change made by `unblock()` and wait forever.
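
The suggestion above can be sketched as follows. This is only a minimal illustration, not the PR's actual class: `BlockingFlagSketch`, `awaitUnblocked()` and `runDemo()` are hypothetical names, and only the `blocking` field and `unblock()` mirror the diff.

```java
// Hypothetical sketch; only the `blocking` field and unblock() mirror the diff.
final class BlockingFlagSketch {
    // volatile: a write from the thread calling unblock() is guaranteed to
    // become visible to the thread polling the flag.
    private static volatile boolean blocking = true;

    static void unblock() {
        blocking = false;
    }

    // Polls the flag until another thread clears it.
    static void awaitUnblocked() throws InterruptedException {
        while (blocking) {
            Thread.sleep(1);
        }
    }

    // Starts a waiter thread, unblocks it from the calling thread, and
    // reports whether the waiter terminated within the timeout.
    static boolean runDemo() {
        blocking = true;
        Thread waiter = new Thread(() -> {
            try {
                awaitUnblocked();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        unblock();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }
}
```

Without `volatile`, the Java memory model gives no guarantee that the polling thread ever observes the write, which is the hang described above.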




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133671238
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 ---
@@ -45,6 +42,18 @@
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link BlobLibraryCacheManager}.
+ */
 public class BlobLibraryCacheManagerTest {
--- End diff --

Let's extend from `TestLogger` here.




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133673260
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
--- End diff --

Let's extend from `TestLogger` here as well.




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133494977
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java 
---
@@ -73,4 +73,9 @@
public static final ConfigOption<Boolean> SSL_ENABLED =
key("blob.service.ssl.enabled")
.defaultValue(true);
+
+   public static final ConfigOption<Long> CLEANUP_INTERVAL =
+   key("blob.service.cleanup.interval")
--- End diff --

We should document this configuration parameter in `config.md`.




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133500735
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
 ---
@@ -63,21 +67,22 @@
private final PartitionProducerStateChecker partitionStateChecker;
 
public JobManagerConnection(
-   JobID jobID,
-   ResourceID resourceID,
-   JobMasterGateway jobMasterGateway,
-   UUID leaderId,
-   TaskManagerActions taskManagerActions,
-   CheckpointResponder checkpointResponder,
-   LibraryCacheManager libraryCacheManager,
-   ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier,
-   PartitionProducerStateChecker partitionStateChecker) {
+   JobID jobID,
+   ResourceID resourceID,
+   JobMasterGateway jobMasterGateway,
+   UUID leaderId,
+   TaskManagerActions taskManagerActions,
+   CheckpointResponder checkpointResponder,
+   BlobCache blobCache, LibraryCacheManager 
libraryCacheManager,
--- End diff --

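Formatting: `BlobCache blobCache` and `LibraryCacheManager libraryCacheManager` 
should each go on their own line.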




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133497146
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -341,8 +424,39 @@ public int getPort() {
return serverAddress.getPort();
}
 
+   /**
+* Cleans up BLOBs which are not referenced anymore.
+*/
+   @Override
+   public void run() {
+   synchronized (jobRefCounters) {
+   Iterator<Map.Entry<JobID, RefCount>> entryIter = 
jobRefCounters.entrySet().iterator();
+
+   while (entryIter.hasNext()) {
+   Map.Entry<JobID, RefCount> entry = 
entryIter.next();
+   RefCount ref = entry.getValue();
+
+   if (ref.references <= 0 && ref.keepUntil > 0 && 
System.currentTimeMillis() >= ref.keepUntil) {
--- End diff --

We could store the current time so that we don't retrieve it for every 
entry.
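
A minimal sketch of that idea, assuming a simplified stand-in for the cache's bookkeeping: `CleanupSketch`, its `RefCount.of(...)` factory and the generic key type are hypothetical, and only the expiry condition is taken from the diff. The clock is read once and the value is reused for every entry.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch; only the expiry condition mirrors the diff above.
final class CleanupSketch {
    static final class RefCount {
        int references;
        long keepUntil;

        static RefCount of(int references, long keepUntil) {
            RefCount ref = new RefCount();
            ref.references = references;
            ref.keepUntil = keepUntil;
            return ref;
        }
    }

    // Reads the clock exactly once per cleanup run.
    static <K> int cleanup(Map<K, RefCount> jobRefCounters) {
        return cleanup(jobRefCounters, System.currentTimeMillis());
    }

    // Removes every unreferenced entry whose deadline has passed and
    // returns the number of removed entries.
    static <K> int cleanup(Map<K, RefCount> jobRefCounters, long currentTimeMillis) {
        int removed = 0;
        Iterator<Map.Entry<K, RefCount>> entryIter = jobRefCounters.entrySet().iterator();
        while (entryIter.hasNext()) {
            RefCount ref = entryIter.next().getValue();
            if (ref.references <= 0 && ref.keepUntil > 0 && currentTimeMillis >= ref.keepUntil) {
                entryIter.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

Passing the timestamp in as a parameter also makes the expiry logic testable without sleeping.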




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133503488
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -148,7 +149,10 @@
/** Service to contend for and retrieve the leadership of JM and RM */
private final HighAvailabilityServices highAvailabilityServices;
 
-   /** Blob cache manager used across jobs */
+   /** Blob server used across jobs */
+   private final BlobServer blobServer;
--- End diff --

I think the `JobMaster` should not depend on the `BlobServer`, because the 
`BlobServer` might run on a different node (alongside the dispatcher, for 
example). Can we pass in a `BlobService` instead, which can be either a 
`BlobServer` or a `BlobCache`? For that, I think we have to move the 
`registerJob`/`releaseJob` methods to the `BlobService`. The `BlobServer` 
should then simply do nothing when these methods are called. That way, we can 
also pass a `BlobService` to the `Task` and the `TaskExecutor`/`TaskManager`.
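
Roughly, the proposed layering could look like the sketch below. All type names carry a `Sketch` suffix because they are hypothetical stand-ins, job IDs are plain strings instead of `JobID`, and the cache's counting is deliberately simplified (no deferred cleanup).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical common interface carrying the two ref-counting calls.
interface BlobServiceSketch {
    void registerJob(String jobId);

    void releaseJob(String jobId);
}

// Server side: the calls are accepted but intentionally do nothing.
final class BlobServerSketch implements BlobServiceSketch {
    @Override
    public void registerJob(String jobId) {
        // no-op
    }

    @Override
    public void releaseJob(String jobId) {
        // no-op
    }
}

// Cache side: keeps a per-job reference count (simplified).
final class BlobCacheSketch implements BlobServiceSketch {
    private final Map<String, Integer> references = new HashMap<>();

    @Override
    public synchronized void registerJob(String jobId) {
        references.merge(jobId, 1, Integer::sum);
    }

    @Override
    public synchronized void releaseJob(String jobId) {
        // Drop the entry once the last reference is released.
        references.computeIfPresent(jobId, (id, n) -> n > 1 ? n - 1 : null);
    }

    synchronized int references(String jobId) {
        return references.getOrDefault(jobId, 0);
    }
}

// The job master only sees the interface, not a concrete server or cache.
final class JobMasterSketch {
    private final BlobServiceSketch blobService;
    private final String jobId;

    JobMasterSketch(BlobServiceSketch blobService, String jobId) {
        this.blobService = blobService;
        this.jobId = jobId;
    }

    void start() {
        blobService.registerJob(jobId);
    }

    void stop() {
        blobService.releaseJob(jobId);
    }
}
```

With this shape the caller does not need to know whether it talks to a local server or a remote-backed cache.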




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-06 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125827511
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 ---
@@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
clientSocket.close();
}
finally {
-   if (fos != null) {
-   try {
-   fos.close();
-   } catch (Throwable t) {
-   LOG.warn("Cannot close stream to BLOB 
staging file", t);
-   }
-   }
if (incomingFile != null) {
-   if (!incomingFile.delete()) {
+   if (!incomingFile.delete() && 
incomingFile.exists()) {
LOG.warn("Cannot delete BLOB server 
staging file " + incomingFile.getAbsolutePath());
}
}
}
}
 
/**
-* Handles an incoming DELETE request from a BLOB client.
-* 
-* @param inputStream The input stream to read the request from.
-* @param outputStream The output stream to write the response to.
-* @throws java.io.IOException Thrown if an I/O error occurs while 
reading the request data from the input stream.
+* Reads a full file from inputStream into 
incomingFile returning its checksum.
+*
+* @param inputStream
+*  stream to read from
+* @param incomingFile
+*  file to write to
+* @param buf
+*  An auxiliary buffer for data 
serialization/deserialization
+*
+* @return the received file's content hash as a BLOB key
+*
+* @throws IOException
+*  thrown if an I/O error occurs while reading/writing 
data from/to the respective streams
 */
-   private void delete(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
+   private static BlobKey readFileFully(
+   final InputStream inputStream, final File incomingFile, 
final byte[] buf)
+   throws IOException {
+   MessageDigest md = BlobUtils.createMessageDigest();
+   FileOutputStream fos = new FileOutputStream(incomingFile);
 
try {
-   int type = inputStream.read();
-   if (type < 0) {
-   throw new EOFException("Premature end of DELETE 
request");
-   }
-
-   if (type == CONTENT_ADDRESSABLE) {
-   BlobKey key = 
BlobKey.readFromInputStream(inputStream);
-   File blobFile = 
blobServer.getStorageLocation(key);
-
-   writeLock.lock();
-
-   try {
-   // we should make the local and remote 
file deletion atomic, otherwise we might risk not
-   // removing the remote file in case of 
a concurrent put operation
-   if (blobFile.exists() && 
!blobFile.delete()) {
-   throw new IOException("Cannot 
delete BLOB file " + blobFile.getAbsolutePath());
-   }
-
-   blobStore.delete(key);
-   } finally {
-   writeLock.unlock();
+   while (true) {
+   final int bytesExpected = 
readLength(inputStream);
+   if (bytesExpected == -1) {
+   // done
+   break;
+   }
+   if (bytesExpected > BUFFER_SIZE) {
+   throw new IOException(
+   "Unexpected number of incoming 
bytes: " + bytesExpected);
}
-   }
-   else if (type == NAME_ADDRESSABLE) {
-   byte[] jidBytes = new byte[JobID.SIZE];
-   readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
-   JobID jobID = JobID.fromByteArray(jidBytes);
 
-   String key = readKey(buf, inputStream);
+   readFully(inputStream, buf, 0, bytesExpected, 
"buffer");
+   fos.write(buf, 0, bytesExpected);
 
-   

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125577195
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 ---
@@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
clientSocket.close();
}
finally {
-   if (fos != null) {
-   try {
-   fos.close();
-   } catch (Throwable t) {
-   LOG.warn("Cannot close stream to BLOB 
staging file", t);
-   }
-   }
if (incomingFile != null) {
-   if (!incomingFile.delete()) {
+   if (!incomingFile.delete() && 
incomingFile.exists()) {
LOG.warn("Cannot delete BLOB server 
staging file " + incomingFile.getAbsolutePath());
}
}
}
}
 
/**
-* Handles an incoming DELETE request from a BLOB client.
-* 
-* @param inputStream The input stream to read the request from.
-* @param outputStream The output stream to write the response to.
-* @throws java.io.IOException Thrown if an I/O error occurs while 
reading the request data from the input stream.
+* Reads a full file from inputStream into 
incomingFile returning its checksum.
+*
+* @param inputStream
+*  stream to read from
+* @param incomingFile
+*  file to write to
+* @param buf
+*  An auxiliary buffer for data 
serialization/deserialization
+*
+* @return the received file's content hash as a BLOB key
+*
+* @throws IOException
+*  thrown if an I/O error occurs while reading/writing 
data from/to the respective streams
 */
-   private void delete(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
+   private static BlobKey readFileFully(
+   final InputStream inputStream, final File incomingFile, 
final byte[] buf)
+   throws IOException {
+   MessageDigest md = BlobUtils.createMessageDigest();
+   FileOutputStream fos = new FileOutputStream(incomingFile);
 
try {
-   int type = inputStream.read();
-   if (type < 0) {
-   throw new EOFException("Premature end of DELETE 
request");
-   }
-
-   if (type == CONTENT_ADDRESSABLE) {
-   BlobKey key = 
BlobKey.readFromInputStream(inputStream);
-   File blobFile = 
blobServer.getStorageLocation(key);
-
-   writeLock.lock();
-
-   try {
-   // we should make the local and remote 
file deletion atomic, otherwise we might risk not
-   // removing the remote file in case of 
a concurrent put operation
-   if (blobFile.exists() && 
!blobFile.delete()) {
-   throw new IOException("Cannot 
delete BLOB file " + blobFile.getAbsolutePath());
-   }
-
-   blobStore.delete(key);
-   } finally {
-   writeLock.unlock();
+   while (true) {
+   final int bytesExpected = 
readLength(inputStream);
+   if (bytesExpected == -1) {
+   // done
+   break;
+   }
+   if (bytesExpected > BUFFER_SIZE) {
+   throw new IOException(
+   "Unexpected number of incoming 
bytes: " + bytesExpected);
}
-   }
-   else if (type == NAME_ADDRESSABLE) {
-   byte[] jidBytes = new byte[JobID.SIZE];
-   readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
-   JobID jobID = JobID.fromByteArray(jidBytes);
 
-   String key = readKey(buf, inputStream);
+   readFully(inputStream, buf, 0, bytesExpected, 
"buffer");
+   fos.write(buf, 0, bytesExpected);
 
-

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-04 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125430786
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 ---
@@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
clientSocket.close();
}
finally {
-   if (fos != null) {
-   try {
-   fos.close();
-   } catch (Throwable t) {
-   LOG.warn("Cannot close stream to BLOB 
staging file", t);
-   }
-   }
if (incomingFile != null) {
-   if (!incomingFile.delete()) {
+   if (!incomingFile.delete() && 
incomingFile.exists()) {
LOG.warn("Cannot delete BLOB server 
staging file " + incomingFile.getAbsolutePath());
}
}
}
}
 
/**
-* Handles an incoming DELETE request from a BLOB client.
-* 
-* @param inputStream The input stream to read the request from.
-* @param outputStream The output stream to write the response to.
-* @throws java.io.IOException Thrown if an I/O error occurs while 
reading the request data from the input stream.
+* Reads a full file from inputStream into 
incomingFile returning its checksum.
+*
+* @param inputStream
+*  stream to read from
+* @param incomingFile
+*  file to write to
+* @param buf
+*  An auxiliary buffer for data 
serialization/deserialization
+*
+* @return the received file's content hash as a BLOB key
+*
+* @throws IOException
+*  thrown if an I/O error occurs while reading/writing 
data from/to the respective streams
 */
-   private void delete(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
+   private static BlobKey readFileFully(
+   final InputStream inputStream, final File incomingFile, 
final byte[] buf)
+   throws IOException {
+   MessageDigest md = BlobUtils.createMessageDigest();
+   FileOutputStream fos = new FileOutputStream(incomingFile);
 
try {
-   int type = inputStream.read();
-   if (type < 0) {
-   throw new EOFException("Premature end of DELETE 
request");
-   }
-
-   if (type == CONTENT_ADDRESSABLE) {
-   BlobKey key = 
BlobKey.readFromInputStream(inputStream);
-   File blobFile = 
blobServer.getStorageLocation(key);
-
-   writeLock.lock();
-
-   try {
-   // we should make the local and remote 
file deletion atomic, otherwise we might risk not
-   // removing the remote file in case of 
a concurrent put operation
-   if (blobFile.exists() && 
!blobFile.delete()) {
-   throw new IOException("Cannot 
delete BLOB file " + blobFile.getAbsolutePath());
-   }
-
-   blobStore.delete(key);
-   } finally {
-   writeLock.unlock();
+   while (true) {
+   final int bytesExpected = 
readLength(inputStream);
+   if (bytesExpected == -1) {
+   // done
+   break;
+   }
+   if (bytesExpected > BUFFER_SIZE) {
+   throw new IOException(
+   "Unexpected number of incoming 
bytes: " + bytesExpected);
}
-   }
-   else if (type == NAME_ADDRESSABLE) {
-   byte[] jidBytes = new byte[JobID.SIZE];
-   readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
-   JobID jobID = JobID.fromByteArray(jidBytes);
 
-   String key = readKey(buf, inputStream);
+   readFully(inputStream, buf, 0, bytesExpected, 
"buffer");
+   fos.write(buf, 0, bytesExpected);
 
-   

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-04 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125430028
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -162,105 +164,116 @@ static File initStorageDirectory(String 
storageDirectory) throws
}
 
/**
-* Returns the BLOB service's directory for incoming files. The 
directory is created if it did
-* not exist so far.
+* Returns the BLOB service's directory for incoming (job-unrelated) 
files. The directory is
+* created if it does not exist yet.
+*
+* @param storageDir
+*  storage directory used be the BLOB service
 *
-* @return the BLOB server's directory for incoming files
+* @return the BLOB service's directory for incoming files
 */
static File getIncomingDirectory(File storageDir) {
final File incomingDir = new File(storageDir, "incoming");
 
-   if (!incomingDir.mkdirs() && !incomingDir.exists()) {
-   throw new RuntimeException("Cannot create directory for 
incoming files " + incomingDir.getAbsolutePath());
-   }
+   mkdirTolerateExisting(incomingDir, "incoming");
 
return incomingDir;
}
 
/**
-* Returns the BLOB service's directory for cached files. The directory 
is created if it did
-* not exist so far.
+* Makes sure a given directory exists by creating it if necessary.
 *
-* @return the BLOB server's directory for cached files
+* @param dir
+*  directory to create
+* @param dirType
+*  the type of the directory (included in error message if 
something fails)
 */
-   private static File getCacheDirectory(File storageDir) {
-   final File cacheDirectory = new File(storageDir, "cache");
-
-   if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) {
-   throw new RuntimeException("Could not create cache 
directory '" + cacheDirectory.getAbsolutePath() + "'.");
+   private static void mkdirTolerateExisting(final File dir, final String 
dirType) {
+   // note: thread-safe create should try to mkdir first and then 
ignore the case that the
+   //   directory already existed
+   if (!dir.mkdirs() && !dir.exists()) {
+   throw new RuntimeException(
+   "Cannot create " + dirType + " directory '" + 
dir.getAbsolutePath() + "'.");
}
-
-   return cacheDirectory;
}
 
/**
 * Returns the (designated) physical storage location of the BLOB with 
the given key.
 *
+* @param storageDir
+*  storage directory used be the BLOB service
 * @param key
-*the key identifying the BLOB
+*  the key identifying the BLOB
+* @param jobId
+*  ID of the job for the incoming files (or null 
if job-unrelated)
+*
 * @return the (designated) physical storage location of the BLOB
 */
-   static File getStorageLocation(File storageDir, BlobKey key) {
-   return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX 
+ key.toString());
-   }
+   static File getStorageLocation(
+   @Nonnull File storageDir, @Nullable JobID jobId, 
@Nonnull BlobKey key) {
+   File file = new 
File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
 
-   /**
-* Returns the (designated) physical storage location of the BLOB with 
the given job ID and key.
-*
-* @param jobID
-*the ID of the job the BLOB belongs to
-* @param key
-*the key of the BLOB
-* @return the (designated) physical storage location of the BLOB with 
the given job ID and key
-*/
-   static File getStorageLocation(File storageDir, JobID jobID, String 
key) {
-   return new File(getJobDirectory(storageDir, jobID), 
BLOB_FILE_PREFIX + encodeKey(key));
+   mkdirTolerateExisting(file.getParentFile(), "cache");
--- End diff --

"cache" is actually not the directory name but part of the error message that 
is thrown if the directory cannot be created. Thinking a bit more about this, 
though, I can remove that parameter.
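
For illustration, the helper without the extra parameter might look like the sketch below. This is an assumption about the follow-up change, not code from the PR; `MkdirSketch` is a hypothetical wrapper class and the error message wording is made up.

```java
import java.io.File;

// Hypothetical sketch of the helper once the dirType parameter is dropped.
final class MkdirSketch {
    // Creates the directory if necessary. mkdirs() is tried first and the
    // existence check comes second, so a directory created concurrently by
    // another thread is tolerated instead of reported as a failure.
    static void mkdirTolerateExisting(final File dir) {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new RuntimeException(
                "Cannot create directory '" + dir.getAbsolutePath() + "'.");
        }
    }
}
```

Calling the helper twice on the same path is safe: the second `mkdirs()` returns `false`, but `exists()` is `true`, so nothing is thrown.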




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-04 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125406639
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobServerLibraryManager.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobServer;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Specialisation of {@link BlobLibraryCacheManager} that works with a 
{@link BlobServer}.
+ */
+public final class BlobServerLibraryManager extends 
BlobLibraryCacheManager {
--- End diff --

Oh yes, the API is not too clean here - let me give it another shot: I 
wanted to get rid of the `BlobService#registerJob()` and 
`BlobService#releaseJob()` calls and make them available only in the 
`BlobCache`. Let's extract these two calls from the `BlobLibraryCacheManager`, 
since they should be called with the `FallbackLibraryCacheManager` as well.
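
As a rough illustration of what the two calls do, here is a simplified, hypothetical sketch of per-job reference counting with a deferred cleanup deadline. `JobRefCounterSketch` is not the PR's class: job IDs are plain strings, the current time is passed in explicitly, and an unmatched release is silently ignored, which is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-job ref-counting with deferred cleanup.
final class JobRefCounterSketch {
    static final class RefCount {
        int references = 0;
        long keepUntil = -1; // -1: no cleanup deadline set
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMillis;

    JobRefCounterSketch(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    void registerJob(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.computeIfAbsent(jobId, id -> new RefCount());
            ++ref.references;
        }
    }

    // When the last reference is released, the job's files stay around for
    // one more cleanup interval before they become eligible for deletion.
    void releaseJob(String jobId, long nowMillis) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return; // unmatched release: ignored in this sketch
            }
            if (--ref.references == 0) {
                ref.keepUntil = nowMillis + cleanupIntervalMillis;
            }
        }
    }

    int references(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            return ref == null ? 0 : ref.references;
        }
    }

    long keepUntil(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            return ref == null ? -1 : ref.keepUntil;
        }
    }
}
```

Every caller that registers a job is expected to release it exactly once; only the final release arms the cleanup deadline.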




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-04 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125404358
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -107,146 +133,268 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = blobClientConfig.getLong(
+   ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+   
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
+   @Override
+   public void registerJob(JobID jobId) {
+   synchronized (lockObject) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
+   }
+   }
+
+   @Override
+   public void releaseJob(JobID jobId) {
+   synchronized (lockObject) {
+   RefCount ref = jobRefCounters.get(jobId);
+
+   if (ref == null) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of registerJob() calls");
+   return;
+   }
+
+   --ref.references;
+   if (ref.references == 0) {
+   ref.keepUntil = System.currentTimeMillis() + 
cleanupInterval;
+   }
+   }
+   }
+
+   /**
+* Returns local copy of the (job-unrelated) file for the BLOB with the 
given key.
+* 
+* The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
+* the cache, the method will try to download it from this cache's BLOB 
server.
+*
+* @param key
+*  The key of the desired BLOB.
+*
+* @return file referring to the local storage location of the BLOB.
+*
+* @throws IOException
+*  Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
+*/
+   @Override
+   public File getFile(BlobKey key) throws IOException {
+   return getFileInternal(null, key);
+   }
+
/**
-* Returns the URL for the BLOB with the given key. The method will 
first attempt to serve
-* the BLOB from its local cache. If the BLOB is not in the cache, the 
method will try to download it
-* from this cache's BLOB server.
+* Returns local copy of the file for the BLOB with the given key.
+* 
+* The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
+* the cache, the method will try to download it from this cache's BLOB 
server.
 *
-* @param requiredBlob The key of the desired BLOB.
-* @return URL referring to the local storage location of the BLOB.
-* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+* @param jobId
+*  ID of the job this blob belongs to
+* @param key
+*  The key of the desired BLOB.
+*
+* @return file referring to the local storage location of the BLOB.
+*
+* @throws IOException
+*  Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
 */
-   public URL getURL(final BlobKey requiredBlob) throws IOException {
+   @Override
+   public File getFile(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
+   checkNotNull(jobId);
+   return getFileInternal(jobId, key);
+   }
+
+   /**
+* Returns local copy of the file for the BLOB with the given key.
+* 
+* The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
+* the cache, the method will try to download it from this cache's BLOB 
server.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param requiredBlob
+*  The key of the desired BLOB.
+*
+* @return file referring to the local storage location of the BLOB.
+*
+* @throws IOException
+*  Thrown if an I/O error occurs 

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125247578
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 ---
@@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
clientSocket.close();
}
finally {
-   if (fos != null) {
-   try {
-   fos.close();
-   } catch (Throwable t) {
-   LOG.warn("Cannot close stream to BLOB 
staging file", t);
-   }
-   }
if (incomingFile != null) {
-   if (!incomingFile.delete()) {
+   if (!incomingFile.delete() && 
incomingFile.exists()) {
LOG.warn("Cannot delete BLOB server 
staging file " + incomingFile.getAbsolutePath());
}
}
}
}
 
/**
-* Handles an incoming DELETE request from a BLOB client.
-* 
-* @param inputStream The input stream to read the request from.
-* @param outputStream The output stream to write the response to.
-* @throws java.io.IOException Thrown if an I/O error occurs while 
reading the request data from the input stream.
+* Reads a full file from inputStream into 
incomingFile returning its checksum.
+*
+* @param inputStream
+*  stream to read from
+* @param incomingFile
+*  file to write to
+* @param buf
+*  An auxiliary buffer for data 
serialization/deserialization
+*
+* @return the received file's content hash as a BLOB key
+*
+* @throws IOException
+*  thrown if an I/O error occurs while reading/writing 
data from/to the respective streams
 */
-   private void delete(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
+   private static BlobKey readFileFully(
+   final InputStream inputStream, final File incomingFile, 
final byte[] buf)
+   throws IOException {
+   MessageDigest md = BlobUtils.createMessageDigest();
+   FileOutputStream fos = new FileOutputStream(incomingFile);
 
try {
-   int type = inputStream.read();
-   if (type < 0) {
-   throw new EOFException("Premature end of DELETE 
request");
-   }
-
-   if (type == CONTENT_ADDRESSABLE) {
-   BlobKey key = 
BlobKey.readFromInputStream(inputStream);
-   File blobFile = 
blobServer.getStorageLocation(key);
-
-   writeLock.lock();
-
-   try {
-   // we should make the local and remote 
file deletion atomic, otherwise we might risk not
-   // removing the remote file in case of 
a concurrent put operation
-   if (blobFile.exists() && 
!blobFile.delete()) {
-   throw new IOException("Cannot 
delete BLOB file " + blobFile.getAbsolutePath());
-   }
-
-   blobStore.delete(key);
-   } finally {
-   writeLock.unlock();
+   while (true) {
+   final int bytesExpected = 
readLength(inputStream);
+   if (bytesExpected == -1) {
+   // done
+   break;
+   }
+   if (bytesExpected > BUFFER_SIZE) {
+   throw new IOException(
+   "Unexpected number of incoming 
bytes: " + bytesExpected);
}
-   }
-   else if (type == NAME_ADDRESSABLE) {
-   byte[] jidBytes = new byte[JobID.SIZE];
-   readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
-   JobID jobID = JobID.fromByteArray(jidBytes);
 
-   String key = readKey(buf, inputStream);
+   readFully(inputStream, buf, 0, bytesExpected, 
"buffer");
+   fos.write(buf, 0, bytesExpected);
 
-

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125218256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -107,146 +133,268 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = blobClientConfig.getLong(
+   ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+   
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
+   @Override
+   public void registerJob(JobID jobId) {
+   synchronized (lockObject) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
+   }
+   }
+
+   @Override
+   public void releaseJob(JobID jobId) {
+   synchronized (lockObject) {
+   RefCount ref = jobRefCounters.get(jobId);
+
+   if (ref == null) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of registerJob() calls");
+   return;
+   }
+
+   --ref.references;
+   if (ref.references == 0) {
+   ref.keepUntil = System.currentTimeMillis() + 
cleanupInterval;
+   }
+   }
+   }
+
+   /**
+* Returns local copy of the (job-unrelated) file for the BLOB with the 
given key.
+* 
+* The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
+* the cache, the method will try to download it from this cache's BLOB 
server.
+*
+* @param key
+*  The key of the desired BLOB.
+*
+* @return file referring to the local storage location of the BLOB.
+*
+* @throws IOException
+*  Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
+*/
+   @Override
+   public File getFile(BlobKey key) throws IOException {
+   return getFileInternal(null, key);
+   }
+
/**
-* Returns the URL for the BLOB with the given key. The method will 
first attempt to serve
-* the BLOB from its local cache. If the BLOB is not in the cache, the 
method will try to download it
-* from this cache's BLOB server.
+* Returns local copy of the file for the BLOB with the given key.
+* 
+* The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
+* the cache, the method will try to download it from this cache's BLOB 
server.
 *
-* @param requiredBlob The key of the desired BLOB.
-* @return URL referring to the local storage location of the BLOB.
-* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+* @param jobId
+*  ID of the job this blob belongs to
+* @param key
+*  The key of the desired BLOB.
+*
+* @return file referring to the local storage location of the BLOB.
+*
+* @throws IOException
+*  Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
 */
-   public URL getURL(final BlobKey requiredBlob) throws IOException {
+   @Override
+   public File getFile(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
+   checkNotNull(jobId);
+   return getFileInternal(jobId, key);
+   }
+
+   /**
+* Returns local copy of the file for the BLOB with the given key.
+* 
+* The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
+* the cache, the method will try to download it from this cache's BLOB 
server.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param requiredBlob
+*  The key of the desired BLOB.
+*
+* @return file referring to the local storage location of the BLOB.
+*
+* @throws IOException
+*  Thrown if an I/O error 

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125219915
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -269,166 +276,128 @@ else if (response != RETURN_OKAY) {
// 

 
/**
-* Uploads the data of the given byte array to the BLOB server in a 
content-addressable manner.
+* Uploads the data of the given byte array for the given job to the 
BLOB server.
 *
+* @param jobId
+*  the ID of the job the BLOB belongs to (or null 
if job-unrelated)
 * @param value
-*the buffer to upload
-* @return the computed BLOB key identifying the BLOB on the server
-* @throws IOException
-* thrown if an I/O error occurs while uploading the data to 
the BLOB server
-*/
-   public BlobKey put(byte[] value) throws IOException {
-   return put(value, 0, value.length);
-   }
-
-   /**
-* Uploads data from the given byte array to the BLOB server in a 
content-addressable manner.
+*  the buffer to upload
 *
-* @param value
-*the buffer to upload data from
-* @param offset
-*the read offset within the buffer
-* @param len
-*the number of bytes to upload from the buffer
 * @return the computed BLOB key identifying the BLOB on the server
-* @throws IOException
-* thrown if an I/O error occurs while uploading the data to 
the BLOB server
-*/
-   public BlobKey put(byte[] value, int offset, int len) throws 
IOException {
-   return putBuffer(null, null, value, offset, len);
-   }
-
-   /**
-* Uploads the data of the given byte array to the BLOB server and 
stores it under the given job ID and key.
 *
-* @param jobId
-*the job ID to identify the uploaded data
-* @param key
-*the key to identify the uploaded data
-* @param value
-*the buffer to upload
 * @throws IOException
-* thrown if an I/O error occurs while uploading the data to 
the BLOB server
+*  thrown if an I/O error occurs while uploading the data 
to the BLOB server
 */
-   public void put(JobID jobId, String key, byte[] value) throws 
IOException {
-   put(jobId, key, value, 0, value.length);
+   @VisibleForTesting
+   public BlobKey put(@Nullable JobID jobId, byte[] value) throws 
IOException {
+   return put(jobId, value, 0, value.length);
}
 
/**
-* Uploads data from the given byte array to the BLOB server and stores 
it under the given job ID and key.
+* Uploads data from the given byte array for the given job to the BLOB 
server.
 *
 * @param jobId
-*the job ID to identify the uploaded data
-* @param key
-*the key to identify the uploaded data
+*  the ID of the job the BLOB belongs to (or null 
if job-unrelated)
 * @param value
-*the buffer to upload data from
+*  the buffer to upload data from
 * @param offset
-*the read offset within the buffer
+*  the read offset within the buffer
 * @param len
-*the number of bytes to upload from the buffer
+*  the number of bytes to upload from the buffer
+*
+* @return the computed BLOB key identifying the BLOB on the server
+*
 * @throws IOException
-* thrown if an I/O error occurs while uploading the data to 
the BLOB server
+*  thrown if an I/O error occurs while uploading the data 
to the BLOB server
 */
-   public void put(JobID jobId, String key, byte[] value, int offset, int 
len) throws IOException {
-   if (key.length() > MAX_KEY_LENGTH) {
-   throw new IllegalArgumentException("Keys must not be 
longer than " + MAX_KEY_LENGTH);
-   }
-
-   putBuffer(jobId, key, value, offset, len);
+   @VisibleForTesting
+   public BlobKey put(@Nullable JobID jobId, byte[] value, int offset, int 
len) throws IOException {
+   return putBuffer(jobId, value, offset, len);
}
 
/**
-* Uploads data from the given input stream to the BLOB server and 
stores it under the given job ID and key.
+* Uploads the (job-unrelated) data from the given input stream to the 
BLOB server.
 *
-* @param jobId
-*the job ID to identify the uploaded data
-* @param key
  

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125217930
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -107,146 +133,268 @@ public BlobCache(
this.numFetchRetries = 0;
}
 
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   cleanupInterval = blobClientConfig.getLong(
+   ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+   
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+   this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
+
// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
 
+   @Override
+   public void registerJob(JobID jobId) {
+   synchronized (lockObject) {
+   RefCount ref = jobRefCounters.get(jobId);
+   if (ref == null) {
+   ref = new RefCount();
+   jobRefCounters.put(jobId, ref);
+   }
+   ++ref.references;
+   }
+   }
+
+   @Override
+   public void releaseJob(JobID jobId) {
+   synchronized (lockObject) {
+   RefCount ref = jobRefCounters.get(jobId);
+
+   if (ref == null) {
+   LOG.warn("improper use of releaseJob() without 
a matching number of registerJob() calls");
+   return;
+   }
+
+   --ref.references;
+   if (ref.references == 0) {
+   ref.keepUntil = System.currentTimeMillis() + 
cleanupInterval;
+   }
+   }
+   }
+
+   /**
+* Returns local copy of the (job-unrelated) file for the BLOB with the 
given key.
+* 
+* The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
+* the cache, the method will try to download it from this cache's BLOB 
server.
+*
+* @param key
+*  The key of the desired BLOB.
+*
+* @return file referring to the local storage location of the BLOB.
+*
+* @throws IOException
+*  Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
+*/
+   @Override
+   public File getFile(BlobKey key) throws IOException {
+   return getFileInternal(null, key);
+   }
+
/**
-* Returns the URL for the BLOB with the given key. The method will 
first attempt to serve
-* the BLOB from its local cache. If the BLOB is not in the cache, the 
method will try to download it
-* from this cache's BLOB server.
+* Returns local copy of the file for the BLOB with the given key.
+* 
+* The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
+* the cache, the method will try to download it from this cache's BLOB 
server.
 *
-* @param requiredBlob The key of the desired BLOB.
-* @return URL referring to the local storage location of the BLOB.
-* @throws IOException Thrown if an I/O error occurs while downloading 
the BLOBs from the BLOB server.
+* @param jobId
+*  ID of the job this blob belongs to
+* @param key
+*  The key of the desired BLOB.
+*
+* @return file referring to the local storage location of the BLOB.
+*
+* @throws IOException
+*  Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
 */
-   public URL getURL(final BlobKey requiredBlob) throws IOException {
+   @Override
+   public File getFile(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
--- End diff --

I think so far the convention is that fields without an annotation are 
considered `@Nonnull` and only fields which are annotated with `@Nullable` can 
be `null`. Otherwise `key` should also be marked as `@Nonnull`.
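
A minimal sketch of the convention described above, assuming a locally declared `@Nullable` stand-in (the real code would presumably use `javax.annotation.Nullable`). The class name, method name and path layout are hypothetical and only illustrate that unannotated parameters are treated as non-null:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Objects;

/** Stand-in for javax.annotation.Nullable so the sketch compiles without extra jars. */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
@interface Nullable {}

/** Hypothetical example class; not part of the pull request. */
final class NullabilityConventionSketch {

    /**
     * jobId is explicitly marked as optional (job-unrelated BLOB).
     * key carries no annotation and is therefore required by convention.
     */
    static String storagePath(@Nullable String jobId, String key) {
        // Unannotated parameters are still checked at runtime.
        Objects.requireNonNull(key, "key");
        // The directory names below are made up for illustration.
        return (jobId == null ? "no_job" : "job_" + jobId) + "/blob_" + key;
    }
}
```

With this convention only the exceptional case (a parameter that may be `null`) needs an annotation, so mixing `@Nonnull` on some parameters and nothing on others is avoided.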




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125219368
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -255,8 +403,39 @@ public int getPort() {
return serverAddress.getPort();
}
 
+   /**
+* Cleans up BLOBs which are not referenced anymore.
+*/
+   @Override
+   public void run() {
+   synchronized (lockObject) {
+   Iterator<Map.Entry<JobID, RefCount>> entryIter = 
jobRefCounters.entrySet().iterator();
+
+   while (entryIter.hasNext()) {
+   Map.Entry<JobID, RefCount> entry = 
entryIter.next();
+   RefCount ref = entry.getValue();
+
+   if (ref.references <= 0 && 
System.currentTimeMillis() >= ref.keepUntil) {
+   JobID jobId = entry.getKey();
+
+   final File localFile =
+   new 
File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
+   try {
+   
FileUtils.deleteDirectory(localFile);
+   // let's only remove this 
directory from cleanup if the cleanup was successful
+   entryIter.remove();
+   } catch (Throwable t) {
+   LOG.warn("Failed to locally 
delete job directory {}", localFile.getAbsolutePath());
--- End diff --

This swallows the exception: `t` is caught but never passed to the logger, so 
the cause of the failed deletion is lost.
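
The catch block in the diff above logs only the directory path and drops the caught `Throwable`. A minimal sketch of keeping the cause in the log record follows. It uses `java.util.logging` purely so that it is self-contained; that is an assumption for illustration, since the code under review logs through `LOG.warn(...)` with `{}` placeholders, where the throwable would be passed as the last argument. The class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Hypothetical example class; not part of the pull request. */
final class CleanupLoggingSketch {

    private static final Logger LOG =
        Logger.getLogger(CleanupLoggingSketch.class.getName());

    /** Runs a delete action and, on failure, logs the path together with the cause. */
    static boolean deleteAndLog(String path, Runnable deleteAction) {
        try {
            deleteAction.run();
            return true;
        } catch (RuntimeException e) {
            // Passing the exception keeps the stack trace in the log record.
            LOG.log(Level.WARNING, "Failed to locally delete job directory " + path, e);
            return false;
        }
    }

    /** Helper that collects all records published by this class's logger. */
    static List<LogRecord> captureRecords() {
        final List<LogRecord> records = new ArrayList<>();
        LOG.addHandler(new Handler() {
            @Override public void publish(LogRecord record) { records.add(record); }
            @Override public void flush() {}
            @Override public void close() {}
        });
        return records;
    }
}
```

Returning `false` instead of rethrowing mirrors the reviewed code, which keeps the job entry for a later cleanup attempt when the deletion fails.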




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125249421
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -162,105 +164,116 @@ static File initStorageDirectory(String 
storageDirectory) throws
}
 
/**
-* Returns the BLOB service's directory for incoming files. The 
directory is created if it did
-* not exist so far.
+* Returns the BLOB service's directory for incoming (job-unrelated) 
files. The directory is
+* created if it does not exist yet.
+*
+* @param storageDir
+*  storage directory used be the BLOB service
 *
-* @return the BLOB server's directory for incoming files
+* @return the BLOB service's directory for incoming files
 */
static File getIncomingDirectory(File storageDir) {
final File incomingDir = new File(storageDir, "incoming");
 
-   if (!incomingDir.mkdirs() && !incomingDir.exists()) {
-   throw new RuntimeException("Cannot create directory for 
incoming files " + incomingDir.getAbsolutePath());
-   }
+   mkdirTolerateExisting(incomingDir, "incoming");
 
return incomingDir;
}
 
/**
-* Returns the BLOB service's directory for cached files. The directory 
is created if it did
-* not exist so far.
+* Makes sure a given directory exists by creating it if necessary.
 *
-* @return the BLOB server's directory for cached files
+* @param dir
+*  directory to create
+* @param dirType
+*  the type of the directory (included in error message if 
something fails)
 */
-   private static File getCacheDirectory(File storageDir) {
-   final File cacheDirectory = new File(storageDir, "cache");
-
-   if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) {
-   throw new RuntimeException("Could not create cache 
directory '" + cacheDirectory.getAbsolutePath() + "'.");
+   private static void mkdirTolerateExisting(final File dir, final String 
dirType) {
+   // note: thread-safe create should try to mkdir first and then 
ignore the case that the
+   //   directory already existed
+   if (!dir.mkdirs() && !dir.exists()) {
+   throw new RuntimeException(
+   "Cannot create " + dirType + " directory '" + 
dir.getAbsolutePath() + "'.");
}
-
-   return cacheDirectory;
}
 
/**
 * Returns the (designated) physical storage location of the BLOB with 
the given key.
 *
+* @param storageDir
+*  storage directory used be the BLOB service
 * @param key
-*the key identifying the BLOB
+*  the key identifying the BLOB
+* @param jobId
+*  ID of the job for the incoming files (or null 
if job-unrelated)
+*
 * @return the (designated) physical storage location of the BLOB
 */
-   static File getStorageLocation(File storageDir, BlobKey key) {
-   return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX 
+ key.toString());
-   }
+   static File getStorageLocation(
+   @Nonnull File storageDir, @Nullable JobID jobId, 
@Nonnull BlobKey key) {
+   File file = new 
File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
 
-   /**
-* Returns the (designated) physical storage location of the BLOB with 
the given job ID and key.
-*
-* @param jobID
-*the ID of the job the BLOB belongs to
-* @param key
-*the key of the BLOB
-* @return the (designated) physical storage location of the BLOB with 
the given job ID and key
-*/
-   static File getStorageLocation(File storageDir, JobID jobID, String 
key) {
-   return new File(getJobDirectory(storageDir, jobID), 
BLOB_FILE_PREFIX + encodeKey(key));
+   mkdirTolerateExisting(file.getParentFile(), "cache");
--- End diff --

Why are we creating a `cache` directory here?




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125249786
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobCacheLibraryManager.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobCache;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Specialisation of {@link BlobLibraryCacheManager} that works with a 
{@link BlobCache}.
+ */
+public class BlobCacheLibraryManager extends BlobLibraryCacheManager {
--- End diff --

Shall we give it a more distinct name? It might be a little confusing if a 
subclass's name is just a permutation of the superclass's name.




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125275855
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
+
+   @Rule
+   public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Test cleanup for a job that finishes ordinarily.
+*/
+   @Test
+   public void testBlobServerCleanupFinishedJob() throws IOException {
+   testBlobServerCleanup(ExecutionState.FINISHED);
+   }
+
+   /**
+* Test cleanup for a job which is cancelled after submission.
+*/
+   @Test
+   public void testBlobServerCleanupCancelledJob() throws IOException {
+   testBlobServerCleanup(ExecutionState.CANCELED);
+   }
+
+   /**
+* Test cleanup for a job that fails (first a task fails, then the job 
recovers, then the whole
+* job fails due to a limited restart policy).
+*/
+   @Test
+   public void testBlobServerCleanupFailedJob() throws IOException {
+   testBlobServerCleanup(ExecutionState.FAILED);
+   }
+
+   private void testBlobServerCleanup(final ExecutionState finalState) 
throws IOException {
+   final int num_tasks = 2;
+   final File blobBaseDir = tmpFolder.newFolder();
+
+   new JavaTestKit(system) {{
+   new Within(duration("15 seconds")) {
+   

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-07-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r125250794
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobServerLibraryManager.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobServer;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Specialisation of {@link BlobLibraryCacheManager} that works with a 
{@link BlobServer}.
+ */
+public final class BlobServerLibraryManager extends 
BlobLibraryCacheManager {
--- End diff --

Can't we use the same BlobLibraryCacheManager implementation for the 
`BlobServerLibraryManager` and the `BlobCacheLibraryManager` which works on a 
`BlobService`?




[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-06-30 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4238

[FLINK-7057][blob] move BLOB ref-counting from LibraryCacheManager to 
BlobCache

Currently, the `LibraryCacheManager` does some ref-counting for the JAR 
files it manages. Instead, we want the `BlobCache` to do that itself for 
**all** job-related BLOBs. Also, we do not want to operate on a per-BlobKey 
level but rather per job. Job-unrelated BLOBs should be cleaned up manually, as 
is done for the Web-UI logs. A future API change will reflect the different use 
cases in a better way. For now, we also need to adapt the cleanup accordingly.
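
A minimal sketch of the per-job ref-counting idea, for illustration only and not the code in this PR: the names `registerJob`, `releaseJob`, `jobRefCounters`, `RefCount`, `references` and `keepUntil` follow the diff under review, while the class name `JobRefCountSketch`, the `String` job IDs, the explicit `nowMs` timestamps, the `cleanup` and `isTracked` methods, and the choice to reset `keepUntil` on re-registration are assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch: per-job reference counting with delayed cleanup. */
public class JobRefCountSketch {

    /** Per-job counter; keepUntil < 0 means no cleanup is pending. */
    static final class RefCount {
        int references = 0;
        long keepUntil = -1;
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long cleanupIntervalMs;

    public JobRefCountSketch(long cleanupIntervalMs) {
        this.cleanupIntervalMs = cleanupIntervalMs;
    }

    /** Registers one more user of the job's BLOBs. */
    public void registerJob(String jobId) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.computeIfAbsent(jobId, k -> new RefCount());
            ++ref.references;
            // Assumption: re-registering a job cancels a pending cleanup.
            ref.keepUntil = -1;
        }
    }

    /** Releases one user; the last release schedules the job for cleanup. */
    public void releaseJob(String jobId, long nowMs) {
        synchronized (jobRefCounters) {
            RefCount ref = jobRefCounters.get(jobId);
            if (ref == null || ref.references == 0) {
                return; // unknown job or unbalanced release: nothing to do
            }
            if (--ref.references == 0) {
                ref.keepUntil = nowMs + cleanupIntervalMs;
            }
        }
    }

    /** Drops all unreferenced jobs whose retention time has passed; returns how many. */
    public int cleanup(long nowMs) {
        synchronized (jobRefCounters) {
            int removed = 0;
            Iterator<Map.Entry<String, RefCount>> it = jobRefCounters.entrySet().iterator();
            while (it.hasNext()) {
                RefCount ref = it.next().getValue();
                if (ref.references == 0 && ref.keepUntil >= 0 && nowMs >= ref.keepUntil) {
                    // A real cache would also delete the job's local BLOB directory here.
                    it.remove();
                    ++removed;
                }
            }
            return removed;
        }
    }

    /** Returns whether the job still has an entry (referenced or awaiting cleanup). */
    public boolean isTracked(String jobId) {
        synchronized (jobRefCounters) {
            return jobRefCounters.containsKey(jobId);
        }
    }
}
```

In this sketch the BLOBs of a job are dropped as a unit once the last user has released the job and the retention interval has elapsed, rather than tracking each BlobKey separately.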

On the `BlobServer`, the JAR files should remain both locally and in the 
HA store until the job enters a final state. Then they can be deleted.

With this intermediate state, job-unrelated BLOBs will remain in the file 
system until deleted manually. This is the same as the previous API use when 
working with a `BlobService` directly instead of going through the 
`LibraryCacheManager`. The aforementioned API extension will include TTL fields 
for those BLOBs in order to have a proper cleanup, too.

This PR is based upon #4237 and is part of a series to implement FLINK-6916.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6916-7057

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4238.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4238


commit d54a316cfffd8243980df561fd4fcbd99934a40b
Author: Nico Kruber 
Date:   2016-12-20T15:49:57Z

[FLINK-6008][docs] minor improvements in the BlobService docs

commit b215515fa14d3f6af218e86b67bc2c27ae9d4f4f
Author: Nico Kruber 
Date:   2016-12-20T17:27:13Z

[FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit bbcde52b3105fcf379c852b568f3893cc6052ce6
Author: Nico Kruber 
Date:   2016-12-21T15:23:29Z

[FLINK-6008] do not fail the BlobServer if delete fails

also extend the delete tests and remove one code duplication

commit dda1a12e40027724efb0e50005e5b57058a220f0
Author: Nico Kruber 
Date:   2017-01-06T17:42:58Z

[FLINK-6008][docs] update some config options to the new, non-deprecated 
ones

commit e12c2348b237207a50649d515a0fbbd19f92e6a0
Author: Nico Kruber 
Date:   2017-03-09T17:14:02Z

[FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 24060e01332c6df9fd01f1dc5f321c3fda9301c1
Author: Nico Kruber 
Date:   2017-03-17T15:21:40Z

[FLINK-6008] fix concurrent job directory creation

also add according unit tests

commit 2e0d16ab8bf8a48a2d028602a3a7693fc4b76039
Author: Nico Kruber 
Date:   2017-06-14T16:01:47Z

[FLINK-6008] do not guard a delete() call with a check for existence

commit 7ba911d7ecb4861261dff8509996be0bd64d6d27
Author: Nico Kruber 
Date:   2017-04-18T14:37:37Z

[FLINK-6008] some comments about BlobLibraryCacheManager cleanup

commit d3f50d595f85356ae6ed0a85e1f8b8e8ac630bde
Author: Nico Kruber 
Date:   2017-04-19T13:39:03Z

[hotfix] minor typos

commit 79b6ce35a9e246b35415a388295f9ee2fc19a82e
Author: Nico Kruber 
Date:   2017-04-19T14:10:16Z

[FLINK-6008] further cleanup tests for BlobLibraryCacheManager

commit 23fb6ecd6c43c86d762503339c67953290236dca
Author: Nico Kruber 
Date:   2017-06-30T14:03:16Z

[FLINK-6008] address PR comments

commit 794764ceeed6b9bbbac08662f5754b218ff86c9c
Author: Nico Kruber 
Date:   2017-06-16T08:51:04Z

[FLINK-7052][blob] remove (unused) NAME_ADDRESSABLE mode

commit 774bafa85f242110a2ce7907c1150f8c62d73b3f
Author: Nico Kruber 
Date:   2017-06-21T15:05:57Z

[FLINK-7052][blob] remove further unused code due to the NAME_ADDRESSABLE 
removal

commit 4da3b3f6269e43bf1c66621099528824cad9373f
Author: Nico Kruber 
Date:   2017-06-22T15:31:17Z

[FLINK-7053][blob] remove code duplication in BlobClientSslTest

This lets BlobClientSslTest extend BlobClientTest as most of its implementation
came from there and was simply copied.

commit aa9cdc820f9ca1a38a19708bf45a2099e42eaf48
Author: Nico Kruber 
Date:   2017-06-23T09:40:34Z

[FLINK-7053][blob] verify some of the buffers returned by GET

commit c9b693a46053b55b3939ff471184796f12d36a72
Author: Nico Kruber 
Date:   2017-06-23T10:04:10Z

[FLINK-7053][blob] use TemporaryFolder for local BLOB dir in unit tests

This replaces the use of some temporary directory where it is not guaranteed
that it will