The code in HadoopRecoverableWriter is:

    if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
        throw new UnsupportedOperationException(
                "Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
    }

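To see which half of that condition is failing, here is a rough standalone sketch of the same two-part test. It is not Flink's implementation: the class and method names are made up for illustration, it reads the scheme straight from the path string (the real code asks the Hadoop FileSystem object for its scheme), and it takes the Hadoop version as a plain string rather than looking it up from the jars on the classpath.

```java
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or Hadoop.
public class RecoverableWriterCheck {

    private static final Pattern MAJOR_MINOR = Pattern.compile("^(\\d+)\\.(\\d+)");

    // Stand-in for HadoopUtils.isMinHadoopVersion(major, minor):
    // compares only the leading "major.minor" of a version string.
    public static boolean isMinVersion(String version, int major, int minor) {
        Matcher m = MAJOR_MINOR.matcher(version);
        if (!m.find()) {
            return false; // unparseable version: treat as too old
        }
        int maj = Integer.parseInt(m.group(1));
        int min = Integer.parseInt(m.group(2));
        return maj > major || (maj == major && min >= minor);
    }

    // Mirrors the shape of the check: scheme must be hdfs AND version >= 2.7.
    public static boolean isSupported(String sinkPath, String hadoopVersion) {
        String scheme = URI.create(sinkPath).getScheme(); // null if the path has no scheme
        return "hdfs".equalsIgnoreCase(scheme) && isMinVersion(hadoopVersion, 2, 7);
    }

    public static void main(String[] args) {
        // Example paths and versions are invented for illustration.
        System.out.println(isSupported("hdfs://namenode:8020/out", "2.8.5")); // true
        System.out.println(isSupported("s3://my-bucket/out", "2.8.5"));       // false: scheme is not hdfs
        System.out.println(isSupported("/user/flink/out", "2.8.5"));          // false: no scheme at all
        System.out.println(isSupported("hdfs://namenode:8020/out", "2.6.0")); // false: Hadoop too old
    }
}
```

Either condition failing on its own is enough to produce the exception, so it is worth checking both the scheme of the sink path and the Hadoop version that actually ends up on the classpath.
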
So one possibility is that your sink path doesn’t have the explicit hdfs://xxx protocol. Another is that you’re in classpath hell, and your job jar contains an older version of the Hadoop jars.

-- Ken

> On Jun 11, 2019, at 12:16 AM, Yitzchak Lieberman <yitzch...@sentinelone.com> wrote:
>
> Hi.
>
> I'm a bit confused:
> When launching my Flink streaming application on EMR release 5.24 (which has
> Flink 1.8) that writes Kafka messages to S3 Parquet files, I'm getting
> the exception below, but when I install Flink 1.8 on EMR myself as a custom
> installation, it works.
> What could explain the difference in behavior?
>
> Thanks,
> Yitzchak.
>
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra