The directory referred to by `blob.storage.directory` is best described as
a local cache. For recovery purposes the JARs are also stored in
`high-availability.storageDir`. At least, that's my reading of the code in
1.2 (see the links below, and the config sketch after them). There may also
be some YARN-specific behavior; sorry if this information is incomplete.

https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L106
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java#L362
https://github.com/apache/flink/blob/release-1.2.1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java#L58
https://github.com/apache/flink/blob/release-1.2.1/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java#L57
https://github.com/apache/flink/blob/release-1.2.1/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java#L135
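
For reference, here is a minimal flink-conf.yaml sketch with the two settings
side by side (the directory, quorum, and bucket values are placeholders I made
up; the ZooKeeper keys are just the standard HA options, included for context):

    # Local directory the BlobServer uses as a cache for uploaded JARs
    blob.storage.directory: /var/lib/flink/blobs

    # HA setup; the durable copy used for recovery lives under storageDir
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    high-availability.storageDir: s3://my-bucket/flink/ha/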


On Fri, Aug 4, 2017 at 11:56 AM, Shannon Carey <sca...@expedia.com> wrote:

> Stephan,
>
> Regarding your last reply to http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/blob-store-defaults-to-tmp-and-files-get-deleted-td11720.html
>
> You mention "Flink (via the user code class loader) actually holds a
> reference to the JAR files in "/tmp", so even if "/tmp" gets wiped, the JAR
> file remains usable by the class loader". In my understanding, even if
> that's true, it doesn't survive a failure of the JobManager/TaskManager
> process, because the handle would be lost and the file would be gone.
>
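> For illustration, here is a small, self-contained sketch of that behavior on
> Linux (the path is hypothetical, just to make the point): a file deleted
> while a process still holds an open stream stays readable through that
> stream, but after a process restart the handle is gone and the file cannot
> be reopened.
>
>     import java.io.*;
>
>     public class OpenHandleSketch {
>         public static void main(String[] args) throws IOException {
>             // Hypothetical path standing in for a blob-store JAR under /tmp.
>             File jar = new File("/tmp/blobStore-demo/job.jar");
>             jar.getParentFile().mkdirs();
>             try (FileOutputStream out = new FileOutputStream(jar)) {
>                 out.write(42);
>             }
>             FileInputStream in = new FileInputStream(jar); // open a handle
>             jar.delete();                  // simulate "/tmp" being wiped
>             System.out.println(in.read()); // still prints 42: the open handle pins the inode
>             in.close();
>             // A restarted process has no such handle, so reopening fails:
>             new FileInputStream(jar);      // throws FileNotFoundException
>         }
>     }
>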
> We're still running Flink 1.2.1, so maybe we're missing out on some of the
> improvements that have been made. However, we recently had a problem with a
> batch (DataSet) job not restarting successfully, apparently after a
> JobManager failure. This particular job runs in AWS EMR (on YARN), which
> means that only one JobManager runs at a time, and when it fails it gets
> restarted.
>
> Here's what I can see from the logs. When the job restarts, it goes from
> CREATED -> RUNNING state, and then logs:
>
> 23:23:56,798 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph (flink-akka.actor.default-dispatcher-55): Job com.expedia…MyJob (c58185a78dd64cfc9f12374bd1f9a679) switched from state RUNNING to SUSPENDED.
> java.lang.Exception: JobManager is no longer the leader.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:319)
>
> I assume that's normal/expected, because the JobManager was restarted but
> some portion of the job state is still referring to the old one. Next,
> YarnJobManager logs: "Attempting to recover job
> c58185a78dd64cfc9f12374bd1f9a679." However, it subsequently fails:
>
> 2017-08-03 00:09:18,991 WARN  org.apache.flink.yarn.YarnJobManager (flink-akka.actor.default-dispatcher-96): Failed to recover job c58185a78dd64cfc9f12374bd1f9a679.
> java.lang.Exception: Failed to retrieve the submitted job graph from state handle.
>   at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:180)
>   …
> Caused by: java.lang.RuntimeException: Unable to instantiate the hadoop input format
>   at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:319)
>   …
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
>   at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>   at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:178)
>   ... 15 more
> Caused by: java.lang.ClassNotFoundException: org.apache.parquet.avro.AvroParquetInputFormat
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.readObject(HadoopInputFormatBase.java:317)
>   ... 69 more
>
> The missing class comes from our job, so it seems like the job jar isn't
> present on the classpath of the JobManager. When I look at the contents of
> our configured blob storage directory (we're not using /tmp), I see
> subfolders like:
>
> blobStore-7d40f1b9-7b06-400f-8c05-b5456adcd7f1
> blobStore-f2d7974c-7d86-4b11-a7fb-d1936a4593ed
>
> Only one of the two has a JAR in it, so it looks like a new directory is
> created for each new JobManager. When I look in ZooKeeper at nodes such as
> "/flink/main/jobgraphs/c58185a78dd64cfc9f12374bd1f9a679", I don't see those
> directories mentioned. Can someone explain how Flink retrieves the job jar
> for a job retry when the JobManager has failed? Are we running into a Flink
> bug here?
>
> Thanks for the info,
> Shannon
>
>
