Hi,

Piotr is correct. This issue is most likely caused by the SplitEnumerator
being instantiated in the JM (JobManager) main thread.
FLINK-22282 has been created to address this issue.
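
To illustrate why a slow enumerator also makes unrelated requests time out,
here is a simplified Java model. It is not Flink code. The JobMaster's main
thread is an Akka-based RPC endpoint, and this sketch approximates it with a
hypothetical single-threaded `ExecutorService`. `requestStatus`,
`enumeratorMillis` and the `"CREATED"` reply are made up for illustration.
The only property modelled is that one thread handles messages in order, so a
status request queued behind a slow task cannot be answered before the
caller's timeout expires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockedMainThreadDemo {

    // Hypothetical model of a "main thread": one thread that handles every
    // message in arrival order (Flink actually uses Akka actors for this).
    static String requestStatus(long enumeratorMillis, long timeoutMillis) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            // 1. Slow work runs first on the main thread, standing in for
            //    creating a split enumerator that takes a long time.
            mainThread.execute(() -> sleepQuietly(enumeratorMillis));

            // 2. A status request is queued behind it on the same thread.
            CompletableFuture<String> status =
                    CompletableFuture.supplyAsync(() -> "CREATED", mainThread);

            // 3. The caller waits only up to its own timeout.
            return status.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMED_OUT";
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Interrupts the sleeping task so the demo does not linger.
            mainThread.shutdownNow();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Enumeration slower than the timeout: the status request never gets a turn.
        System.out.println(requestStatus(500, 100));
        // Enumeration faster than the timeout: the request is answered.
        System.out.println(requestStatus(50, 1000));
    }
}
```

The first call prints `TIMED_OUT` and the second prints `CREATED`. The first
case is presumably what happened in the logs below, with split enumeration
outlasting the 60000 ms ask timeout.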

Thanks,

Jiangjie (Becket) Qin

On Wed, Apr 14, 2021 at 10:32 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> I haven't found anything strange in the logs (I received the logs in a
> separate message). It looks like the problem is that split enumeration is
> taking a long time, and currently this is being done in the JobManager's
> main thread, blocking other things from executing. For the time being, I
> think the only thing you can do is either speed up the split enumeration
> (probably difficult) or increase the timeouts that are failing. I don't
> know if there is any other workaround at the moment (Becket?).
>
> Piotrek
>
> On Wed, 14 Apr 2021 at 15:57 Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hey,
>>
>> Could you provide the full logs from both the task managers and the job managers?
>>
>> Piotrek
>>
>> On Wed, 14 Apr 2021 at 15:43 太平洋 <495635...@qq.com> wrote:
>>
>>> After submitting the job, I received a 'Failed to execute job' error, and
>>> the time between initialization and scheduling was 214 s. What happened
>>> during this period?
>>>
>>> version: Flink 1.12.2
>>> deployment: k8s standalone
>>> logs:
>>>
>>> 2021-04-14 12:47:58,547 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Property [transaction.timeout.ms] not specified. Setting it to 3600000 ms
>>> 2021-04-14 12:48:04,175 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 1276000e99efdb77bdae0df88ab91da3 is submitted.
>>> 2021-04-14 12:48:04,175 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=1276000e99efdb77bdae0df88ab91da3.
>>> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 1276000e99efdb77bdae0df88ab91da3 (Prediction Program).
>>> 2021-04-14 12:48:04,249 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 1276000e99efdb77bdae0df88ab91da3 (Prediction Program).
>>> 2021-04-14 12:48:04,250 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_8 .
>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>>> 2021-04-14 12:48:04,251 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>>> 2021-04-14 12:48:04,252 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
>>> 2021-04-14 12:48:04,254 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 10 pipelined regions in 0 ms
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend@3ea8cd5a
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@26845997 for Prediction Program (1276000e99efdb77bdae0df88ab91da3).
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl [] - JobManager runner for job Prediction Program (1276000e99efdb77bdae0df88ab91da3) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_8.
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job Prediction Program (1276000e99efdb77bdae0df88ab91da3) under job master id 00000000000000000000000000000000.
>>> 2021-04-14 12:48:04,255 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: TableSourceScan(table=[[default_catalog, default_database, cpu_util, filter=[], project=[instance_id, value, timestamp]]], fields=[instance_id, value, timestamp]) -> Calc(select=[instance_id, value, timestamp], where=[(timestamp > 1618145278)]) -> SinkConversionToDataPoint -> Map.
>>> org.apache.flink.util.FlinkException: Failed to execute job 'Prediction Program'.
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
>>>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>>>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
>>>     at com.jd.app.StreamingJob.main(StreamingJob.java:265)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>>>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
>>>     at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
>>>     at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitAndGetJobClientFuture$2(EmbeddedExecutor.java:140)
>>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>>     ... 1 more
>>> Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>>     at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$null$0(EmbeddedExecutor.java:145)
>>>     at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
>>>     ... 6 more
>>> Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
>>>     at com.sun.proxy.$Proxy26.requestJobStatus(Unknown Source)
>>>     at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$null$0(EmbeddedExecutor.java:143)
>>>     ... 7 more
>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_1#1243668943]] after [60000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
>>>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>>>     at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635)
>>>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648)
>>>     at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
>>>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
>>>     at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
>>>     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
>>>     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
>>>     ... 1 more
>>> 2021-04-14 12:49:04,321 ERROR com.jd.app.StreamingJob [] - xxxx exec error org.apache.flink.util.FlinkException: Failed to execute job 'xxxxxx'.
>>> 2021-04-14 12:51:38,327 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
>>> 2021-04-14 12:51:38,328 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Prediction Program (1276000e99efdb77bdae0df88ab91da3) switched from state CREATED to RUNNING.
>>> 2021-04-14 12:51:38,328 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, cpu_util, filter=[], project=[instance_id, value, timestamp]]], fields=[instance_id, value, timestamp]) -> Calc(select=[instance_id, value, timestamp], where=[(timestamp > 1618145278)]) -> SinkConversionToDataPoint -> Map (1/5) (52ad5c769b4836498fadf954d5921401) switched from CREATED to SCHEDULED.
>>> 2021-04-14 12:51:38,328 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{90a7db543b771ed399f0b2b0414ef288}]
>>> 2021-04-14 12:51:38,328 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, cpu_util, filter=[], project=[instance_id, value, timestamp]]], fields=[instance_id, value, timestamp]) -> Calc(select=[instance_id, value, timestamp], where=[(timestamp > 1618145278)]) -> SinkConversionToDataPoint -> Map (2/5) (1f877463154f27d6f0aa7a9af9c2f64b) switched from CREATED to SCHEDULED.
>>>
>>
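
The gaps in the log above can be checked with a short Java calculation that
uses only `java.time`. The submission-to-error gap comes out at about 60.1 s,
consistent with the `[60000 ms]` ask timeout in the stack trace. The
initialization-to-scheduling gap comes out at about 214 s, matching the figure
in the original question. So the client gave up roughly two and a half
minutes before the job was even scheduled. Which configuration option sets
that 60 s timeout is not stated in this thread. In Flink 1.12 it is likely
`client.timeout` (default 60 s), but treat that as an assumption to verify.
`LogTimeline` and `millisBetween` are names chosen for this sketch.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogTimeline {

    // Timestamp prefix used by the log lines above, e.g. "2021-04-14 12:48:04,251".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds elapsed between two log timestamps.
    static long millisBetween(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TS),
                LocalDateTime.parse(to, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        // "Job ... is submitted." -> client-side "Failed to execute job" error
        System.out.println(millisBetween("2021-04-14 12:48:04,175", "2021-04-14 12:49:04,321")); // 60146
        // "Initializing job ..." -> "Starting scheduling ..."
        System.out.println(millisBetween("2021-04-14 12:48:04,251", "2021-04-14 12:51:38,327")); // 214076
    }
}
```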
