import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

Try this approach with sink.setBucketer and sink.setWriter.
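
Something like the sketch below (untested; the output path, the bucket
format, and the DataStream[String] named "stream" are placeholders, and
this assumes a Flink version that still ships flink-connector-filesystem,
since BucketingSink is deprecated in newer releases):

import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

// BucketingSink is the legacy file sink: it writes through the plain
// Hadoop FileSystem API and does not create a RecoverableWriter, so it
// may work where StreamingFileSink rejects a non-HDFS file system.
val sink = new BucketingSink[String]("hdfs://namenode:9000/path/to/output")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH")) // one bucket per hour
sink.setWriter(new StringWriter[String]())                       // each record as a text line
stream.addSink(sink)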



赵一旦 <[email protected]> wrote on Thu, Jan 21, 2021 at 6:37 PM:

> @Michael Ran
> So is there any way to fix this? The exception occurs when I use Flink's
> StreamingFileSink to write to HDFS.
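>
> The sink is created roughly like this (a sketch, not the exact code; the
> path is a placeholder). forRowFormat matches the RowFormatBuilder in the
> stack trace:
>
> import org.apache.flink.api.common.serialization.SimpleStringEncoder
> import org.apache.flink.core.fs.Path
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>
> // At initialization the RowFormatBuilder asks the target file system
> // for a RecoverableWriter, which is where the exception is thrown.
> val sink = StreamingFileSink
>   .forRowFormat(new Path("hdfs://namenode:9000/path/to/output"),
>                 new SimpleStringEncoder[String]("UTF-8"))
>   .build()
> stream.addSink(sink)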
>
Michael Ran <[email protected]> wrote on Thu, Jan 21, 2021 at 5:23 PM:
>
> > This probably comes from an HDFS-specific API; your file system is not
> > compatible with it:
> > public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
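> >
> > As far as I can tell, that constructor throws unless the wrapped Hadoop
> > file system reports the scheme "hdfs". A quick check is to print what
> > Flink resolves for your URI (a sketch; the URI is a placeholder for
> > your file system's address):
> >
> > import java.net.URI
> > import org.apache.flink.core.fs.FileSystem
> >
> > // Print the scheme and the FileSystem implementation Flink picks up.
> > val uri = new URI("hdfs://namenode:9000/")
> > val fs = FileSystem.get(uri)
> > println(s"scheme=${uri.getScheme}, fs=${fs.getClass.getName}")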
> > On 2021-01-21 17:18:23, "赵一旦" <[email protected]> wrote:
> > > The full stack trace is as follows:
> > >
> > > java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
> > >     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
> > >     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210)
> > >     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> > >     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260)
> > >     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> > >     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412)
> > >     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
> > >     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
> > >     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> > >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
> > >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
> > >     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> > >     at java.lang.Thread.run(Thread.java:748)
> > >
> > >
> > > 赵一旦 <[email protected]> wrote on Thu, Jan 21, 2021 at 5:17 PM:
> > >
> > >> Recoverable writers on Hadoop are only supported for HDFS
> > >>
> > >> As above: we go through the Hadoop protocol, but the underlying
> > >> storage is not HDFS; it is a distributed file system our company
> > >> developed in-house.
> > >>
> > >> Writing with Spark and reading with spark-sql both work fine, but so
> > >> far neither writing nor reading from Flink has succeeded.
> > >>
> > >>
> > >>
> >
>
