Hi Ufuk,

I'll answer your questions below, but first an update on how we resolved the issue:
* adding `org.apache.hadoop.io.compress.SnappyCodec` to `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (putting `org.apache.hadoop.util.NativeCodeLoader` there also worked)
* building a jar containing `hadoop-common` and its transitive dependencies, then using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its transitive dependencies), so we end up with a jar that has `SnappyCodec` and whatever it calls transitively. We put this jar on the task manager classpath.

(A sketch of the relevant configuration is included at the bottom of this message.)

I believe `SnappyCodec` was being called from our code. This worked the first time, but deploying a second time caused `libhadoop.so` to be loaded in a second classloader. By putting a jar with `SnappyCodec` and its transitive dependencies on the task manager classpath, and specifying that `SnappyCodec` must be loaded from the parent classloader, we ensure that only one classloader ever loads `libhadoop.so`. I don't think this is the best way to achieve what we want, but it works for now.

Next steps: if no one is on it, I can take a stab at updating the documentation to clarify how to debug and resolve native library loading. This was a nice learning experience, and I think it'll be helpful to have this in the docs for those who aren't well-versed in how classloading on the JVM works!

To answer your questions:

1. We install Hadoop on our machines and tell the Flink task managers to access it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in `flink-conf.yaml`.
2. We put Flink's shaded hadoop-fs-s3 on both the task manager and job manager classpath (I believe this is only used by the job managers when they interact with S3 for checkpoints etc.; I don't believe any user code uses it).
3. Our Flink applications consist of a "fat jar" that bundles some `org.apache.hadoop` dependencies. I believe this is why `SnappyCodec` gets loaded twice, which triggers the issue.
4. For example code: we have a small wrapper around `org.apache.flink.api.common.io.FileInputFormat` which does the work with sequence files. It looks like this (after removing some parts for clarity):

```
abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
    typeInformation: TypeInformation[T]
) extends FileInputFormat[T]
    with ResultTypeQueryable[T] {

  @transient private var bufferedNextRecord: T = _
  @transient private var hadoopStream: HadoopFSDataInputStream = _
  @transient private var sequenceFileReader: SequenceFile.Reader = _

  unsplittable = true
  enumerateNestedFiles = true

  // *****************************************
  // This is where we'd see exceptions.
  // *****************************************
  override def open(fileSplit: FileInputSplit): Unit = {
    super.open(fileSplit)
    val config = new Configuration()
    hadoopStream = WrappedHadoopInputStream.wrap(stream)
    sequenceFileReader = new SequenceFile.Reader(config, SequenceFile.Reader.stream(hadoopStream))
    bufferNextRecord()
  }

  ...
}

// AND

// Adapts Flink's FSDataInputStream to the interfaces Hadoop's SequenceFile.Reader expects.
class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
    extends InputStream
    with Seekable
    with PositionedReadable {

  def read(): Int = underlying.read()
  def seek(pos: Long): Unit = underlying.seek(pos)
  def getPos: Long = underlying.getPos
}

...
```

Thanks for all your help, I appreciate it! I wouldn't have been able to debug and resolve this if it wasn't for you filing the ticket. Thank you so much!
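For reference, here is roughly what the relevant `flink-conf.yaml` entries look like in our setup (a sketch of what we described above, not a general recommendation; the native library path is specific to our machines):

```
# Load SnappyCodec from the parent (Flink) classloader so that libhadoop.so is
# only ever loaded by one classloader, even across job restarts/redeploys.
classloader.parent-first-patterns.additional: org.apache.hadoop.io.compress.SnappyCodec

# Point the JVM at the Hadoop native libraries (where libhadoop.so lives).
env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
```

The jarjar rules file for the trimmed-down `hadoop-common` jar is essentially a single `keep org.apache.hadoop.io.compress.SnappyCodec` rule, which keeps that class plus everything it references transitively.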
[0] https://github.com/pantsbuild/jarjar Aaron Levin On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi <u...@apache.org> wrote: > Hey Aaron, > > sorry for the late reply (again). > > (1) I think that your final result is in line with what I have > reproduced in https://issues.apache.org/jira/browse/FLINK-11402. > > (2) I think renaming the file would not help as it will still be > loaded multiple times when the jobs restarts (as it happens in > FLINK-11402). > > (3) I'll try to check whether Flink's shading of Hadoop is related to > this. I don't think so though. @Chesnay (cc'd): What do you think? > > (4) @Aaron: Can you tell me which Hadoop libraries you use and share > some code so I can try to reproduce this exactly on my side? Judging > from the earlier stack traces you have shared, I'm assuming you are > trying to read Snappy-compressed sequence files. > > – Ufuk > > On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin <aaronle...@stripe.com> wrote: > > > > I don't control the code calling `System.loadLibrary("hadoop")` so > that's not an option for me, unfortunately. > > > > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma <guowei....@gmail.com> wrote: > >> > >> This may be caused by a jvm process can only load a so once.So a triky > way is to rename it。 > >> > >> 发自我的 iPhone > >> > >> 在 2019年1月25日,上午7:12,Aaron Levin <aaronle...@stripe.com> 写道: > >> > >> Hi Ufuk, > >> > >> Update: I've pinned down the issue. It's multiple classloaders loading > `libhadoop.so`: > >> > >> ``` > >> failed to load native hadoop with error: > java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so > already loaded in another classloader > >> ``` > >> > >> I'm not quite sure what the solution is. Ideally flink would destroy a > classloader when a job is canceled, but perhaps there's a jvm limitation > there? Putting the libraries into `/usr/lib` or `/lib` does not work (as > suggested by Chesnay in the ticket) as I get the same error. I might see if > I can put a jar with `org.apache.hadoop.common.io.compress` in > `/flink/install/lib` and then remove it from my jar. It's not an ideal > solution but I can't think of anything else. > >> > >> Best, > >> > >> Aaron Levin > >> > >> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <aaronle...@stripe.com> > wrote: > >>> > >>> Hi Ufuk, > >>> > >>> I'm starting to believe the bug is much deeper than the originally > reported error because putting the libraries in `/usr/lib` or `/lib` does > not work. This morning I dug into why putting `libhadoop.so` into > `/usr/lib` didn't work, despite that being in the `java.library.path` at > the call site of the error. I wrote a small program to test the loading of > native libraries, and it was able to successfully load `libhadoop.so`. I'm > very perplexed. Could this be related to the way flink shades hadoop stuff? 
> >>> > >>> Here is my program and its output: > >>> > >>> ``` > >>> $ cat LibTest.scala > >>> package com.redacted.flink > >>> > >>> object LibTest { > >>> def main(args: Array[String]): Unit = { > >>> val library = args(0) > >>> > > System.out.println(s"java.library.path=${System.getProperty("java.library.path")}") > >>> System.out.println(s"Attempting to load $library") > >>> System.out.flush() > >>> System.loadLibrary(library) > >>> System.out.println(s"Successfully loaded ") > >>> System.out.flush() > >>> } > >>> ``` > >>> > >>> I then tried running that on one of the task managers with `hadoop` as > an argument: > >>> > >>> ``` > >>> $ java -jar lib_test_deploy.jar hadoop > >>> > java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib > >>> Attempting to load hadoop > >>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop > in java.library.path > >>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867) > >>> at java.lang.Runtime.loadLibrary0(Runtime.java:870) > >>> at java.lang.System.loadLibrary(System.java:1122) > >>> at com.stripe.flink.LibTest$.main(LibTest.scala:11) > >>> at com.stripe.flink.LibTest.main(LibTest.scala) > >>> ``` > >>> > >>> I then copied the native libraries into `/usr/lib/` and ran it again: > >>> > >>> ``` > >>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/ > >>> $ java -jar lib_test_deploy.jar hadoop > >>> > java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib > >>> Attempting to load hadoop > >>> Successfully loaded > >>> ``` > >>> > >>> Any ideas? > >>> > >>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <aaronle...@stripe.com> > wrote: > >>>> > >>>> Hi Ufuk, > >>>> > >>>> One more update: I tried copying all the hadoop native `.so` files > (mainly `libhadoop.so`) into `/lib` and am I still experiencing the issue I > reported. I also tried naively adding the `.so` files to the jar with the > flink application and am still experiencing the issue I reported (however, > I'm going to investigate this further as I might not have done it > correctly). > >>>> > >>>> Best, > >>>> > >>>> Aaron Levin > >>>> > >>>> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <aaronle...@stripe.com> > wrote: > >>>>> > >>>>> Hi Ufuk, > >>>>> > >>>>> Two updates: > >>>>> > >>>>> 1. As suggested in the ticket, I naively copied the every `.so` in > `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My > knowledge of how shared libs get picked up is hazy, so I'm not sure if > blindly copying them like that should work. I did check what > `System.getProperty("java.library.path")` returns at the call-site and > it's: > java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib > >>>>> 2. The exception I see comes from > `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). > This uses `System.loadLibrary("hadoop")`. 
> >>>>> > >>>>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: > org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z > >>>>> [2019-01-23 19:52:33.081376] at > org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method) > >>>>> [2019-01-23 19:52:33.081406] at > org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63) > >>>>> [2019-01-23 19:52:33.081429] at > org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195) > >>>>> [2019-01-23 19:52:33.081457] at > org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181) > >>>>> [2019-01-23 19:52:33.081494] at org.apache.hadoop.io > .SequenceFile$Reader.init(SequenceFile.java:2037) > >>>>> [2019-01-23 19:52:33.081517] at org.apache.hadoop.io > .SequenceFile$Reader.initialize(SequenceFile.java:1923) > >>>>> [2019-01-23 19:52:33.081549] at org.apache.hadoop.io > .SequenceFile$Reader.<init>(SequenceFile.java:1872) > >>>>> ... (redacted) ... > >>>>> [2019-01-23 19:52:33.081728] at > scala.collection.immutable.List.foreach(List.scala:392) > >>>>> ... (redacted) ... > >>>>> [2019-01-23 19:52:33.081832] at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94) > >>>>> [2019-01-23 19:52:33.081854] at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) > >>>>> [2019-01-23 19:52:33.081882] at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > >>>>> [2019-01-23 19:52:33.081904] at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > >>>>> [2019-01-23 19:52:33.081946] at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > >>>>> [2019-01-23 19:52:33.081967] at > java.lang.Thread.run(Thread.java:748) > >>>>> > >>>>> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin <aaronle...@stripe.com> > wrote: > >>>>>> > >>>>>> Hey Ufuk, > >>>>>> > >>>>>> So, I looked into this a little bit: > >>>>>> > >>>>>> 1. clarification: my issues are with the hadoop-related snappy > libraries and not libsnappy itself (this is my bad for not being clearer, > sorry!). I already have `libsnappy` on my classpath, but I am looking into > including the hadoop snappy libraries. > >>>>>> 2. exception: I don't see the class loading error. I'm going to try > to put some more instrumentation and see if I can get a clearer stacktrace > (right now I get an NPE on closing a sequence file in a finalizer - when I > last logged the exception it was something deep in hadoop's snappy libs - > I'll get clarification soon). > >>>>>> 3. I'm looking into including hadoop's snappy libs in my jar and > we'll see if that resolves the problem. > >>>>>> > >>>>>> Thanks again for your help! > >>>>>> > >>>>>> Best, > >>>>>> > >>>>>> Aaron Levin > >>>>>> > >>>>>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin <aaronle...@stripe.com> > wrote: > >>>>>>> > >>>>>>> Hey, > >>>>>>> > >>>>>>> Thanks so much for the help! This is awesome. I'll start looking > into all of this right away and report back. > >>>>>>> > >>>>>>> Best, > >>>>>>> > >>>>>>> Aaron Levin > >>>>>>> > >>>>>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <u...@apache.org> > wrote: > >>>>>>>> > >>>>>>>> Hey Aaron, > >>>>>>>> > >>>>>>>> sorry for the late reply. > >>>>>>>> > >>>>>>>> (1) I think I was able to reproduce this issue using snappy-java. > I've > >>>>>>>> filed a ticket here: > >>>>>>>> https://issues.apache.org/jira/browse/FLINK-11402. 
Can you check > the > >>>>>>>> ticket description whether it's in line with what you are > >>>>>>>> experiencing? Most importantly, do you see the same Exception > being > >>>>>>>> reported after cancelling and re-starting the job? > >>>>>>>> > >>>>>>>> (2) I don't think it's caused by the environment options not being > >>>>>>>> picked up. You can check the head of the log files of the > JobManager > >>>>>>>> or TaskManager to verify that your provided option is picked up as > >>>>>>>> expected. You should see something similar to this: > >>>>>>>> > >>>>>>>> 2019-01-21 22:53:49,863 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> > -------------------------------------------------------------------------------- > >>>>>>>> 2019-01-21 22:53:49,864 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0, > >>>>>>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC) > >>>>>>>> ... > >>>>>>>> 2019-01-21 22:53:49,865 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > JVM > >>>>>>>> Options: > >>>>>>>> 2019-01-21 22:53:49,865 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> -Xms1024m > >>>>>>>> 2019-01-21 22:53:49,865 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> -Xmx1024m > >>>>>>>> You are looking for this line ----> 2019-01-21 22:53:49,865 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ > <---- > >>>>>>>> 2019-01-21 22:53:49,865 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> > -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log > >>>>>>>> ... > >>>>>>>> 2019-01-21 22:53:49,866 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> Program Arguments: > >>>>>>>> 2019-01-21 22:53:49,866 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> --configDir > >>>>>>>> 2019-01-21 22:53:49,866 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> /.../flink-1.7.0/conf > >>>>>>>> 2019-01-21 22:53:49,866 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> --executionMode > >>>>>>>> 2019-01-21 22:53:49,866 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> cluster > >>>>>>>> ... > >>>>>>>> 2019-01-21 22:53:49,866 INFO > >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > >>>>>>>> > -------------------------------------------------------------------------------- > >>>>>>>> > >>>>>>>> Can you verify that you see the log messages as expected? > >>>>>>>> > >>>>>>>> (3) As noted FLINK-11402, is it possible to package the snappy > library > >>>>>>>> as part of your user code instead of loading the library via > >>>>>>>> java.library.path? In my example, that seems to work fine. > >>>>>>>> > >>>>>>>> – Ufuk > >>>>>>>> > >>>>>>>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin < > aaronle...@stripe.com> wrote: > >>>>>>>> > > >>>>>>>> > Hello! > >>>>>>>> > > >>>>>>>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact > when a job is canceled or fails and then is restarted (with or without > savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` > seem to start having impact again and our job will run without failure. > More below. 
> >>>>>>>> > > >>>>>>>> > We use consume Snappy-compressed sequence files in our flink > job. This requires access to the hadoop native libraries. In our > `flink-conf.yaml` for both the task manager and the job manager, we put: > >>>>>>>> > > >>>>>>>> > ``` > >>>>>>>> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native > >>>>>>>> > ``` > >>>>>>>> > > >>>>>>>> > If I launch our job on freshly-restarted task managers, the job > operates fine. If at some point I cancel the job or if the job restarts for > some other reason, the job will begin to crashloop because it tries to open > a Snappy-compressed file but doesn't have access to the codec from the > native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then > restart the task manager while the job is crashlooping, the job is start > running without any codec failures. > >>>>>>>> > > >>>>>>>> > The only reason I can conjure that would cause the Snappy > compression to fail is if the `env.java.opts` were not being passed through > to the job on restart for some reason. > >>>>>>>> > > >>>>>>>> > Does anyone know what's going on? Am I missing some additional > configuration? I really appreciate any help! > >>>>>>>> > > >>>>>>>> > About our setup: > >>>>>>>> > > >>>>>>>> > - Flink Version: 1.7.0 > >>>>>>>> > - Deployment: Standalone in HA > >>>>>>>> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use > Flink’s shaded jars to access our files in S3. We do not use the > `bundled-with-hadoop` distribution of Flink. > >>>>>>>> > > >>>>>>>> > Best, > >>>>>>>> > > >>>>>>>> > Aaron Levin >