Hi.

I found the problem: I didn't have flink-s3-fs-hadoop-<version>.jar in the
Flink lib directory. With that jar in place I can use the 's3a' protocol.
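
For anyone hitting the same exception, here is a minimal sketch of why the
path scheme matters. It mirrors only the scheme half of the check quoted
below and does not call any Flink code. The class name, method names, and
example paths are made up for illustration. My understanding, judging from
the stack trace, is that without the S3 jar the path was handled by the
generic Hadoop file system, which is where that check is applied.

```java
import java.net.URI;

public class SinkSchemeCheck {

    // Returns the URI scheme of a sink path, or null when the path has none.
    static String schemeOf(String sinkPath) {
        return URI.create(sinkPath).getScheme();
    }

    // Mirrors only the "hdfs" scheme comparison from the quoted condition;
    // the Hadoop version half of the check is not reproduced here.
    static boolean passesHdfsSchemeCheck(String sinkPath) {
        return "hdfs".equalsIgnoreCase(schemeOf(sinkPath));
    }

    public static void main(String[] args) {
        // Hypothetical paths, for illustration only.
        String[] paths = {"hdfs://namenode/out", "s3a://my-bucket/out", "/tmp/out"};
        for (String p : paths) {
            System.out.println(p + " -> scheme=" + schemeOf(p)
                    + ", hdfs check=" + passesHdfsSchemeCheck(p));
        }
    }
}
```

A path with an 's3a' scheme, or with no scheme at all, fails this comparison,
so it only works when a dedicated S3 file system picks up the path first.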

On Tue, Jun 11, 2019 at 4:48 PM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> The code in HadoopRecoverableWriter is:
>
> if (!"hdfs".equalsIgnoreCase(fs.getScheme()) ||
>         !HadoopUtils.isMinHadoopVersion(2, 7)) {
>     throw new UnsupportedOperationException(
>         "Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
> }
>
> So one possibility is that your sink path doesn’t have the explicit
> hdfs://xxx protocol.
>
> Another is that you’re in classpath hell, and your job jar contains an
> older version of Hadoop jars.
>
> — Ken
>
>
> On Jun 11, 2019, at 12:16 AM, Yitzchak Lieberman <
> yitzch...@sentinelone.com> wrote:
>
> Hi.
>
> I'm a bit confused:
> When I launch my Flink streaming application on EMR release 5.24 (which
> ships Flink 1.8) to write Kafka messages to S3 Parquet files, I get
> the exception below, but when I install Flink 1.8 on EMR as a custom
> installation, it works.
> What could explain the difference in behavior?
>
> Thanks,
> Yitzchak.
>
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on
> Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
> at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>
