Hi Ufuk,

I'll answer your question, but first I'll give you an update on how we
resolved the issue:

* adding `org.apache.hadoop.io.compress.SnappyCodec` to
`classloader.parent-first-patterns.additional` in `flink-conf.yaml`
(though, putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
* putting a jar with `hadoop-common` + it's transitive dependencies, then
using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and
its transitive dependencies). So we end up with jar that has `SnappyCodec`
and whatever it needs to call transitively. We put this jar on the task
manager classpath.

I believe `SnappyCodec` was being called via our code. This worked the
first time but deploying a second time caused `libhadoop.so` to be loaded
in a second class loader. By putting a jar with `SnappyCodec` and it's
transitive dependencies on the task manager classpath and specifying that
`SnappyCodec` needs to be loaded from the parent classloader, we ensure
that only one classloader loads `libhadoop.so`. I don't think this is the
best way to achieve what we want, but it works for now.

Next steps: if no one is on it, I can take a stab at updating the
documentation to clarify how to debug and resolve Native library loading.
This was a nice learning experience and I think it'll be helpful to have
this in the docs for those who aren't well-versed in how classloading on
the JVM works!

To answer your questions:

1. We install hadoop on our machines and tell flink task managers to access
it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in
`flink-conf.yaml`
2. We put flink's shaded hadoop-fs-s3 on both the task manager and job
manager classpath (I believe this is only used by the Job Managers when
they interact with S3 for checkpoints etc. I don't believe any user code is
using this).
3. Our flink applications consist of a "fat jar" that has some
`org.apache.hadoop` dependencies bundled with it. I believe this is the
source of why we're loading `SnappyCodec` twice and triggering this issue.
4. For example code: we have a small wrapper around
`org.apache.flink.api.common.io.FileInputFormat` which does the work with
sequence files. It looks like (after removing some stuff to make it more
clear):

```
abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <:
Writable](
    typeInformation: TypeInformation[T]
) extends FileInputFormat[T]
    with ResultTypeQueryable[T] {
  @transient private var bufferedNextRecord: T = _
  @transient private var hadoopStream: HadoopFSDataInputStream = _
  @transient private var sequenceFileReader: SequenceFile.Reader = _

  unsplittable = true
  enumerateNestedFiles = true

  // *****************************************
  // This is where we'd see exceptions.
  // *****************************************
  override def open(fileSplit: FileInputSplit): Unit = {
    super.open(fileSplit)
    val config = new Configuration()
    hadoopStream = WrappedHadoopInputStream.wrap(stream)
    sequenceFileReader = new SequenceFile.Reader(config,
SequenceFile.Reader.stream(hadoopStream))
    bufferNextRecord()
  }
...
}

// AND

class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
    extends InputStream
    with Seekable
    with PositionedReadable {

  def read(): Int = underlying.read()
  def seek(pos: Long): Unit = underlying.seek(pos)
  def getPos: Long = underlying.getPos
}
...
```

Thanks for all your help, I appreciate it! I wouldn't have been able to
debug and resolve this if it wasn't for you filing the ticket. Thank you so
much!

[0] https://github.com/pantsbuild/jarjar

Aaron Levin

On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi <u...@apache.org> wrote:

