Qi,

Thanks for the references! How do I enable concurrent S3 file listing? Here is the code:

// Consume the JSON files
Configuration configuration = new Configuration(GlobalConfiguration.loadConfiguration());
// Recurse into nested directories when enumerating the input files
configuration.setBoolean(JsonLinesInputFormat.ENUMERATE_NESTED_FILES_FLAG, true);
JsonLinesInputFormat jsonInputFormat = new JsonLinesInputFormat(new Path(inputPath), configuration);
// Only accept files matching BucketingSinkFilter
jsonInputFormat.setFilesFilter(new BucketingSinkFilter());
DataSet<ObjectNode> input = env.readFile(jsonInputFormat, inputPath).withParameters(configuration);

On Wed, Dec 12, 2018 at 8:53 PM qi luo <luoqi...@gmail.com> wrote:

> Hi Alex,
>
> The hard-coded values I've found are at [1] and [2].
>
> We encountered an issue similar to yours (listing a lot of HDFS files).
> We ended up with a newer version of HDFSFileInput that lists files
> concurrently. Another hack we did is to list the files on the client side
> and pass them to the JobManager via serialization (not recommended, though,
> as it doesn't follow the Flink framework mechanism).
>
> You can also try listing S3 files concurrently, or paste your sample code
> here.
>
> [1] https://github.com/apache/flink/blob/967b31b333e6f4b014ea3041f420bfaff2484618/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L187
> [2] https://github.com/apache/flink/blob/b0496f21d70cc1af15569f3632d7a58fd53b8f95/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java#L117
>
> On Dec 13, 2018, at 1:09 AM, Alex Vinnik <alvinni...@gmail.com> wrote:
>
> Qi,
>
> The job submission timeout is caused by listing too many files in S3
> during the env.readFile call that creates the input DataSet. Is there a way
> NOT to list S3 files during job submission? It seems like that should help
> mitigate the timeout problem.
>
> Which hard-coded value were you referring to?
>
> Best,
> -Alex
>
> On Wed, Dec 12, 2018 at 7:47 AM Alex Vinnik <alvinni...@gmail.com> wrote:
>
>> Hi Qi,
>>
>> Thanks for looking into this.
>> Here is the ticket: https://issues.apache.org/jira/browse/FLINK-11143
>>
>> Best,
>> -Alex
>>
>> On Tue, Dec 11, 2018 at 8:47 PM qi luo <luoqi...@gmail.com> wrote:
>>
>>> Hi Alex and Lukas,
>>>
>>> This error is controlled by another RPC timeout (which is hard-coded and
>>> not affected by “akka.ask.timeout”). Could you open a JIRA issue so I can
>>> propose a fix for it?
>>>
>>> Cheers,
>>> Qi
>>>
>>> On Dec 12, 2018, at 7:07 AM, Alex Vinnik <alvinni...@gmail.com> wrote:
>>>
>>> Hi there,
>>>
>>> I ran into the same problem running a batch job with Flink 1.6.1/1.6.2:
>>>
>>> akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#202546747]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>>
>>> akka.ask.timeout: 600s
>>>
>>> But it looks like the setting is not honored. Any suggestions on what can
>>> be done?
>>>
>>> Thanks
>>>
>>> On 2018/07/13 10:24:16, Lukas Kircher <l...@gmail.com> wrote:
>>> > Hello,
>>> >
>>> > I have problems setting configuration parameters for Akka in Flink
>>> > 1.5.0. When I run a job, I get the exception listed below, which states
>>> > that Akka timed out after 10000ms. I tried to increase the timeout by
>>> > following the Flink configuration documentation. Specifically, I did the
>>> > following:
>>> >
>>> > 1) Passed a configuration to the Flink execution environment with
>>> >    `akka.ask.timeout` set to a higher value. I started this in IntelliJ.
>>> > 2) Passed program arguments via the run configuration in IntelliJ,
>>> >    e.g. `-Dakka.ask.timeout:100s`
>>> > 3) Added `akka.ask.timeout: 100 s` to flink-conf.yaml and started a
>>> >    local standalone cluster via start-cluster.sh. The setting is
>>> >    reflected in Flink's web interface.
>>> >
>>> > However, despite the explicit configuration, the default setting seems
>>> > to be used.
>>> > In each case, the exception below states that the Akka ask timed out
>>> > after 10000ms.
>>> >
>>> > As my problem seems very basic, I am not including an SSCCE for now, but
>>> > I can try to build one if it helps to figure out the issue.
>>> >
>>> > ------
>>> > [...]
>>> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
>>> > [...]
>>> >     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:619)
>>> >     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:234)
>>> >     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
>>> >     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
>>> >     at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>>> > [...]
>>> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher8df05371-effc-468b-8a22-e2f364f65d6a#582308583]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>> >     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>>> >     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>> >     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>> >     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>> >     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>> >     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>>> >     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>>> >     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>>> >     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>>> >     at java.lang.Thread.run(Thread.java:745)
>>> > [...]
>>> > ------
>>> >
>>> > Best regards and thanks for your help,
>>> > Lukas
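
Qi suggests listing S3 files concurrently. Below is a minimal sketch of that idea in plain Java, so it runs without Flink or S3 on the classpath. It walks the tree one directory level at a time and issues all listing calls for a level in parallel on a fixed thread pool. This only helps when the files are spread across many directories (for example, one directory per bucket written by BucketingSink). A single large directory is still one sequential, paginated listing.

The `DirectoryLister` interface, the `Entry` type, the `inMemory` fake and the `s3://bucket/input` paths are all hypothetical stand-ins. In a real job the lister would presumably wrap Flink's `FileSystem#listStatus(Path)`, using `FileStatus#isDir()` and `FileStatus#getPath()`. Whether the collected paths can then be handed to the input format on the client side is an assumption you would need to check against your Flink version and `JsonLinesInputFormat`. One candidate is `FileInputFormat#setFilePaths(Path...)`, which only works if the format supports multiple paths. Doing so would be the "list on the client side" hack Qi describes, not a built-in Flink feature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentLister {

    /** One child of a listed directory (hypothetical stand-in for Flink's FileStatus). */
    public static final class Entry {
        final String path;
        final boolean isDir;

        public Entry(String path, boolean isDir) {
            this.path = path;
            this.isDir = isDir;
        }
    }

    /** Hypothetical abstraction over one non-recursive listing call of a directory. */
    public interface DirectoryLister {
        List<Entry> list(String dir) throws Exception;
    }

    /** Lists all files under root, listing every directory of one level in parallel. */
    public static List<String> listAllFiles(DirectoryLister lister, String root, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<String> files = new ArrayList<>();
        try {
            List<String> level = Collections.singletonList(root);
            while (!level.isEmpty()) {
                // Fan out: submit one listing task per directory on the current level.
                List<Future<List<Entry>>> pending = new ArrayList<>();
                for (String dir : level) {
                    pending.add(pool.submit(() -> lister.list(dir)));
                }
                // Fan in: keep files, and turn subdirectories into the next level.
                List<String> next = new ArrayList<>();
                for (Future<List<Entry>> result : pending) {
                    for (Entry e : result.get()) {
                        if (e.isDir) {
                            next.add(e.path);
                        } else {
                            files.add(e.path);
                        }
                    }
                }
                level = next;
            }
        } finally {
            pool.shutdownNow();
        }
        return files;
    }

    /** In-memory fake lister: maps each directory to its direct children. */
    static DirectoryLister inMemory(Map<String, List<Entry>> tree) {
        return dir -> tree.getOrDefault(dir, Collections.<Entry>emptyList());
    }

    /** Lists a small fake bucket layout (hypothetical paths). */
    static List<String> demo() {
        Map<String, List<Entry>> tree = new HashMap<>();
        tree.put("s3://bucket/input", Arrays.asList(
                new Entry("s3://bucket/input/2018-12-11", true),
                new Entry("s3://bucket/input/2018-12-12", true)));
        tree.put("s3://bucket/input/2018-12-11", Arrays.asList(
                new Entry("s3://bucket/input/2018-12-11/part-0.json", false),
                new Entry("s3://bucket/input/2018-12-11/part-1.json", false)));
        tree.put("s3://bucket/input/2018-12-12", Arrays.asList(
                new Entry("s3://bucket/input/2018-12-12/part-0.json", false)));
        try {
            return listAllFiles(inMemory(tree), "s3://bucket/input", 4);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `main` prints the three fake `part-*.json` paths. The per-level fan-out keeps the sketch short. A work-queue design that submits each subdirectory as soon as it is discovered would avoid waiting for the slowest listing on every level, at the cost of more bookkeeping to detect completion.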