The code in HadoopRecoverableWriter is:

    if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
        throw new UnsupportedOperationException(
                "Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
    }

So one possibility is that your sink path doesn’t start with an explicit
hdfs://xxx scheme.
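
A quick way to rule this out is to look at the scheme of the path string you hand to the sink. The sketch below only parses the string with java.net.URI; the class name SinkSchemeCheck, the method hasHdfsScheme, and the sample paths are made up for illustration. The real check runs against fs.getScheme() on the resolved file system, so treat this as an approximation and not as what Flink does internally.

```java
import java.net.URI;

public class SinkSchemeCheck {

    /**
     * Hypothetical helper: returns true when the path string carries an
     * explicit "hdfs" scheme (case-insensitive), mirroring the first half
     * of the condition in HadoopRecoverableWriter.
     */
    public static boolean hasHdfsScheme(String path) {
        // getScheme() returns null for a path with no scheme, such as "/tmp/out"
        String scheme = URI.create(path).getScheme();
        return "hdfs".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        // Sample paths are placeholders, not taken from the original job
        String[] samples = {"hdfs://namenode:8020/out", "s3://bucket/out", "/tmp/out"};
        for (String p : samples) {
            System.out.println(p + " -> hdfs scheme: " + hasHdfsScheme(p));
        }
    }
}
```

Anything other than an hdfs scheme on the resolved file system would trip the first half of the condition, whatever Hadoop version is in use.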

Another is that you’re in classpath hell, and your job jar contains an older 
version of Hadoop jars.
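
To see which Hadoop jars actually win on the classpath, you could log where a Hadoop class was loaded from and what version it reports. This is a rough sketch: the class HadoopClasspathCheck and its methods are hypothetical names, and it uses reflection so it compiles without Hadoop on the build path. It assumes org.apache.hadoop.util.VersionInfo is present at runtime; if not, it reports "unknown".

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

public class HadoopClasspathCheck {

    /** Describes which jar or directory a class was loaded from, if it can be found. */
    public static String describe(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK bootstrap classes have no code source
            String location = (src == null || src.getLocation() == null)
                    ? "<bootstrap or unknown>"
                    : src.getLocation().toString();
            return className + " loaded from " + location;
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " not on classpath";
        }
    }

    /** Reads the Hadoop version reflectively via VersionInfo.getVersion(). */
    public static String hadoopVersion() {
        try {
            Class<?> cls = Class.forName("org.apache.hadoop.util.VersionInfo");
            Method getVersion = cls.getMethod("getVersion");
            return String.valueOf(getVersion.invoke(null));
        } catch (ReflectiveOperationException | LinkageError e) {
            return "unknown";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("org.apache.hadoop.util.VersionInfo"));
        System.out.println("Hadoop version: " + hadoopVersion());
    }
}
```

Calling these two methods from inside the job (for example, at the start of main or in an operator's open method) and comparing the output between the EMR-provided Flink and your own install should show whether an older Hadoop is being picked up from the job jar.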

— Ken


> On Jun 11, 2019, at 12:16 AM, Yitzchak Lieberman <yitzch...@sentinelone.com> 
> wrote:
> 
> Hi.
> 
> I'm a bit confused:
> When I launch my Flink streaming application on EMR release 5.24 (which
> ships Flink 1.8), which writes Kafka messages to Parquet files on S3, I get
> the exception below. But when I install Flink 1.8 on EMR myself, it works.
> What could explain the difference in behavior?
> 
> Thanks,
> Yitzchak.
> 
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
>       at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
>       at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
>       at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
>       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
