Hey guys,
I've set up a BucketingSink as a dead letter queue writing to our Ceph
cluster over S3, but when I start the job, I get this error:
java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more
I find it weird 'cause I've already set up checkpoints (and savepoints) to
use S3 as the protocol, and I just assumed that, if it works for checkpoints,
it should work here.
(I suppose I could add the aws client as a dependency of my build but,
again, I assumed that once S3 works for checkpoints, it should work
everywhere.)
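For reference, a quick way to confirm whether the class from the trace is
visible at all could look like the sketch below. It assumes the check runs
with the same classpath as the job, and the ClasspathCheck class and
isClassVisible method are hypothetical names used only for illustration;
the only thing taken from the trace is the class name.

```java
// Minimal sketch: checks whether a class can be located by the classloader
// that loaded this helper. ClasspathCheck/isClassVisible are hypothetical
// names, not part of Flink or Hadoop.
final class ClasspathCheck {

    // Returns true if the named class can be found without initializing it.
    static boolean isClassVisible(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name copied from the NoClassDefFoundError above.
        String name = "com.amazonaws.AmazonClientException";
        System.out.println(name + " visible: " + isClassVisible(name));
    }
}
```

If this reports the class as not visible, the AWS SDK is not on the
classpath that the sink's classloader uses, whatever the checkpoint setup
is doing.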
And kinda related: can I assume that using the FileSystem class to create
FSOutputStreams will follow the Flink configuration? I have another type of
dead letter queue that won't work with BucketingSink, and I was thinking
about using the FileSystem class directly to create files inside that Ceph/S3.
--
*Julio Biason*, Software Engineer
*AZION* | Deliver. Accelerate. Protect.
Office: +55 51 3083 8101 | Mobile: +55 51 *99907 0554*