Hi,
First time posting here so ‘hi’.
I have been using Flink (1.3.1 at the moment) for a couple of months and am loving it. My 
deployment is a JobManager running as a long-running session on Yarn.
My problem involves a Flink streaming job in which one of the mappers 
(inheriting from RichMapFunction) loads a previously trained machine learning 
model (using Deeplearning4j) in its open method. Loading the model also loads 
some native libraries via the javacpp Loader class. From looking at the source 
code, Loader picks the location for the native libraries from a hierarchy of 
availability: a System property, then the user’s home dir (under .javacpp), 
then the temp dir.
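
The lookup order described above can be sketched in plain Java. This is a minimal illustration, not JavaCPP's actual code: the class CacheDirCandidates and its method are hypothetical, the property name org.bytedeco.javacpp.cachedir is an assumption that should be checked against the Loader source of the JavaCPP version in use, and the layout under the temp dir is not stated in this post, so the temp dir itself stands in for it.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: mirrors the lookup order described in the post. */
final class CacheDirCandidates {

    // Assumed property name; verify against your JavaCPP version.
    static final String CACHE_DIR_PROPERTY = "org.bytedeco.javacpp.cachedir";

    /** Returns candidate cache directories in priority order. */
    static List<File> candidates(String propertyValue, String userHome, String tmpDir) {
        List<File> dirs = new ArrayList<>();
        // 1. Explicit System property, if set.
        if (propertyValue != null && !propertyValue.isEmpty()) {
            dirs.add(new File(propertyValue));
        }
        // 2. <user home>/.javacpp/cache, as seen in the stack trace.
        if (userHome != null) {
            dirs.add(new File(new File(userHome, ".javacpp"), "cache"));
        }
        // 3. Temp dir (placeholder; the real sub-directory may differ).
        if (tmpDir != null) {
            dirs.add(new File(tmpDir));
        }
        return dirs;
    }

    public static void main(String[] args) {
        List<File> dirs = candidates(
                System.getProperty(CACHE_DIR_PROPERTY),
                System.getProperty("user.home"),
                System.getProperty("java.io.tmpdir"));
        for (File dir : dirs) {
            System.out.println(dir + " exists=" + dir.isDirectory());
        }
    }
}
```

Running main on a TaskManager host, as the user that runs the Yarn containers, would show which of these directories actually exists there.
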
The actual problem is that if an exception is thrown in the Flink job, the 
JobManager tries to restart it. However, it appears that after the first 
failure the references to the objects, and therefore the classes, aren’t 
released by the classloader, as I get this error:

java.lang.ExceptionInInitializerError
        at org.nd4j.linalg.cpu.nativecpu.ops.NativeOpExecutioner.<init>(NativeOpExecutioner.java:43)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at java.lang.Class.newInstance(Class.java:442)
        at org.nd4j.linalg.factory.Nd4j.initWithBackend(Nd4j.java:5784)
        at org.nd4j.linalg.factory.Nd4j.initContext(Nd4j.java:5694)
        at org.nd4j.linalg.factory.Nd4j.<clinit>(Nd4j.java:184)
        at org.deeplearning4j.util.ModelSerializer.restoreMultiLayerNetwork(ModelSerializer.java:188)
        at org.deeplearning4j.util.ModelSerializer.restoreMultiLayerNetwork(ModelSerializer.java:138)
        at com.secdata.gi.detect.map.DGAClassifierMapFunction.open(DGAClassifierMapFunction.java:54)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: ND4J is probably missing dependencies. For more information, please refer to: http://nd4j.org/getstarted.html
        at org.nd4j.nativeblas.NativeOpsHolder.<init>(NativeOpsHolder.java:51)
        at org.nd4j.nativeblas.NativeOpsHolder.<clinit>(NativeOpsHolder.java:19)
        ... 18 more
Caused by: java.lang.UnsatisfiedLinkError: no jnind4jcpu in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1865)
        at java.lang.Runtime.loadLibrary0(Runtime.java:870)
        at java.lang.System.loadLibrary(System.java:1122)
        at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:963)
        at org.bytedeco.javacpp.Loader.load(Loader.java:764)
        at org.bytedeco.javacpp.Loader.load(Loader.java:671)
        at org.nd4j.nativeblas.Nd4jCpu.<clinit>(Nd4jCpu.java:10)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.bytedeco.javacpp.Loader.load(Loader.java:726)
        at org.bytedeco.javacpp.Loader.load(Loader.java:671)
        at org.nd4j.nativeblas.Nd4jCpu$NativeOps.<clinit>(Nd4jCpu.java:62)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.nd4j.nativeblas.NativeOpsHolder.<init>(NativeOpsHolder.java:29)
        ... 19 more
Caused by: java.lang.UnsatisfiedLinkError: Native Library /home/yarn/.javacpp/cache/blob_a87e49f9a475a9dc4296f6afbc3ae171dc821d19/org/nd4j/nativeblas/linux-x86_64/libjnind4jcpu.so already loaded in another classloader
        at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1903)
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1822)
        at java.lang.Runtime.load0(Runtime.java:809)
        at java.lang.System.load(System.java:1086)
        at org.bytedeco.javacpp.Loader.loadLibrary(Loader.java:943)
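
In the trace above the top-level messages ("ND4J is probably missing dependencies", "no jnind4jcpu in java.library.path") are misleading, and the real reason only shows in the last "Caused by". A small helper can surface that root cause when logging from the open method. This is a sketch only: the class LinkErrorInspector and its methods are hypothetical names, and matching on the message text is an assumption based on the wording in this particular trace.

```java
/** Illustrative helper: finds the innermost cause of a failure. */
final class LinkErrorInspector {

    /** Walks the cause chain to its end. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        // Stop at the last cause; also guard against a self-referential cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /** True if the innermost cause is the "already loaded" link error seen above. */
    static boolean isAlreadyLoadedElsewhere(Throwable t) {
        Throwable root = rootCause(t);
        return root instanceof UnsatisfiedLinkError
                && root.getMessage() != null
                && root.getMessage().contains("already loaded in another classloader");
    }
}
```

Wrapping the model-loading call in try / catch (Throwable t) and logging rootCause(t) before rethrowing would put the real reason at the top of the log entry.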

I’m no expert in class loading at all, but I have tried a number of ways to 
overcome this, based on the docs on dynamic classloading:


1. Putting the jars that contain the native libraries in the flink/lib 
directory, so they are always loaded on JobManager startup, and removing them 
from the deployed Flink job.

2. Adding some code to the open / close methods of my mapper to try to dispose 
of the native library:
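// NativeLibrary is com.sun.jna.NativeLibrary (JNA); Loader is org.bytedeco.javacpp.Loader
NativeLibrary lib = NativeLibrary.getInstance(Loader.getCacheDir()
        + "/cache/nd4j-native-0.8.0-" + Loader.getPlatform()
        + ".jar/org/nd4j/nativeblas/" + Loader.getPlatform() + "/libjnind4jcpu.so");
this.model = null; // drop the reference to the loaded model

if (null != lib) {
    System.out.println("found lib - removing");
    lib.dispose(); // release JNA's handle to the library
    System.gc();
}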
This sort of works in that it finds the library (on my Mac, running locally). 
However, as the stack trace above shows, on deployment to Yarn some random 
value is placed after the /cache dir, and I don’t know how to get a handle on 
it to construct the correct library location. Just using the library name, as 
the JNA docs suggest for NativeLibrary.getInstance, fails to find the library.
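
One way to cope with the unknown directory under the cache dir is to search for the file by name instead of building the path by hand. The following is a minimal sketch using only the JDK: the class CachedLibraryFinder and its find method are hypothetical names, and the cache root used in main is taken from the path in the stack trace.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.stream.Stream;

/** Illustrative only: locates a cached native library by file name. */
final class CachedLibraryFinder {

    /** Returns the first regular file named fileName anywhere below cacheRoot. */
    static Optional<Path> find(Path cacheRoot, String fileName) throws IOException {
        if (!Files.isDirectory(cacheRoot)) {
            return Optional.empty();
        }
        // Files.walk must be closed, hence the try-with-resources.
        try (Stream<Path> paths = Files.walk(cacheRoot)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().equals(fileName))
                    .findFirst();
        }
    }

    public static void main(String[] args) throws IOException {
        // Cache root as it appears in the stack trace: <user home>/.javacpp/cache
        Path root = Paths.get(System.getProperty("user.home"), ".javacpp", "cache");
        System.out.println(find(root, "libjnind4jcpu.so")
                .map(Path::toString)
                .orElse("not found under " + root));
    }
}
```

Finding the file only solves the path problem. The JVM ties a library loaded through System.load to the classloader that loaded it and releases it only when that classloader is garbage-collected, so disposing a separate handle to the same file may not clear the "already loaded in another classloader" state.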



Neither of the above approaches works, so when my job fails I can’t cancel and 
resubmit it, as it just fails with the same stack trace. The only way I can get 
it to run again is to cancel all other jobs running on the JobManager, kill the 
JM Yarn session, create a new Yarn session JM, and resubmit the Flink job, 
which is a real pain. Ideally I would like to stop the exception occurring in 
the first place, but as I can’t figure out how to get logging to appear in my 
Yarn logs either (for debugging), I’m at a bit of a loss!



Any pointers or suggestions, please?



Many thanks

Conrad




SecureData, combating cyber threats
