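I am using FileInputFormat to recursively read files from HDFS, with a file filter applied. The program runs without any error, but it finishes almost immediately and reads no data. In local testing the filter works and data is read; the problem only occurs when the job runs on the YARN cluster.
Code: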
// Initialize the job parameters
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat<String> fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow"));
// Enumerate files in nested directories recursively
fileInputFormat.setNestedFileEnumeration(true);
// Paths for which the filter condition is true are filtered out
fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24", "2020-05-24"));
DataSet<String> source = env.createInput(fileInputFormat);
source.output(new HdfsTrainSinktest());
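The source of RegexExcludePathAndTimeFilter is not included in this post, so the following is only a minimal sketch of the kind of date-range check such a filter might perform. It is plain Java with no Flink dependency so it can be run on its own; the class name DateRangePathFilterSketch, the yyyy-MM-dd path layout, and the example sub-paths are all assumptions made for illustration. It follows the convention of Flink's FilePathFilter.filterPath, where returning true means the path is skipped. As far as I know, with nested enumeration enabled the filter is also consulted for directories, so the sketch keeps any path that does not contain a date yet; otherwise the recursion would stop at the root and nothing would be read.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the date check inside a filter such as RegexExcludePathAndTimeFilter. */
public class DateRangePathFilterSketch {
    // Assumption: the date appears somewhere in the path as yyyy-MM-dd.
    private static final Pattern DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");

    private final LocalDate start;
    private final LocalDate end;

    public DateRangePathFilterSketch(String start, String end) {
        this.start = LocalDate.parse(start);
        this.end = LocalDate.parse(end);
    }

    /** Returns true when the path should be skipped (same meaning as FilePathFilter.filterPath). */
    public boolean filterPath(String path) {
        Matcher m = DATE.matcher(path);
        if (!m.find()) {
            // No date in the path (root or intermediate directory): keep it so recursion can descend.
            return false;
        }
        LocalDate date;
        try {
            date = LocalDate.parse(m.group());
        } catch (DateTimeParseException e) {
            // Looks like a date but is not a valid one: keep the path rather than drop data silently.
            return false;
        }
        // Skip everything outside the inclusive [start, end] range.
        return date.isBefore(start) || date.isAfter(end);
    }

    public static void main(String[] args) {
        DateRangePathFilterSketch filter = new DateRangePathFilterSketch("2020-05-24", "2020-05-24");
        // Only the root path comes from the post; the sub-paths are made up for the example.
        String[] paths = {
            "hdfs://arc/success_fid_flow",
            "hdfs://arc/success_fid_flow/2020-05-24",
            "hdfs://arc/success_fid_flow/2020-05-24/part-0",
            "hdfs://arc/success_fid_flow/2020-05-23/part-0"
        };
        for (String p : paths) {
            System.out.println(p + " -> filtered out: " + filter.filterPath(p));
        }
    }
}
```

Printing the decision for each path, directories included, is a cheap way to compare what the filter sees locally with what it sees on the cluster, where the path strings may carry a different scheme or authority.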
Log output:
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:41,848 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:41,849 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.port, 8082
2020-06-01 14:43:41,890 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:41,993 INFO  org.apache.flink.optimizer.Optimizer - Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:42,022 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.archive.fs.dir, hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.web.port, 8082
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.archive.fs.dir, hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: historyserver.archive.fs.refresh-interval, 10000
2020-06-01 14:43:42,069 INFO  org.apache.flink.client.program.rest.RestClusterClient - Submitting job 410508f08b0775c0529e84b221dd909d (detached: false).
2020-06-01 14:43:52,134 INFO  org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:52,167 INFO  org.apache.flink.optimizer.Optimizer - Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:52,171 INFO  org.apache.flink.client.program.rest.RestClusterClient - Submitting job 10ac036dbb1d546996f2db76a2901253 (detached: false).
2020-06-01 14:43:55,649 INFO  org.apache.flink.client.cli.CliFrontend - Program execution finished
Program execution finished
Job with JobID 10ac036dbb1d546996f2db76a2901253 has finished.
Job Runtime: 1143 ms
2020-06-01 14:43:55,663 INFO  org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
2020-06-01 14:43:55,665 INFO  org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
王志华