public static void main(String[] args) throws Exception {
    // Initialize the job environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Job job = Job.getInstance();
    // Read from HDFS via a Hadoop input format
    HadoopInputFormat<LongWritable, Text> hadoopIF =
            new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
    // Filter the sub-directories that should be read
    ArrayList<Path> inputPaths =
            HadoopUtil.getHdfsFileName("hdfs://arc_c/fid_flow/*", "2020-05-10", "2020-05-20");
    TextInputFormat.setInputPaths(job, inputPaths.toArray(new Path[0]));
    // Read from HDFS using the custom input
    DataSet<Tuple2<LongWritable, Text>> source = env.createInput(hadoopIF);
    source.output(new HdfsTrainSinktest());
    env.execute("offline_train");
}
Reading this way works locally, but submitting to YARN fails with the following error:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at createInput(ExecutionEnvironment.java:552) (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat))': Loading the input/output formats failed: org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:218)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:106)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.createExecutionGraph(LegacyScheduler.java:207)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:184)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
    at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
    ... 10 more
Caused by: java.lang.Exception: Loading the input/output formats failed: org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat@458544e0
    at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
    at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:214)
    ... 20 more
Caused by: java.lang.RuntimeException: Deserializing the input/output formats failed: unread block data
    at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
    at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
    ... 22 more
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2783)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1605)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
    ... 23 more
End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 4 more
王志华
[email protected]
Signature customized by Netease Mail Master
On 2020-05-21 12:24, Jingsong Li <[email protected]> wrote:
Hi 志华,
At the DataStream layer, you can use FileInputFormat.setFilesFilter to set a filter on the files to read.
The Table layer does not natively support such a filter yet; you could consider writing your own table connector.
The recommended approach, though, is to reframe this as partition handling, which is supported more naturally.
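A minimal sketch of the setFilesFilter route on the DataStream API. The directory and date range are taken from the original question; the assumption that file (or sub-directory) names are plain yyyy-MM-dd dates, and the class name, are illustrative only:

```java
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilteredHdfsRead {

    // True if fileName (assumed to be a yyyy-MM-dd date) lies in [from, to];
    // ISO dates order correctly under plain string comparison.
    static boolean inRange(String fileName, String from, String to) {
        return fileName.compareTo(from) >= 0 && fileName.compareTo(to) <= 0;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String dir = "hdfs://arc_c/fid_flow";
        TextInputFormat format = new TextInputFormat(new Path(dir));
        // filterPath returns true for paths that should be SKIPPED,
        // so keep only names inside the wanted date range.
        format.setFilesFilter(new FilePathFilter() {
            @Override
            public boolean filterPath(Path filePath) {
                return !inRange(filePath.getName(), "2020-05-10", "2020-05-20");
            }
        });

        DataStream<String> lines = env.readFile(format, dir);
        lines.print();
        env.execute("filtered_read");
    }
}
```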
jimandlice,
- If you are on 1.10 or earlier, you need to write a DataStream job with a StreamingFileSink to write into Hive, and Parquet is the only supported columnar format. [1]
- If you are on 1.11 (currently in release testing), the Table/SQL layer natively supports the streaming file sink; the documentation is being written.
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html
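For the 1.10 route, a rough sketch of a StreamingFileSink writing Parquet into a Hive table's HDFS location. The Avro schema, warehouse path, and input data are all hypothetical; real files/partitions still need to be registered in the Hive metastore afterwards:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class HiveParquetWrite {

    // Hypothetical Avro schema for the rows being written.
    static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"FidFlow\",\"fields\":["
          + "{\"name\":\"fid\",\"type\":\"string\"},"
          + "{\"name\":\"cnt\",\"type\":\"long\"}]}";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamingFileSink only finalizes (commits) files on checkpoints.
        env.enableCheckpointing(60_000);

        DataStream<GenericRecord> records = env
                .fromElements("a,1", "b,2") // stand-in for the real source
                .map(new MapFunction<String, GenericRecord>() {
                    private transient Schema schema;
                    @Override
                    public GenericRecord map(String line) {
                        if (schema == null) { // Schema is not Serializable, so parse lazily
                            schema = new Schema.Parser().parse(SCHEMA_JSON);
                        }
                        String[] parts = line.split(",");
                        GenericRecord r = new GenericData.Record(schema);
                        r.put("fid", parts[0]);
                        r.put("cnt", Long.parseLong(parts[1]));
                        return r;
                    }
                });

        // Write Parquet under the table's warehouse directory (path is
        // hypothetical); new partitions then have to be added to the Hive
        // metastore, e.g. via ALTER TABLE ... ADD PARTITION.
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs:///user/hive/warehouse/fid_flow"),
                        ParquetAvroWriters.forGenericRecord(
                                new Schema.Parser().parse(SCHEMA_JSON)))
                .build();
        records.addSink(sink);

        env.execute("write_hive_parquet");
    }
}
```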
Best,
Jingsong Lee
On Thu, May 21, 2020 at 10:59 AM jimandlice <[email protected]> wrote:
What is the recommended approach for writing into Hive from Flink via the API?
jimandlice
Email: [email protected]
Signature is customized by Netease Mail Master
On 2020-05-21 10:57, 阿华田 wrote:
How can Flink read the files in a directory by regex, e.g. read only the files whose (timestamp-formatted) names fall within a given time range?
--
Best, Jingsong Lee