Hi. I found that the problem is that i didn't have flink-s3-fs-hadoop-<version>.jar in flink lib directory, with that i can use 's3a' protocol.
On Tue, Jun 11, 2019 at 4:48 PM Ken Krugler <kkrugler_li...@transpac.com> wrote: > The code in HadoopRecoverableWriter is: > > if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || > !HadoopUtils.isMinHadoopVersion(2, 7)) { > throw new UnsupportedOperationException( > "Recoverable writers on Hadoop are only supported for HDFS and for Hadoop > version 2.7 or newer"); > } > > So one possibility is that your sink path doesn’t have the explicit > hdfs://xxx protocol. > > Another is that you’re in classpath hell, and your job jar contains an > older version of Hadoop jars. > > — Ken > > > On Jun 11, 2019, at 12:16 AM, Yitzchak Lieberman < > yitzch...@sentinelone.com> wrote: > > Hi. > > I'm a bit confused: > When launching my flink streaming application on EMR release 5.24 (which > have flink 1.8 version) that write Kafka messages to s3 parquet files i'm > getting the exception below, but when i'm installing flink 1.8 on EMR > custom wise it works. > What could be the difference behavior? > > Thanks, > Yitzchak. > > Caused by: java.lang.UnsupportedOperationException: Recoverable writers on > Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer > at > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317) > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > > > -------------------------- > Ken Krugler > +1 530-210-6378 > http://www.scaleunlimited.com > Custom big data solutions & training > Flink, Solr, Hadoop, Cascading & Cassandra > >