> Hey Aaron,
>
> sorry for the late reply (again).
>
> (1) I think that your final result is in line with what I have
> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>
> (2) I think renaming the file would not help as it will still be
> loaded multiple times when the jobs restarts (as it happens in
> FLINK-11402).
>
> (3) I'll try to check whether Flink's shading of Hadoop is related to
> this. I don't think so though. @Chesnay (cc'd): What do you think?
>
> (4) @Aaron: Can you tell me which Hadoop libraries you use and share
> some code so I can try to reproduce this exactly on my side? Judging
> from the earlier stack traces you have shared, I'm assuming you are
> trying to read Snappy-compressed sequence files.
>
> – Ufuk
>
> On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin <aaronle...@stripe.com> wrote:
> >
> > I don't control the code calling `System.loadLibrary("hadoop")` so
> that's not an option for me, unfortunately.
> >
> > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma <guowei....@gmail.com> wrote:
> >>
> >> This may be caused by a  jvm process can only load a so once.So a triky
> way is to rename it。
> >>
> >> 发自我的 iPhone
> >>
> >> 在 2019年1月25日,上午7:12,Aaron Levin <aaronle...@stripe.com> 写道:
> >>
> >> Hi Ufuk,
> >>
> >> Update: I've pinned down the issue. It's multiple classloaders loading
> `libhadoop.so`:
> >>
> >> ```
> >> failed to load native hadoop with error:
> java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so
> already loaded in another classloader
> >> ```
> >>
> >> I'm not quite sure what the solution is. Ideally flink would destroy a
> classloader when a job is canceled, but perhaps there's a jvm limitation
> there? Putting the libraries into `/usr/lib` or `/lib` does not work (as
> suggested by Chesnay in the ticket) as I get the same error. I might see if
> I can put a jar with `org.apache.hadoop.common.io.compress` in
> `/flink/install/lib` and then remove it from my jar. It's not an ideal
> solution but I can't think of anything else.
> >>
> >> Best,
> >>
> >> Aaron Levin
> >>
> >> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <aaronle...@stripe.com>
> wrote:
> >>>
> >>> Hi Ufuk,
> >>>
> >>> I'm starting to believe the bug is much deeper than the originally
> reported error because putting the libraries in `/usr/lib` or `/lib` does
> not work. This morning I dug into why putting `libhadoop.so` into
> `/usr/lib` didn't work, despite that being in the `java.library.path` at
> the call site of the error. I wrote a small program to test the loading of
> native libraries, and it was able to successfully load `libhadoop.so`. I'm
> very perplexed. Could this be related to the way flink shades hadoop stuff?
> >>>
> >>> Here is my program and its output:
> >>>
> >>> ```
> >>> $ cat LibTest.scala
> >>> package com.redacted.flink
> >>>
> >>> object LibTest {
> >>>   def main(args: Array[String]): Unit = {
> >>>     val library = args(0)
> >>>
>  
> System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
> >>>     System.out.println(s"Attempting to load $library")
> >>>     System.out.flush()
> >>>     System.loadLibrary(library)
> >>>     System.out.println(s"Successfully loaded ")
> >>>     System.out.flush()
> >>> }
> >>> ```
> >>>
> >>> I then tried running that on one of the task managers with `hadoop` as
> an argument:
> >>>
> >>> ```
> >>> $ java -jar lib_test_deploy.jar hadoop
> >>>
> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
> >>> Attempting to load hadoop
> >>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop
> in java.library.path
> >>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
> >>> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
> >>> at java.lang.System.loadLibrary(System.java:1122)
> >>> at com.stripe.flink.LibTest$.main(LibTest.scala:11)
> >>> at com.stripe.flink.LibTest.main(LibTest.scala)
> >>> ```
> >>>
> >>> I then copied the native libraries into `/usr/lib/` and ran it again:
> >>>
> >>> ```
> >>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
> >>> $ java -jar lib_test_deploy.jar hadoop
> >>>
> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
> >>> Attempting to load hadoop
> >>> Successfully loaded
> >>> ```
> >>>
> >>> Any ideas?
> >>>
> >>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <aaronle...@stripe.com>
> wrote:
> >>>>
> >>>> Hi Ufuk,
> >>>>
> >>>> One more update: I tried copying all the hadoop native `.so` files
> (mainly `libhadoop.so`) into `/lib` and am I still experiencing the issue I
> reported. I also tried naively adding the `.so` files to the jar with the
> flink application and am still experiencing the issue I reported (however,
> I'm going to investigate this further as I might not have done it
> correctly).
> >>>>
> >>>> Best,
> >>>>
> >>>> Aaron Levin
> >>>>
> >>>> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <aaronle...@stripe.com>
> wrote:
> >>>>>
> >>>>> Hi Ufuk,
> >>>>>
> >>>>> Two updates:
> >>>>>
> >>>>> 1. As suggested in the ticket, I naively copied the every `.so` in
> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
> knowledge of how shared libs get picked up is hazy, so I'm not sure if
> blindly copying them like that should work. I did check what
> `System.getProperty("java.library.path")` returns at the call-site and
> it's:
> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
> >>>>> 2. The exception I see comes from
> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
> This uses `System.loadLibrary("hadoop")`.
> >>>>>
> >>>>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
> >>>>> [2019-01-23 19:52:33.081376]  at
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
> >>>>> [2019-01-23 19:52:33.081406]  at
> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
> >>>>> [2019-01-23 19:52:33.081429]  at
> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
> >>>>> [2019-01-23 19:52:33.081457]  at
> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
> >>>>> [2019-01-23 19:52:33.081494]  at org.apache.hadoop.io
> .SequenceFile$Reader.init(SequenceFile.java:2037)
> >>>>> [2019-01-23 19:52:33.081517]  at org.apache.hadoop.io
> .SequenceFile$Reader.initialize(SequenceFile.java:1923)
> >>>>> [2019-01-23 19:52:33.081549]  at org.apache.hadoop.io
> .SequenceFile$Reader.<init>(SequenceFile.java:1872)
> >>>>> ... (redacted) ...
> >>>>> [2019-01-23 19:52:33.081728]  at
> scala.collection.immutable.List.foreach(List.scala:392)
> >>>>> ... (redacted) ...
> >>>>> [2019-01-23 19:52:33.081832]  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
> >>>>> [2019-01-23 19:52:33.081854]  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> >>>>> [2019-01-23 19:52:33.081882]  at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> >>>>> [2019-01-23 19:52:33.081904]  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> >>>>> [2019-01-23 19:52:33.081946]  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> >>>>> [2019-01-23 19:52:33.081967]  at
> java.lang.Thread.run(Thread.java:748)
> >>>>>
> >>>>> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin <aaronle...@stripe.com>
> wrote:
> >>>>>>
> >>>>>> Hey Ufuk,
> >>>>>>
> >>>>>> So, I looked into this a little bit:
> >>>>>>
> >>>>>> 1. clarification: my issues are with the hadoop-related snappy
> libraries and not libsnappy itself (this is my bad for not being clearer,
> sorry!). I already have `libsnappy` on my classpath, but I am looking into
> including the hadoop snappy libraries.
> >>>>>> 2. exception: I don't see the class loading error. I'm going to try
> to put some more instrumentation and see if I can get a clearer stacktrace
> (right now I get an NPE on closing a sequence file in a finalizer - when I
> last logged the exception it was something deep in hadoop's snappy libs -
> I'll get clarification soon).
> >>>>>> 3. I'm looking into including hadoop's snappy libs in my jar and
> we'll see if that resolves the problem.
> >>>>>>
> >>>>>> Thanks again for your help!
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Aaron Levin
> >>>>>>
> >>>>>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin <aaronle...@stripe.com>
> wrote:
> >>>>>>>
> >>>>>>> Hey,
> >>>>>>>
> >>>>>>> Thanks so much for the help! This is awesome. I'll start looking
> into all of this right away and report back.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>>
> >>>>>>> Aaron Levin
> >>>>>>>
> >>>>>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <u...@apache.org>
> wrote:
> >>>>>>>>
> >>>>>>>> Hey Aaron,
> >>>>>>>>
> >>>>>>>> sorry for the late reply.
> >>>>>>>>
> >>>>>>>> (1) I think I was able to reproduce this issue using snappy-java.
> I've
> >>>>>>>> filed a ticket here:
> >>>>>>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check
> the
> >>>>>>>> ticket description whether it's in line with what you are
> >>>>>>>> experiencing? Most importantly, do you see the same Exception
> being
> >>>>>>>> reported after cancelling and re-starting the job?
> >>>>>>>>
> >>>>>>>> (2) I don't think it's caused by the environment options not being
> >>>>>>>> picked up. You can check the head of the log files of the
> JobManager
> >>>>>>>> or TaskManager to verify that your provided option is picked up as
> >>>>>>>> expected. You should see something similar to this:
> >>>>>>>>
> >>>>>>>> 2019-01-21 22:53:49,863 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>>
> --------------------------------------------------------------------------------
> >>>>>>>> 2019-01-21 22:53:49,864 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
> >>>>>>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
> >>>>>>>> ...
> >>>>>>>> 2019-01-21 22:53:49,865 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> JVM
> >>>>>>>> Options:
> >>>>>>>> 2019-01-21 22:53:49,865 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> -Xms1024m
> >>>>>>>> 2019-01-21 22:53:49,865 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> -Xmx1024m
> >>>>>>>> You are looking for this line ----> 2019-01-21 22:53:49,865 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/
> <----
> >>>>>>>> 2019-01-21 22:53:49,865 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>>
> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
> >>>>>>>> ...
> >>>>>>>> 2019-01-21 22:53:49,866 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> Program Arguments:
> >>>>>>>> 2019-01-21 22:53:49,866 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> --configDir
> >>>>>>>> 2019-01-21 22:53:49,866 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> /.../flink-1.7.0/conf
> >>>>>>>> 2019-01-21 22:53:49,866 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> --executionMode
> >>>>>>>> 2019-01-21 22:53:49,866 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>> cluster
> >>>>>>>> ...
> >>>>>>>> 2019-01-21 22:53:49,866 INFO
> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
> >>>>>>>>
> --------------------------------------------------------------------------------
> >>>>>>>>
> >>>>>>>> Can you verify that you see the log messages as expected?
> >>>>>>>>
> >>>>>>>> (3) As noted FLINK-11402, is it possible to package the snappy
> library
> >>>>>>>> as part of your user code instead of loading the library via
> >>>>>>>> java.library.path? In my example, that seems to work fine.
> >>>>>>>>
> >>>>>>>> – Ufuk
> >>>>>>>>
> >>>>>>>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin <
> aaronle...@stripe.com> wrote:
> >>>>>>>> >
> >>>>>>>> > Hello!
> >>>>>>>> >
> >>>>>>>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact
> when a job is canceled or fails and then is restarted (with or without
> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts`
> seem to start having impact again and our job will run without failure.
> More below.
> >>>>>>>> >
> >>>>>>>> > We use consume Snappy-compressed sequence files in our flink
> job. This requires access to the hadoop native libraries. In our
> `flink-conf.yaml` for both the task manager and the job manager, we put:
> >>>>>>>> >
> >>>>>>>> > ```
> >>>>>>>> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
> >>>>>>>> > ```
> >>>>>>>> >
> >>>>>>>> > If I launch our job on freshly-restarted task managers, the job
> operates fine. If at some point I cancel the job or if the job restarts for
> some other reason, the job will begin to crashloop because it tries to open
> a Snappy-compressed file but doesn't have access to the codec from the
> native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then
> restart the task manager while the job is crashlooping, the job is start
> running without any codec failures.
> >>>>>>>> >
> >>>>>>>> > The only reason I can conjure that would cause the Snappy
> compression to fail is if the `env.java.opts` were not being passed through
> to the job on restart for some reason.
> >>>>>>>> >
> >>>>>>>> > Does anyone know what's going on? Am I missing some additional
> configuration? I really appreciate any help!
> >>>>>>>> >
> >>>>>>>> > About our setup:
> >>>>>>>> >
> >>>>>>>> > - Flink Version: 1.7.0
> >>>>>>>> > - Deployment: Standalone in HA
> >>>>>>>> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use
> Flink’s shaded jars to access our files in S3. We do not use the
> `bundled-with-hadoop` distribution of Flink.
> >>>>>>>> >
> >>>>>>>> > Best,
> >>>>>>>> >
> >>>>>>>> > Aaron Levin
>

Reply via email to