Hi Maximilian,

Thanks for your help on this and for merging the PR. I understand our
setup is off the normal path (i.e. using the parent-first class loader),
so it's definitely appreciated! :)

Not sure why you weren't able to reproduce the problem using that simple
job. Maybe, as you mentioned later, the behaviour stems from the way YARN
includes JARs rather than from using Flink directly.
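
For anyone finding this thread later, the mechanism discussed in the quoted
messages below can be sketched roughly as follows. This is only an
illustration using the plain JDK, not Beam, Flink or Jackson code:
ParentLevelTypeCache, UserTypeHandle and ClassLoaderLeakSketch are made-up
names, and the handle object stands in for a real Class object (which
references its defining class loader in the same way). It assumes one fresh
class loader per job attempt and a static cache that outlives every attempt.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a library on the parent classpath with a static cache. */
final class ParentLevelTypeCache {
    // Static state lives as long as the class that owns it, here the whole JVM run.
    private static final Map<String, Object> CACHE = new ConcurrentHashMap<>();

    private ParentLevelTypeCache() {}

    static void remember(String key, Object value) { CACHE.put(key, value); }
    static int size() { return CACHE.size(); }
    static void clearCache() { CACHE.clear(); }
}

/** Hypothetical stand-in for a user class: it strongly references its loader. */
final class UserTypeHandle {
    final ClassLoader definingLoader;
    UserTypeHandle(ClassLoader definingLoader) { this.definingLoader = definingLoader; }
}

final class ClassLoaderLeakSketch {
    /** Simulates one job attempt and returns a weak reference to its loader. */
    static WeakReference<ClassLoader> runAttempt(int attempt, boolean cleanUp) {
        // A fresh loader per attempt, as happens on each job restart.
        ClassLoader jobLoader =
            new ClassLoader(ClassLoaderLeakSketch.class.getClassLoader()) {};
        try {
            // The parent-level cache now references something tied to jobLoader.
            ParentLevelTypeCache.remember("attempt-" + attempt, new UserTypeHandle(jobLoader));
        } finally {
            if (cleanUp) {
                // Dropping the cached entries removes the path from the static
                // cache to this attempt's loader.
                ParentLevelTypeCache.clearCache();
            }
        }
        return new WeakReference<>(jobLoader);
    }

    /** Counts loaders that have not been collected after requesting a GC. */
    static long countStillReachable(List<WeakReference<ClassLoader>> refs) {
        System.gc(); // Only a hint, but strongly reachable objects are never collected.
        return refs.stream().filter(r -> r.get() != null).count();
    }

    public static void main(String[] args) {
        List<WeakReference<ClassLoader>> refs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            refs.add(runAttempt(i, false));
        }
        System.out.println("cached entries without cleanup: " + ParentLevelTypeCache.size());
        System.out.println("loaders still reachable: " + countStillReachable(refs));
        ParentLevelTypeCache.clearCache();
        System.out.println("cached entries after clearCache: " + ParentLevelTypeCache.size());
    }
}
```

While the cache holds an entry, the corresponding loader stays strongly
reachable, so no amount of GC can unload its classes. After the cache is
cleared the loaders become eligible for collection, although when that
happens is up to the JVM.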

We will take a look at tuning the YARN parameters to add further
mitigations for our issues, but we’re definitely looking forward to when
2.10 drops - hopefully EMR will support Flink 1.7 too!

Daniel

On 21/01/2019, 23:48, "Maximilian Michels" <[email protected]> wrote:

>Thanks again for providing the project to reproduce the error.
>Unfortunately, I could not reproduce the error with your instructions.
>I've tried killing TaskManagers with the restricted Metaspace memory but
>I did not run out of Metaspace memory in the remaining TaskManager. I did
>not use YARN.
>
>I saw your bug report in FLINK [1]. Some comments on that:
>
>First of all, your initial issue is unrelated to Beam but caused by the
>Metaspace growing and YARN killing your application [2]. The Metaspace
>grows because Flink does not limit it by default and when a job is
>restarted it needs to temporarily increase the Metaspace size. Limiting
>it should force a GC instead. Read on to find out why GC might not work.
>
>You mentioned in the first comment of [1] that you copied the job jar
>into the lib folder. This is problematic with "parent-first" class
>loading because now libraries like Jackson in your job will be loaded
>from the parent classloader. This can then install the Flink job
>classloader into the Jackson cache. The cache and Jackson will never be
>evicted as they are part of the parent classloader. Note that this would
>not be a problem if class loading was "child-first" and Jackson was part
>of the user jar.
>
>However,
>
>1) The YARN "cluster" mode does seem to have a problem because it
>includes the jars into the Flink class loader. So really "child-first"
>vs "parent-first" does not matter because it will always fall back to the
>parent/Flink class loader.
>
>2) You might want to restart the entire YARN cluster on failures in the
>YARN "cluster" mode (see yarn.maximum-failed-containers), or use the
>"session" mode with "child-first".
>
>3) If the above is not feasible, you have to take care of cleaning the
>Jackson cache or any other static cache, like here:
>https://github.com/apache/beam/pull/7552
>
>I think the subtlety of this behavior warrants that we also merge the fix
>in Beam, as we have seen that clearing the cache gets rid of the problem.
>
>Cheers,
>Max
>
>[1] https://issues.apache.org/jira/browse/FLINK-11205
>[2] https://issues.apache.org/jira/browse/FLINK-10317
>
>On 21.01.19 03:46, Daniel Harper wrote:
>> "classloader.resolve-order" is set to "parent-first".
>>
>>
>> In the simple job that I wrote, the mvn dependency:tree yields the
>> following. It looks like beam-sdks-java-core includes jackson-databind,
>> and the flink runtime lib shades it
>>
>>
>> [INFO] Scanning for projects...
>> [WARNING]
>> [WARNING] Some problems were encountered while building the effective model for example:streaming-job:jar:1.0-SNAPSHOT
>> [WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 60, column 21
>> [WARNING]
>> [WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
>> [WARNING]
>> [WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
>> [WARNING]
>> [INFO]
>> [INFO] -----------------------< example:streaming-job >------------------------
>> [INFO] Building streaming-job 1.0-SNAPSHOT
>> [INFO] --------------------------------[ jar ]---------------------------------
>> [INFO]
>> [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ streaming-job ---
>> [INFO] example:streaming-job:jar:1.0-SNAPSHOT
>> [INFO] +- org.apache.beam:beam-sdks-java-core:jar:2.7.0:compile
>> [INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile
>> [INFO] |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.5:compile
>> [INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.5:compile
>> [INFO] |  +- org.slf4j:slf4j-api:jar:1.7.25:compile
>> [INFO] |  +- org.apache.avro:avro:jar:1.8.2:compile
>> [INFO] |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
>> [INFO] |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
>> [INFO] |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
>> [INFO] |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
>> [INFO] |  +- joda-time:joda-time:jar:2.4:compile
>> [INFO] |  \- org.tukaani:xz:jar:1.8:compile
>> [INFO] \- org.apache.beam:beam-runners-flink_2.11:jar:2.7.0:compile
>> [INFO]    +- org.apache.beam:beam-runners-core-java:jar:2.7.0:compile
>> [INFO]    |  +- org.apache.beam:beam-model-pipeline:jar:2.7.0:compile
>> [INFO]    |  |  \- com.google.errorprone:error_prone_annotations:jar:2.1.2:compile
>> [INFO]    |  \- org.apache.beam:beam-model-fn-execution:jar:2.7.0:compile
>> [INFO]    +- org.apache.beam:beam-runners-core-construction-java:jar:2.7.0:compile
>> [INFO]    |  \- org.apache.beam:beam-model-job-management:jar:2.7.0:compile
>> [INFO]    +- org.apache.beam:beam-runners-java-fn-execution:jar:2.7.0:compile
>> [INFO]    |  +- org.apache.beam:beam-sdks-java-fn-execution:jar:2.7.0:compile
>> [INFO]    |  \- org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:jar:2.7.0:compile
>> [INFO]    +- org.slf4j:slf4j-simple:jar:1.7.25:compile
>> [INFO]    +- org.apache.commons:commons-compress:jar:1.16.1:compile
>> [INFO]    |  \- org.objenesis:objenesis:jar:2.6:compile
>> [INFO]    +- args4j:args4j:jar:2.33:compile
>> [INFO]    +- org.apache.flink:flink-clients_2.11:jar:1.5.2:compile
>> [INFO]    |  +- org.apache.flink:flink-optimizer_2.11:jar:1.5.2:compile
>> [INFO]    |  +- commons-cli:commons-cli:jar:1.3.1:compile
>> [INFO]    |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>> [INFO]    |  \- org.apache.flink:force-shading:jar:1.5.2:compile
>> [INFO]    +- org.apache.flink:flink-core:jar:1.5.2:compile
>> [INFO]    |  +- org.apache.flink:flink-annotations:jar:1.5.2:compile
>> [INFO]    |  +- org.apache.flink:flink-shaded-asm:jar:5.0.4-2.0:compile
>> [INFO]    |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
>> [INFO]    |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
>> [INFO]    |  |  \- com.esotericsoftware.minlog:minlog:jar:1.2:compile
>> [INFO]    |  \- commons-collections:commons-collections:jar:3.2.2:compile
>> [INFO]    +- org.apache.flink:flink-metrics-core:jar:1.5.2:compile
>> [INFO]    +- org.apache.flink:flink-java:jar:1.5.2:compile
>> [INFO]    |  \- org.apache.commons:commons-math3:jar:3.5:compile
>> [INFO]    +- org.apache.flink:flink-runtime_2.11:jar:1.5.2:compile
>> [INFO]    |  +- org.apache.flink:flink-queryable-state-client-java_2.11:jar:1.5.2:compile
>> [INFO]    |  +- org.apache.flink:flink-hadoop-fs:jar:1.5.2:compile
>> [INFO]    |  +- commons-io:commons-io:jar:2.4:compile
>> [INFO]    |  +- org.apache.flink:flink-shaded-netty:jar:4.0.27.Final-2.0:compile
>> [INFO]    |  +- org.apache.flink:flink-shaded-guava:jar:18.0-2.0:compile
>> [INFO]    |  +- org.apache.flink:flink-shaded-jackson:jar:2.7.9-3.0:compile
>> [INFO]    |  +- org.javassist:javassist:jar:3.18.2-GA:compile
>> [INFO]    |  +- org.scala-lang:scala-library:jar:2.11.12:compile
>> [INFO]    |  +- com.typesafe.akka:akka-actor_2.11:jar:2.4.20:compile
>> [INFO]    |  |  +- com.typesafe:config:jar:1.3.0:compile
>> [INFO]    |  |  \- org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
>> [INFO]    |  +- com.typesafe.akka:akka-stream_2.11:jar:2.4.20:compile
>> [INFO]    |  |  +- org.reactivestreams:reactive-streams:jar:1.0.0:compile
>> [INFO]    |  |  \- com.typesafe:ssl-config-core_2.11:jar:0.2.1:compile
>> [INFO]    |  |     \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:compile
>> [INFO]    |  +- com.typesafe.akka:akka-protobuf_2.11:jar:2.4.20:compile
>> [INFO]    |  +- com.typesafe.akka:akka-slf4j_2.11:jar:2.4.20:compile
>> [INFO]    |  +- org.clapper:grizzled-slf4j_2.11:jar:1.0.2:compile
>> [INFO]    |  +- com.github.scopt:scopt_2.11:jar:3.5.0:compile
>> [INFO]    |  \- com.twitter:chill_2.11:jar:0.7.4:compile
>> [INFO]    |     \- com.twitter:chill-java:jar:0.7.4:compile
>> [INFO]    \- org.apache.flink:flink-streaming-java_2.11:jar:1.5.2:compile
>> [INFO] ------------------------------------------------------------------------
>> [INFO] BUILD SUCCESS
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Total time:  0.928 s
>> [INFO] Finished at: 2019-01-21T08:45:08Z
>> [INFO] ------------------------------------------------------------------------
>>
>>
>>
>>
>>
>> On 18/01/2019, 17:01, "Maximilian Michels" <[email protected]> wrote:
>>
>>> Hi Daniel,
>>>
>>> I did some more debugging. I think the fix we proposed only cures the
>>> symptoms. The cause is that your job uses Jackson which is also a
>>> dependency of Flink.
>>>
>>> So your job ends up using Flink's version of Jackson which then
>>> installs classes from your job in the Jackson cache. Now, this wouldn't
>>> be a problem if you shaded your version of Jackson, i.e. renamed the
>>> Jackson package.
>>>
>>> But even without shading, the default behavior of Flink is to load user
>>> classes first. Could you please check:
>>>
>>> 1) Is "classloader.resolve-order" set to "child-first"?
>>> 2) Do you include the Jackson library in your user jar?
>>>
>>> Thanks,
>>> Max
>>>
>>> On 18.01.19 03:40, Daniel Harper wrote:
>>>> Thanks for raising this and the PR!
>>>>
>>>> In our production streaming job we're using Kinesis, so good shout on
>>>> the UnboundedSourceWrapper.
>>>>
>>>> On 17/01/2019, 21:08, "Maximilian Michels" <[email protected]> wrote:
>>>>
>>>>> I'm glad that solved your GC problem. I think dispose() is a good
>>>>> place; it is meant for cleanup.
>>>>>
>>>>> In your case the DoFn is a NOOP, so the PipelineOptions are probably
>>>>> loaded through your UnboundedSource. If both happen to be scheduled in
>>>>> the same TaskManager that is fine. However, just as a precaution we
>>>>> should also include the cache invalidation in UnboundedSourceWrapper.
>>>>>
>>>>> This way we should be good for the streaming execution. Will try to
>>>>> get this into 2.10.0.
>>>>>
>>>>> Thanks,
>>>>> Max
>>>>>
>>>>> Issue: https://jira.apache.org/jira/browse/BEAM-6460
>>>>>
>>>>> On 17.01.19 12:50, Daniel Harper wrote:
>>>>>> Max, Juan,
>>>>>>
>>>>>> Just tried patching this class
>>>>>>
>>>>>> https://github.com/apache/beam/blob/v2.7.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L389
>>>>>>
>>>>>> and putting the clearCache call in the finally block.
>>>>>>
>>>>>> Redoing the test causes the GC to kick in (see screenshot)
>>>>>>
>>>>>> I'm not sure if this is the best place to put this cleanup code
>>>>>> though. Is this the final place where all BEAM-related stuff gets
>>>>>> terminated?
>>>>>>
>>>>>> Daniel.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 17/01/2019, 16:18, "Maximilian Michels" <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Daniel, hi Juan,
>>>>>>>
>>>>>>> @Daniel Thanks a lot for investigating and reporting the issue.
>>>>>>>
>>>>>>> Your analysis looks convincing; it may be that Jackson is holding
>>>>>>> on to the classloader. Beam uses Jackson to parse the
>>>>>>> FlinkPipelineOptions.
>>>>>>>
>>>>>>> Have you already tried to call
>>>>>>> TypeFactory.defaultInstance().clearCache() in a catch-all block
>>>>>>> within your synthetic Beam job, before actually failing? That way we
>>>>>>> could see if the classloader is garbage-collected after a restart.
>>>>>>>
>>>>>>> Let me also investigate in the meantime. We are in the process of
>>>>>>> getting the 2.10.0 release ready with a few pending issues. So it
>>>>>>> would be a good time to fix this issue.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Max
>>>>>>>
>>>>>>> On 17.01.19 09:50, Juan Carlos Garcia wrote:
>>>>>>>> Nice finding. We are experiencing the same thing (Flink 1.5.4): a
>>>>>>>> few jobs are dying with metaspace OOMs after multiple restarts. In
>>>>>>>> our case we have an HA Flink cluster and are not using YARN for
>>>>>>>> orchestration.
>>>>>>>>
>>>>>>>> Good job with the diagnosis.
>>>>>>>>
>>>>>>>> JC
>>>>>>>>
>>>>>>>> On Thu, Jan 17, 2019 at 3:23 PM Daniel Harper
>>>>>>>> <[email protected]
>>>>>>>> <mailto:[email protected]>> wrote:
>>>>>>>>
>>>>>>>>        Environment:
>>>>>>>>
>>>>>>>>        BEAM 2.7.0
>>>>>>>>        Flink 1.5.2
>>>>>>>>        AWS EMR 5.17.0
>>>>>>>>        Hadoop YARN for orchestration
>>>>>>>>
>>>>>>>>
>>>>>>>>        We've noticed the metaspace usage increasing when our Flink
>>>>>>>>        job restarts, which in turn sometimes causes YARN to kill the
>>>>>>>>        container for going beyond its physical memory limits. After
>>>>>>>>        setting MaxMetaspaceSize and making the JVM dump its heap on
>>>>>>>>        OOM, we noticed quite a few instances of the
>>>>>>>>        FlinkUserClassLoader class hanging around, which corresponded
>>>>>>>>        with the number of restarts that happened.
>>>>>>>>
>>>>>>>>        Originally I posted this issue on the FLINK mailing list here:
>>>>>>>>
>>>>>>>>        http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/User-ClassLoader-leak-on-job-restart-td25547.html
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>        After investigation I think this is related to something in
>>>>>>>>        the BEAM code, or the way BEAM interacts with the Flink class
>>>>>>>>        loading mechanism, because I can see the following when
>>>>>>>>        selecting one of the 'old' classloaders -> Path to GC Roots
>>>>>>>>        using Eclipse MAT in one of the heap dumps
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>        This looks to me like this issue
>>>>>>>>        https://github.com/FasterXML/jackson-databind/issues/1363
>>>>>>>>
>>>>>>>>
>>>>>>>>        It sounds like to resolve it, user code should call
>>>>>>>>        TypeFactory.defaultInstance().clearCache() when threads are
>>>>>>>>        shut down. I'm not sure where in the FlinkRunner codebase
>>>>>>>>        this should be though.
>>>>>>>>
>>>>>>>>
>>>>>>>>        To try and narrow it down as much as possible and reduce the
>>>>>>>>        number of dependencies, I've managed to reproduce this with a
>>>>>>>>        really simple job that just reads from a synthetic unbounded
>>>>>>>>        source (back-ported from the master branch) and does nothing:
>>>>>>>>        https://github.com/djhworld/streaming-job. This will run on a
>>>>>>>>        Flink environment.
>>>>>>>>
>>>>>>>>        To reproduce the OOM I just ran the job with
>>>>>>>>        MaxMetaspaceSize=125M, and then killed a random task manager
>>>>>>>>        every 60 seconds, which yielded the following
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>        As you can see, the number of classes increases on each
>>>>>>>>        restart, which causes the metaspace to grow and eventually
>>>>>>>>        leads to an OOM.
>>>>>>>>
>>>>>>>>        Is there anything we could do to fix this? I've not tested
>>>>>>>>        this on versions newer than 2.7.0 because we are waiting for
>>>>>>>>        2.10 to drop so we can run Flink 1.6/1.7 on EMR
>>>>>>>>
>>>>>>>>        With thanks,
>>>>>>>>
>>>>>>>>        Daniel
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>        ----------------------------
>>>>>>>>
>>>>>>>>        http://www.bbc.co.uk <http://www.bbc.co.uk>
>>>>>>>>        This e-mail (and any attachments) is confidential and may
>>>>>>>> contain
>>>>>>>> personal
>>>>>>>>        views which are not the views of the BBC unless
>>>>>>>>specifically
>>>>>>>> stated.
>>>>>>>>        If you have received it in error, please delete it from
>>>>>>>>your
>>>>>>>> system.
>>>>>>>>        Do not use, copy or disclose the information in any way nor
>>>>>>>> act
>>>>>>>> in
>>>>>>>> reliance
>>>>>>>>        on it and notify the sender immediately.
>>>>>>>>        Please note that the BBC monitors e-mails sent or received.
>>>>>>>>        Further communication will signify your consent to this.
>>>>>>>>
>>>>>>>>        ---------------------
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> JC
>>>>>>>>
>>>>>>
>>>>
>>



