The directory referred to by `blob.storage.directory` is best described as a local cache. For recovery purposes, the JARs are also stored in `high-availability.storageDir`. At least, that's my reading of the 1.2 code; there may be some YARN-specific behavior as well, so apologies if this information is incomplete.
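For reference, the relevant entries in flink-conf.yaml would look roughly like the following in an HA setup (the ZooKeeper quorum and the paths below are placeholders, not values from your cluster):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-host-1:2181
    # durable storage on a shared filesystem (e.g. HDFS/S3); used to recover JARs and job graphs
    high-availability.storageDir: hdfs:///flink/recovery
    # local cache of the same blobs on each JobManager/TaskManager node
    blob.storage.directory: /var/lib/flink/blobs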
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L106
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L362
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java#L58
https://github.com/apache/flink/blob/release-1.2.1/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java#L57
https://github.com/apache/flink/blob/release-1.2.1/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java#L135

On Fri, Aug 4, 2017 at 11:56 AM, Shannon Carey <sca...@expedia.com> wrote:
> Stephan,
>
> Regarding your last reply to
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/blob-store-defaults-to-tmp-and-files-get-deleted-td11720.html
>
> You mention "Flink (via the user code class loader) actually holds a
> reference to the JAR files in "/tmp", so even if "/tmp" get wiped, the JAR
> file remains usable by the class loader". In my understanding, even if
> that's true, it doesn't work over a failure of the JobManager/TaskManager
> process, because the handle would be lost and the file would be gone.
>
> We're still running Flink 1.2.1, so maybe we're missing out on some of the
> improvements that have been made. However, we recently had a problem with a
> batch (DataSet) job not restarting successfully, apparently after a
> JobManager failure. This particular job runs in AWS EMR (on Yarn) which
> means that only one JobManager is run at a time, and when it fails it gets
> restarted.
>
> Here's what I can see from the logs. When the job restarts, it goes from
> CREATED -> RUNNING state, and then logs:
>
> 23:23:56,798 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph (flink-akka.actor.default-dispatcher-55): Job com.expedia…MyJob (c58185a78dd64cfc9f12374bd1f9a679) switched from state RUNNING to SUSPENDED.
> java.lang.Exception: JobManager is no longer the leader.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:319)
>
> I assume that's normal/expected, because the JobManager was restarted but
> some portion of the job state is still referring to the old one. Next,
> YarnJobManager logs: "Attempting to recover job
> c58185a78dd64cfc9f12374bd1f9a679." However, it subsequently fails:
>
> 2017-08-03 00:09:18,991 WARN org.apache.flink.yarn.YarnJobManager (flink-akka.actor.default-dispatcher-96): Failed to recover job c58185a78dd64cfc9f12374bd1f9a679.
> java.lang.Exception: Failed to retrieve the submitted job graph from state handle.
>     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:180)
> …
> Caused by: java.lang.RuntimeException: Unable to instantiate the hadoop input format
>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:319)
> …
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
>     at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:178)
>     ... 15 more
> Caused by: java.lang.ClassNotFoundException: org.apache.parquet.avro.AvroParquetInputFormat
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:317)
>     ... 69 more
>
> The missing class comes from our job, so it seems like the job jar isn't
> present on the classpath of the JobManager. When I look at the contents of
> our configured blob storage directory (we're not using /tmp), I see
> subfolders like:
>
> blobStore-7d40f1b9-7b06-400f-8c05-b5456adcd7f1
> blobStore-f2d7974c-7d86-4b11-a7fb-d1936a4593ed
>
> Only one of the two has a JAR in it, so it looks like there's a new
> directory created for each new JobManager. When I look in Zookeeper at
> nodes such as "/flink/main/jobgraphs/c58185a78dd64cfc9f12374bd1f9a679", I
> don't see those directories mentioned. I am wondering if someone can
> explain how Flink knows how to retrieve the job jar for job retry when the
> JobManager has failed? Are we running into a Flink bug here?
>
> Thanks for the info,
> Shannon