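I'm using FileInputFormat to read HDFS files recursively, with a file filter applied. The job runs without errors, but it finishes almost immediately and reads no data. In local testing the filter works and data is read; the problem only occurs when the job runs on a YARN cluster.
Code: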
// Initialize the job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat<String> fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow"));
// Descend into subdirectories of the input path
fileInputFormat.setNestedFileEnumeration(true);
// Paths for which the filter returns true are excluded
fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24", "2020-05-24"));
DataSet<String> source = env.createInput(fileInputFormat);
source.output(new HdfsTrainSinktest());
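
For reference, here is a minimal sketch of what an exclude filter with a date range might look like. RegexExcludePathAndTimeFilter itself is not shown above, so the class name DateRangeExcludeFilterSketch, the yyyy-MM-dd date pattern, and the sample directory layout are all assumptions made for illustration. The sketch is plain Java and only mirrors the contract of Flink's FilePathFilter.filterPath (returning true means "exclude"). As far as I know, with nested enumeration enabled FileInputFormat passes directories through the filter as well as files, so a filter that rejects a directory hides everything beneath it; the sketch therefore keeps paths that carry no date.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for RegexExcludePathAndTimeFilter (the real class is not shown).
class DateRangeExcludeFilterSketch {
    // Assumption: dates appear in the path as yyyy-MM-dd.
    private static final Pattern DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");

    private final LocalDate start;
    private final LocalDate end;

    DateRangeExcludeFilterSketch(String start, String end) {
        this.start = LocalDate.parse(start);
        this.end = LocalDate.parse(end);
    }

    // Same contract as FilePathFilter.filterPath: true means "exclude this path".
    boolean filterPath(String path) {
        Matcher m = DATE.matcher(path);
        if (!m.find()) {
            // No date in the path (for example the root or a parent directory):
            // keep it so that nested enumeration can still descend into it.
            return false;
        }
        LocalDate date;
        try {
            date = LocalDate.parse(m.group());
        } catch (DateTimeParseException e) {
            // Looks like a date but is not a valid one: treat it as undated.
            return false;
        }
        // Exclude everything outside the inclusive range [start, end].
        return date.isBefore(start) || date.isAfter(end);
    }

    public static void main(String[] args) {
        DateRangeExcludeFilterSketch filter =
                new DateRangeExcludeFilterSketch("2020-05-24", "2020-05-24");
        // Hypothetical layout under the input path.
        String[] paths = {
            "hdfs://arc/success_fid_flow",
            "hdfs://arc/success_fid_flow/2020-05-23/part-0",
            "hdfs://arc/success_fid_flow/2020-05-24/part-0"
        };
        for (String p : paths) {
            System.out.println(p + " excluded=" + filter.filterPath(p));
        }
    }
}
```

Logging every path the real filter receives, together with its decision, on both the local run and the YARN run would show whether the two environments present the same paths to the filter.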
Log output:
2020-06-01 14:43:41,848 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:41,848 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:41,848 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:41,848 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:41,849 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:41,849 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2020-06-01 14:43:41,849 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:41,849 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:41,849 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: historyserver.web.port, 8082
2020-06-01 14:43:41,890 INFO org.apache.flink.api.java.ExecutionEnvironment
- The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:41,993 INFO org.apache.flink.optimizer.Optimizer
- Compiler could not determine the size of input 'TextInputFormat
([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:42,022 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:42,022 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:42,022 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:42,022 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2020-06-01 14:43:42,023 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:42,023 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.archive.fs.dir,
hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:42,023 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: historyserver.web.port, 8082
2020-06-01 14:43:42,023 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: historyserver.archive.fs.dir,
hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO
org.apache.flink.configuration.GlobalConfiguration - Loading
configuration property: historyserver.archive.fs.refresh-interval, 10000
2020-06-01 14:43:42,069 INFO
org.apache.flink.client.program.rest.RestClusterClient - Submitting job
410508f08b0775c0529e84b221dd909d (detached: false).
2020-06-01 14:43:52,134 INFO org.apache.flink.api.java.ExecutionEnvironment
- The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:52,167 INFO org.apache.flink.optimizer.Optimizer
- Compiler could not determine the size of input 'TextInputFormat
([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:52,171 INFO
org.apache.flink.client.program.rest.RestClusterClient - Submitting job
10ac036dbb1d546996f2db76a2901253 (detached: false).
2020-06-01 14:43:55,649 INFO org.apache.flink.client.cli.CliFrontend
- Program execution finished
Program execution finished
Job with JobID 10ac036dbb1d546996f2db76a2901253 has finished.
Job Runtime: 1143 ms
2020-06-01 14:43:55,663 INFO org.apache.flink.runtime.rest.RestClient
- Shutting down rest endpoint.
2020-06-01 14:43:55,665 INFO org.apache.flink.runtime.rest.RestClient
- Rest endpoint shutdown complete.
王志华
[email protected]