Re: Flink Read thousands of files with batch

2019-11-12 Thread Dominik Wosiński
I have managed to locate the timeout issue; changing `web.timeout` was the solution. However, now I am getting the following error: 2019-11-12 16:58:00,741 INFO org.apache.parquet.hadoop.ParquetInputFormat - Total input paths to process : 671 2019-11-12 16:58:04,878 INFO
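For reference, `web.timeout` is set in `flink-conf.yaml`; the value below is illustrative, not a recommendation:

```yaml
# flink-conf.yaml (value in milliseconds; pick one large enough
# for the JM to enumerate all input splits before the REST call times out)
web.timeout: 600000
```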

Re: Flink Read thousands of files with batch

2019-11-12 Thread Dominik Wosiński
Hey Jingsong, I will try the patch to verify. In the meantime, I have run the job with `-D akka.ask.timeout` and `-D akka.client.timeout`, both set to 600s, but the stacktrace is the same: org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
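The submission described above would look roughly like this; the job class and jar name are placeholders, and 600s mirrors the value tried in the thread:

```shell
# Dynamic properties passed at submission time; these raise the akka
# ask/client timeouts so slow split enumeration doesn't abort the submit.
flink run \
  -D akka.ask.timeout=600s \
  -D akka.client.timeout=600s \
  -c com.example.ParquetBatchJob parquet-batch-job.jar
```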

Re: Flink Read thousands of files with batch

2019-11-12 Thread Jingsong Li
Hi Dominik: I found a problem that may be your root cause. [1] The JobConf in HadoopInputSplit may be very big, containing hundreds of configuration entries; if it is serialized with every split, that will significantly reduce performance. Considering thousands of splits, the akka thread of the JobMaster will all on
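To make the cost concrete, here is a minimal stdlib-only sketch (not Flink code; the `HashMap` is a hypothetical stand-in for a Hadoop `JobConf`) showing how many redundant bytes are shipped when every split carries its own copy of the same configuration instead of sharing one serialized copy:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class SplitSerializationDemo {

    static byte[] serialize(Serializable o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Bytes shipped when each split serializes its own config copy,
    // divided by the bytes shipped when the config is serialized once.
    static long duplicationFactor(int entries, int splits) throws IOException {
        HashMap<String, String> conf = new HashMap<>(); // stand-in for JobConf
        for (int i = 0; i < entries; i++) {
            conf.put("hadoop.property." + i, "value-" + i);
        }
        long confBytes = serialize(conf).length;  // config serialized once
        long perSplitTotal = confBytes * splits;  // config copied per split
        return perSplitTotal / confBytes;
    }

    public static void main(String[] args) throws IOException {
        // With thousands of splits, the same config is serialized
        // thousands of times over.
        System.out.println(duplicationFactor(500, 1000)); // prints 1000
    }
}
```

The duplication factor grows linearly with the number of splits, which is why hundreds of thousands of files can stall the JobMaster.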

Re: Flink Read thousands of files with batch

2019-11-12 Thread Dominik Wosiński
Hey, I have increased `akka.client.timeout` but it has not helped at all. Here is the log with the call stack for the AskTimeoutException: 2019-11-12 10:19:17,425 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 81fbbc3f41ad5e08ac832d0e656478bc (Flink

Re: Flink Read thousands of files with batch

2019-11-11 Thread tison
I suspect you suffer from a client submission failure, which also throws AskTimeoutException. The related configuration option is `akka.client.timeout`, which you can increase. However, there were some cases where the problem was resolved by upgrading Java to the latest minimum version, 8u212. Best, tison. Zhu

Re: Flink Read thousands of files with batch

2019-11-11 Thread Zhu Zhu
Hi Dominik, Would you check the JM GC status? One possible cause is that the large number of file metas in HadoopInputFormat is burdening the JM memory. `akka.ask.timeout` is the default RPC timeout, while some RPCs may override this timeout for their own purposes, e.g. the RPCs from web
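One quick way to check JM GC status on the JobManager host is with the standard JDK tools; the process-name pattern below is an assumption and depends on the deployment mode:

```shell
# Find the JobManager JVM (entrypoint class name varies by deployment mode)
JM_PID=$(jps | awk '/StandaloneSessionClusterEntrypoint|JobManager/ {print $1}')

# Sample GC utilization every 5000 ms; near-100% OU with frequent
# full GCs suggests the heap is under pressure from split metadata.
jstat -gcutil "$JM_PID" 5000
```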

Flink Read thousands of files with batch

2019-11-10 Thread Dominik Wosiński
Hey, I have a very specific use case. I have a history of records stored as Parquet in S3. I would like to read and process them with Flink. The issue is that the number of files is quite large (>100k). If I provide the full list of files to the HadoopInputFormat that I am using, it will fail with
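A setup along these lines is presumably what the thread is describing: wrapping Parquet's Hadoop input format in Flink's DataSet API. This is a sketch only; it assumes the Flink Hadoop-compatibility and parquet-hadoop dependencies are on the classpath, and the S3 path and read-support class are placeholders:

```java
// Sketch, not a verified build: Flink DataSet API over Parquet files in S3.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.ParquetInputFormat;

public class ParquetBatchJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Job job = Job.getInstance();
        // With >100k files, listing and split computation happen on the
        // JobManager; this is where the thread's timeouts originate.
        ParquetInputFormat.addInputPath(job, new Path("s3://my-bucket/history/"));

        // Key/value types depend on the configured ReadSupport; Void plus a
        // record type is the usual shape for parquet-hadoop input formats.
        HadoopInputFormat<Void, Object> input =
            new HadoopInputFormat<>(new ParquetInputFormat<>(), Void.class, Object.class, job);

        env.createInput(input)
           .first(10)
           .print();
    }
}
```

Given the scale, splitting the file list into batches or pointing the format at a directory rather than an explicit 100k-entry list may sidestep the submission timeout entirely.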