import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

sink.setBucketer
sink.setWriter

Try doing it this way.

赵一旦 <[email protected]> wrote on Thu, Jan 21, 2021 at 6:37 PM:

> @Michael Ran
> So is there a solution? I hit this exception when writing to HDFS with
> Flink's StreamingFileSink.
>
> Michael Ran <[email protected]> wrote on Thu, Jan 21, 2021 at 5:23 PM:
>
> > This is probably using an HDFS-specific API, and your file system is not
> > compatible with it:
> > public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> >
> > At 2021-01-21 17:18:23, "赵一旦" <[email protected]> wrote:
> > >The full error message is as follows:
> > >
> > >java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
> > >    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:61)
> > >    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:210)
> > >    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> > >    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:260)
> > >    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> > >    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:412)
> > >    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
> > >    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
> > >    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> > >    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
> > >    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
> > >    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
> > >    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
> > >    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> > >    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
> > >    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
> > >    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> > >    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> > >    at java.lang.Thread.run(Thread.java:748)
> > >
> > >
> > >赵一旦 <[email protected]> wrote on Thu, Jan 21, 2021 at 5:17 PM:
> > >
> > >> Recoverable writers on Hadoop are only supported for HDFS
> > >>
> > >> As above: we use the Hadoop protocol, but the underlying storage is not
> > >> HDFS. It is a distributed file system developed in-house at our company.
> > >>
> > >> Writing with Spark and reading with spark-sql both work fine, but so far
> > >> neither writing nor reading with Flink has succeeded.
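
For reference, the BucketingSink suggestion at the top of this thread could be sketched roughly as below. This is only a sketch under assumptions, not something confirmed in the thread: it assumes a Flink version that still ships the flink-connector-filesystem module (BucketingSink is deprecated), a stream of String records, and an illustrative bucket format "yyyy-MM-dd--HH". The object name BucketingSinkSketch, the helper buildSink, and the base path are hypothetical. Whether this works against the in-house file system has not been verified here.

```scala
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

// Hypothetical helper object, named for illustration only.
object BucketingSinkSketch {

  // Builds a BucketingSink that writes String records under basePath,
  // one bucket directory per hour (the format string is an assumption).
  def buildSink(basePath: String): BucketingSink[String] = {
    val sink = new BucketingSink[String](basePath)
    // Choose the bucket directory from the current processing time.
    sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
    // Write each record as a line of text.
    sink.setWriter(new StringWriter[String]())
    sink
  }
}
```

It would then be attached to an existing DataStream[String] with something like stream.addSink(BucketingSinkSketch.buildSink(basePath)), where basePath uses whatever scheme the custom file system is registered under.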
