Hi Conrad,

I'm afraid you're running into the same problem that we already encountered with loading the native RocksDB library: https://github.com/apache/flink/blob/219ae33d36e67e3e74f493cf4956a290bc966a5d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L519
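To illustrate the idea (this is only a rough sketch, not the actual Flink or RocksDB code; the library name and paths are made up): try loading the library the normal way first, and only fall back to extracting it into a fresh temporary directory when that fails.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class NativeLibFallback {

    /**
     * Sketch of the fallback pattern: attempt the normal
     * java.library.path lookup; on failure, compute a target path in a
     * freshly created temp directory (a real loader would then copy the
     * library bytes out of the jar to that path and System.load() it).
     * Returns null if the normal lookup succeeded, else the target path.
     */
    static Path loadWithFallback(String libName, Path tempDirParent) throws IOException {
        try {
            System.loadLibrary(libName);   // normal java.library.path lookup
            return null;                   // loaded without relocation
        } catch (UnsatisfiedLinkError e) {
            // A *fresh* temp dir per attempt avoids reusing a copy that is
            // "already loaded in another classloader".
            Path tempDir = Files.createTempDirectory(tempDirParent, "native-lib-");
            return tempDir.resolve(System.mapLibraryName(libName));
        }
    }

    public static void main(String[] args) throws IOException {
        Path parent = Files.createTempDirectory("flink-io-");
        Path target = loadWithFallback("definitely-not-a-real-lib", parent);
        System.out.println("would extract to: " + target);
    }
}
```

The important detail is that the temp directory is created fresh each time, so a restart never trips over a previously extracted copy of the library.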
The code section has a good description of what is going on. We're using NativeLibraryLoader [1], which comes with RocksDB, to try and load the native library from a different temporary location if loading it the normal way fails. (The relocation of the lib to a temp dir happens inside NativeLibraryLoader, not on our side. We're just providing a temp path for NativeLibraryLoader to work with.)

Hope that helps,
Aljoscha

[1] https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/NativeLibraryLoader.java

> On 10. Aug 2017, at 15:36, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
>
> Hi,
> First time posting here, so 'hi'.
>
> I have been using Flink (1.3.1 now) for a couple of months and loving it. My deployment is a JobManager running as a long-running session on YARN.
>
> I have a problem with a Flink streaming job in which one of the mappers (inheriting from RichMapFunction) loads, in its open() method, a previously trained machine learning model (using Deeplearning4j). When loading the model, it also loads some native libraries using the javacpp Loader class (which, from looking at the source code, determines a location for native libraries from a hierarchy of: a system property, the user's home dir (with .javacpp), or the temp dir).
> Anyway, the actual problem is this: if an exception is thrown in the Flink job, the JobManager tries to restart it; however, it would appear that when it failed in the first place, references to the objects, and therefore the classes, weren't released by the classloader, as I get this error:
>
> java.lang.ExceptionInInitializerError
>         at org.nd4j.linalg.cpu.nativecpu.ops.NativeOpExecutioner.<init>(NativeOpExecutioner.java:43)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>         at java.lang.Class.newInstance(Class.java:442)
>         at org.nd4j.linalg.factory.Nd4j.initWithBackend(Nd4j.java:5784)
>         at org.nd4j.linalg.factory.Nd4j.initContext(Nd4j.java:5694)
>         at org.nd4j.linalg.factory.Nd4j.<clinit>(Nd4j.java:184)
>         at org.deeplearning4j.util.ModelSerializer.restoreMultiLayerNetwork(ModelSerializer.java:188)
>         at org.deeplearning4j.util.ModelSerializer.restoreMultiLayerNetwork(ModelSerializer.java:138)
>         at com.secdata.gi.detect.map.DGAClassifierMapFunction.open(DGAClassifierMapFunction.java:54)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: ND4J is probably missing dependencies.
> For more information, please refer to: http://nd4j.org/getstarted.html
>         at org.nd4j.nativeblas.NativeOpsHolder.<init>(NativeOpsHolder.java:51)
>         at org.nd4j.nativeblas.NativeOpsHolder.<clinit>(NativeOpsHolder.java:19)
>         ... 18 more
> Caused by: java.lang.UnsatisfiedLinkError: no jnind4jcpu in java.library.path
>         at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1865)
>         at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>         at java.lang.System.loadLibrary(System.java:1122)
>         at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:963)
>         at org.bytedeco.javacpp.Loader.load(Loader.java:764)
>         at org.bytedeco.javacpp.Loader.load(Loader.java:671)
>         at org.nd4j.nativeblas.Nd4jCpu.<clinit>(Nd4jCpu.java:10)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at org.bytedeco.javacpp.Loader.load(Loader.java:726)
>         at org.bytedeco.javacpp.Loader.load(Loader.java:671)
>         at org.nd4j.nativeblas.Nd4jCpu$NativeOps.<clinit>(Nd4jCpu.java:62)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:264)
>         at org.nd4j.nativeblas.NativeOpsHolder.<init>(NativeOpsHolder.java:29)
>         ... 19 more
> Caused by: java.lang.UnsatisfiedLinkError: Native Library /home/yarn/.javacpp/cache/blob_a87e49f9a475a9dc4296f6afbc3ae171dc821d19/org/nd4j/nativeblas/linux-x86_64/libjnind4jcpu.so already loaded in another classloader
>         at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1903)
>         at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1822)
>         at java.lang.Runtime.load0(Runtime.java:809)
>         at java.lang.System.load(System.java:1086)
>         at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:943)
>
> I'm no expert in class loading at all, but I have tried a number of ways to overcome this, based on the docs on dynamic classloading:
>
> 1. Putting the jars that contain the native libraries in the flink/lib directory, so they are always loaded on JobManager startup, and removing them from the deployed Flink job.
> 2. Some code in the open/close methods of my mapper to try and dispose of the native library:
>
>     NativeLibrary lib = NativeLibrary.getInstance(Loader.getCacheDir()
>             + "/cache/nd4j-native-0.8.0-" + Loader.getPlatform()
>             + ".jar/org/nd4j/nativeblas/" + Loader.getPlatform()
>             + "/libjnind4jcpu.so");
>     this.model = null;
>
>     if (null != lib) {
>         System.out.println("found lib - removing");
>         lib.dispose();
>         System.gc();
>     }
>
> This sort of works in finding the library (on my Mac, running locally), but as you can see from the stack trace above, on deployment to YARN some random value is placed after the /cache dir, which I don't know how to get a handle on in order to construct the correct library location (and just using the library name, as the JNI docs suggest for NativeLibrary.getInstance, fails to find the library).
>
> Neither of the above approaches works, so when my job fails I can't cancel and resubmit, as it just fails with the same stack trace. The only way I can get it to run again is to cancel all the other jobs running on the JobManager, kill the JM YARN session, create a new YARN session JM, and resubmit the Flink job, which is a real pain. Ideally I would like to stop the exception in the first place, but as I can't figure out how to get logging appearing in my YARN logs either (for debugging), I'm at a bit of a loss!
>
> Any pointers or suggestions, please?
>
> Many thanks
> Conrad
>
>
> SecureData, combating cyber threats
>
> The information contained in this message or any of its attachments may be privileged and confidential and intended for the exclusive use of the intended recipient. If you are not the intended recipient, any disclosure, reproduction, distribution or other dissemination or use of this communication is strictly prohibited. The views expressed in this email are those of the individual and not necessarily of SecureData Europe Ltd. Any prices quoted are only valid if followed up by a formal written quote.
>
> SecureData Europe Limited. Registered in England & Wales 04365896. Registered Address: SecureData House, Hermitage Court, Hermitage Lane, Maidstone, Kent, ME16 9NT