Re: Flink job failure during yarn node termination

2021-08-04 Thread Rainie Li
Hi Nicolaus,

I double-checked our HDFS config again; the replication factor is set to 1 instead of 2.
I will try the solution you provided.

Thanks again.
Best regards
Rainie
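
The replication factor discussed here is controlled by dfs.replication in hdfs-site.xml. A minimal sketch of the change suggested in this thread (the value shown is just the example from the discussion):

```xml
<!-- hdfs-site.xml: default block replication for new files.
     A value of 1 means a single node failure makes blocks unavailable. -->
<property>
  <name>dfs.replication</name>
  <value>2</value>
</property>
```

Note that this only affects files written after the change; existing files keep their replication factor unless it is changed explicitly.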


Re: Flink job failure during yarn node termination

2021-08-04 Thread Rainie Li
Thanks for the context Nicolaus.
We are using S3 instead of HDFS.

Best regards
Rainie

On Wed, Aug 4, 2021 at 12:39 AM Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Rainie,
>
> I found a similar issue on stackoverflow, though quite different
> stacktrace:
> https://stackoverflow.com/questions/64400280/flink-unable-to-recover-after-yarn-node-termination
> Do you replicate data on multiple hdfs nodes like suggested in the answer
> there?
>
> Best,
> Nico
>

Re: Flink job failure during yarn node termination

2021-08-04 Thread Rainie Li
Thanks Till.
We terminated one of the worker nodes.
We enabled HA by using Zookeeper.
Sure, we will try upgrading the job to a newer version.

Best regards
Rainie
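
The ZooKeeper-based HA mentioned above is configured in flink-conf.yaml; a minimal sketch (quorum hosts and storage path are placeholders):

```yaml
# flink-conf.yaml: JobManager HA backed by ZooKeeper.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
# Durable storage for HA metadata (checkpoint pointers, job graphs).
high-availability.storageDir: s3://my-bucket/flink/ha/
```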

On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann  wrote:

> Hi Rainie,
>
> It looks to me as if Yarn is causing this problem. Which Yarn node are you
> terminating? Have you configured your Yarn cluster to be highly available
> in case you are terminating the ResourceManager?
>
> Flink should retry the operation of starting a new container in case it
> fails. If this is not the case, then please upgrade to one of the actively
> maintained Flink versions (1.12 or 1.13) and try whether it works with this
> version.
>
> Cheers,
> Till
>
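
For Till's ResourceManager HA question, a minimal yarn-site.xml sketch (ids and hostnames are placeholders):

```xml
<!-- yarn-site.xml: ResourceManager HA so an RM node termination
     does not take down the cluster's resource management. -->
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>master1.example.com</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>master2.example.com</value>
</property>
```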

Flink job failure during yarn node termination

2021-08-03 Thread Rainie Li
Hi Flink Community,

My Flink application runs version 1.9, and it failed to recover (the
application kept running, but checkpoints failed and the job stopped
processing data) during a Hadoop YARN node termination.

*Here is job manager log error:*
*2021-07-26 18:02:58,605 INFO
 org.apache.hadoop.io.retry.RetryInvocationHandler - Exception
while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020
. Trying to fail over immediately.*
*org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
Operation category READ is not supported in state standby*
at
org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
at
org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)

at org.apache.hadoop.ipc.Client.call(Client.java:1476)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
at
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
at
org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
at
org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
at java.lang.Iterable.forEach(Iterable.java:75)
at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)

Re: Savepoint failure with operation not found under key

2021-06-29 Thread Rainie Li
I see; then more than 5 minutes had passed between the requests.
Thanks for the help.

Best regards
Rainie

On Tue, Jun 29, 2021 at 12:29 AM Chesnay Schepler 
wrote:

> How much time has passed between the requests? (You can only query the
> status for about 5 minutes)
>
> On 6/29/2021 6:37 AM, Rainie Li wrote:
>
> Thanks for the context Chesnay.
> Yes, I sent both requests to the same JM.
>
> Best regards
> Rainie
>
> On Mon, Jun 28, 2021 at 8:33 AM Chesnay Schepler 
> wrote:
>
>> Ordinarily this happens because the status request is sent to a different
>> JM than the one who received the request for creating a savepoint.
>> The meta information for such requests is only stored locally on each JM
>> and neither distributed to all JMs nor persisted anywhere.
>>
>> Did you send both requests ( the ones for creating a savepoint and one
>> for querying the status) to the same JM?
>>
>> On 6/26/2021 11:18 PM, Rainie Li wrote:
>>
>> Hi Flink Community,
>>
>> I found this error when I tried to create a savepoint for my flink job.
>> It's in version 1.9.
>> {
>>
>> "errors": [
>> "Operation not found under key:
>> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@57b9711e"
>> ]
>> }
>>
>> Here is error from JM log:
>>
>> 2021-06-21 06:49:50,195 ERROR 
>> org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler
>>   - Exception occurred in REST handler: Operation not found under key: 
>> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@56675052
>> 2021-06-21 06:50:50,023 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
>> 47281 of job 8de31918b2f2d983eee0d7f988a8f95a expired before completing.
>>
>> Any idea what could cause it and how to find more debugging info?
>>
>> Appreciated for any suggestions.
>>
>> Thanks
>>
>> Best regard
>>
>> Rainie
>>
>>
>>
>


Re: Savepoint failure with operation not found under key

2021-06-28 Thread Rainie Li
Thanks for the context Chesnay.
Yes, I sent both requests to the same JM.

Best regards
Rainie

On Mon, Jun 28, 2021 at 8:33 AM Chesnay Schepler  wrote:

> Ordinarily this happens because the status request is sent to a different
> JM than the one who received the request for creating a savepoint.
> The meta information for such requests is only stored locally on each JM
> and neither distributed to all JMs nor persisted anywhere.
>
> Did you send both requests ( the ones for creating a savepoint and one for
> querying the status) to the same JM?
>


Savepoint failure with operation not found under key

2021-06-26 Thread Rainie Li
Hi Flink Community,

I found this error when I tried to create a savepoint for my Flink job,
which is running version 1.9.
{
  "errors": [
    "Operation not found under key: org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@57b9711e"
  ]
}

Here is error from JM log:

2021-06-21 06:49:50,195 ERROR
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler
 - Exception occurred in REST handler: Operation not found under key:
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@56675052
2021-06-21 06:50:50,023 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Checkpoint 47281 of job 8de31918b2f2d983eee0d7f988a8f95a expired
before completing.


Any idea what could cause it and how to find more debugging info?

Any suggestions would be appreciated.


Thanks

Best regards

Rainie
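
The "operation not found" behavior discussed in the replies above can be illustrated with a small sketch. Each JobManager holds savepoint operation keys only in its own memory, and only for roughly five minutes, so a status request sent to a different JM, or sent too late, gets a "not found" error. The class, method names, and retention value below are illustrative, not Flink's actual internals:

```python
import uuid

RETENTION_SECONDS = 300  # roughly the five-minute window mentioned above

class JobManager:
    """Toy model of a JM's per-instance savepoint operation registry."""

    def __init__(self):
        self._operations = {}  # key -> (status, created_at); local to this JM

    def trigger_savepoint(self, now):
        key = str(uuid.uuid4())
        self._operations[key] = ("IN_PROGRESS", now)
        return key

    def savepoint_status(self, key, now):
        entry = self._operations.get(key)
        if entry is None or now - entry[1] > RETENTION_SECONDS:
            # Same shape as the REST error in this thread.
            return {"errors": ["Operation not found under key: " + key]}
        return {"status": entry[0]}

jm1, jm2 = JobManager(), JobManager()
key = jm1.trigger_savepoint(now=0)
print(jm1.savepoint_status(key, now=10))   # found on the JM that triggered it
print(jm2.savepoint_status(key, now=10))   # "not found" on a different JM
print(jm1.savepoint_status(key, now=400))  # "not found" after the window passes
```

This is why the two debugging questions in the replies are "did both requests go to the same JM?" and "how much time passed between them?".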


Re: Flink Version 1.11 job savepoint failures

2021-05-03 Thread Rainie Li
It helps.
Thanks Matthias.

Best regards
Rainie

On Mon, May 3, 2021 at 4:25 AM Matthias Pohl  wrote:

> Hi Rainie,
> the savepoint creation failed because some tasks had already finished. It
> looks like you ran into an issue that was addressed in Flink 1.13, at least
> partially, since FLINK-21066 [1] is only a subtask of a bigger issue. I'm
> pulling Yun Gao into this thread; let's see whether Yun can confirm that
> finding.
>
> I hope that helps.
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-21066
>

Flink Version 1.11 job savepoint failures

2021-05-03 Thread Rainie Li
Hi Flink Community,

Our Flink jobs are on version 1.11, and we trigger savepoints with:
$ bin/flink savepoint :jobId [:targetDirectory]
We can get the trigger ID with the savepoint path successfully.

But we see these errors when querying the savepoint status endpoint:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-savepoints-triggerid
e.g. application_id/jobs/job_id/savepoints/trigger_id

{
  "errors": [
"org.apache.flink.runtime.rest.NotFoundException: Operation not found
under key:
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@8893e196\n\tat
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest(AbstractAsynchronousOperationHandlers.java:167)\n\tat
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.handleRequest(SavepointHandlers.java:193)\n\tat
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:73)\n\tat
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:178)\n\tat
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:81)\n\tat
java.util.Optional.ifPresent(Optional.java:159)\n\tat
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:46)\n\tat
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:78)\n\tat
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)\n\tat
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)\n\tat
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)\n\tat
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)\n\tat
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)\n\tat
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)\n\tat
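
The trigger/status round trip described at the top of this message can be sketched as follows. Host, port, and the trigger ID are placeholders, and the actual HTTP calls need a running cluster, so they are shown only as comments:

```python
# Sketch of the savepoint trigger/status REST flow (placeholders throughout).
job_id = "8de31918b2f2d983eee0d7f988a8f95a"  # example job id from this archive
base = f"http://jobmanager-host:8081/jobs/{job_id}/savepoints"

# Step 1: POST to `base` (equivalent to `bin/flink savepoint :jobId`),
# which responds with {"request-id": "<trigger-id>"}.
trigger_id = "8893e196"  # placeholder trigger id

# Step 2: GET the status URL promptly, and from the same JobManager that
# received the trigger; the operation key lives only in that JM's memory.
status_url = f"{base}/{trigger_id}"
print(status_url)
```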

Re: Flink application has slight data loss using Processing Time

2021-03-22 Thread Rainie Li
I will try that.
Thanks for your help, David.

Best regards
Rainie

On Sat, Mar 20, 2021 at 9:46 AM David Anderson  wrote:

> You should increase the kafka transaction timeout --
> transaction.max.timeout.ms -- to something much larger than the default,
> which I believe is 15 minutes. Suitable values are more on the order of a
> few hours to a few days -- long enough to allow for any conceivable outage.
> This way, if a request does timeout and causes the Flink job to fail, so
> long as Kafka and Flink recover within the transaction timeout you won't
> lose any data.
>
> Regards,
> David
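David's suggestion above can be sketched as a plain `java.util.Properties` object that would be handed to the Kafka sink. This is a minimal sketch: the broker address and the 4-hour value are placeholders, and note that the broker-side `transaction.max.timeout.ms` (default 15 minutes) must also be raised to at least the producer-side value or the broker will reject it.

```java
import java.util.Properties;

public class KafkaTxnTimeout {
    // Producer properties for a transactional (exactly-once) Kafka sink.
    // The broker-side transaction.max.timeout.ms must be >= this value.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        // 4 hours: much larger than the default, long enough to cover
        // any conceivable outage, per the advice above.
        props.setProperty("transaction.timeout.ms",
                String.valueOf(4 * 60 * 60 * 1000));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("transaction.timeout.ms"));
    }
}
```

These properties would then be passed to the Kafka producer/sink constructor; the exact constructor depends on the connector version in use.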
>
>
>
> On Sat, Mar 20, 2021 at 12:02 AM Rainie Li  wrote:
>
>> Hi Arvid,
>>
>> After increasing producer.kafka.request.timeout.ms from 9 to 12.
>> The job has been running fine for almost 5 days, but one of the tasks
>> failed again recently for the same timeout error. (attached stack trace
>> below)
>> Should I keep increasing producer.kafka.request.timeout.ms value?
>>
>> Thanks again for the help.
>> Best regards
>> Rainie
>>
>> *Stacktrace:*
>> {job_name}/{job_id}/chk-43556/_metadata, reference=(default),
>> fileStateSizeThreshold=2048, writeBufferSize=4096}, synchronous part) in
>> thread Thread[Process-Event - Filter-data08 (237/240),5,Flink Task
>> Threads] took 0 ms.
>> Canceler.run(Task.java:1434)
>> ... 1 more2021-03-17 17:46:42,284 INFO
>>  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Could
>> not complete snapshot 43556 for operator Sink: Sink-data08 (237/240).
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
>> to send data to Kafka: Expiring 53 record(s) for frontend_event_core-46:
>> 122269 ms has passed since last append
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>>   at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>>   at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>>   at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>   at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>   at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>   at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>   at
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>   at
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>   at
>> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamT

Re: Flink application has slightly data loss using Processing Time

2021-03-19 Thread Rainie Li
(SourceStreamTask.java:232)
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
  at
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
  at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
  at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
  at
com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:436)
  at
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
  at
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
  at org.apache.flink.runtime.taskmanager.Task$Task
2021-03-17 17:46:42,391 INFO  org.apache.flink.runtime.taskmanager.Task
- Attempting to cancel task Process-Event -
Filter-data08 (237/240) (34cf65b29f7153d6f0a82819eeaf218d).


On Thu, Mar 11, 2021 at 7:12 AM Rainie Li  wrote:

> Thanks for the suggestion, Arvid.
> Currently my job is using producer.kafka.request.timeout.ms=9
> I will try to increase to 12.
>
> Best regards
> Rainie
>
> On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise  wrote:
>
>> Hi Rainie,
>>
>> This looks like the record batching in Kafka producer timed out. At this
>> point, the respective records are lost forever. You probably want to tweak
>> your Kafka settings [1].
>>
>> Usually, Flink should fail and restart at this point and recover without
>> data loss. However, if the transactions are also timing out, that may
>> explain the data loss. So you probably also want to increase the
>> transaction timeout.
>>
>> [1]
>> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
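The batching timeouts Arvid points at can be sketched as producer properties. The config keys are standard Kafka producer settings; the values below are illustrative examples for this kind of "Expiring N record(s)" failure, not tuned recommendations for this job.

```java
import java.util.Properties;

public class KafkaBatchTimeouts {
    // Illustrative settings for the "Expiring N record(s) ... ms has
    // passed since last append" error (values are examples only).
    static Properties producerProps() {
        Properties props = new Properties();
        // Max time a single broker request may take before a retry.
        props.setProperty("request.timeout.ms", "120000");
        // Upper bound on total time a record may spend in the producer,
        // batching and retries included (Kafka client >= 2.1).
        props.setProperty("delivery.timeout.ms", "600000");
        // Prefer retrying over expiring batches during broker hiccups.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("delivery.timeout.ms"));
    }
}
```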
>>
>> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li  wrote:
>>
>>> Thanks for the info, David.
>>> The job has checkpointing.
>>> I saw some tasks failed due to
>>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>>> send data to Kafka"
>>> Here is the stack trace from the JM log:
>>>
>>> container_e17_1611597945897_8007_01_000240 @ worker-node-host
>>> (dataPort=42321).
>>> 2021-02-10 01:19:27,206 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>>> checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>>> complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure
>>> reason: Checkpoint was declined.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>>> at
>>> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>>> at
>>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>>> at
>>> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>>> at
>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInp

Re: Flink application has slightly data loss using Processing Time

2021-03-11 Thread Rainie Li
Thanks for the suggestion, Arvid.
Currently my job is using producer.kafka.request.timeout.ms=9
I will try to increase to 12.

Best regards
Rainie

On Thu, Mar 11, 2021 at 3:58 AM Arvid Heise  wrote:

> Hi Rainie,
>
> This looks like the record batching in Kafka producer timed out. At this
> point, the respective records are lost forever. You probably want to tweak
> your Kafka settings [1].
>
> Usually, Flink should fail and restart at this point and recover without
> data loss. However, if the transactions are also timing out, that may
> explain the data loss. So you probably also want to increase the
> transaction timeout.
>
> [1]
> https://stackoverflow.com/questions/53223129/kafka-producer-timeoutexception
>
> On Mon, Mar 8, 2021 at 8:34 PM Rainie Li  wrote:
>
>> Thanks for the info, David.
>> The job has checkpointing.
>> I saw some tasks failed due to
>> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>> send data to Kafka"
>> Here is the stack trace from the JM log:
>>
>> container_e17_1611597945897_8007_01_000240 @ worker-node-host
>> (dataPort=42321).
>> 2021-02-10 01:19:27,206 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>> checkpoint 21355 of job 7dab4c1a1c6984e70732b8e3f218020f.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> complete snapshot 21355 for operator Sink: Sink-data08 (208/240). Failure
>> reason: Checkpoint was declined.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1302)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1236)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:892)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:797)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:728)
>> at
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:88)
>> at
>> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
>> at
>> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by:
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
>> send data to Kafka: Expiring 42 record(s) for topic-name-38: 116447 ms has
>> passed since last append
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1196)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:968)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:310)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:973)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFu

Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Rainie Li
k.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Best regards
Rainie

On Mon, Mar 8, 2021 at 11:09 AM David Anderson  wrote:

> Rainie,
>
> A restart after a failure can cause data loss if you aren't using
> checkpointing, or if you experience a transaction timeout.
>
> A manual restart can also lead to data loss, depending on how you manage
> the offsets, transactions, and other state during the restart. What
> happened in this case?
>
> David

>
> On Mon, Mar 8, 2021 at 7:53 PM Rainie Li  wrote:
>
>> Thanks Yun and David.
>> There were some tasks that got restarted. We configured the restart
>> policy and the job didn't fail.
>> Will task restart cause data loss?
>>
>> Thanks
>> Rainie
>>
>>
>> On Mon, Mar 8, 2021 at 10:42 AM David Anderson 
>> wrote:
>>
>>> Rainie,
>>>
>>> Were there any failures/restarts, or is this discrepancy observed
>>> without any disruption to the processing?
>>>
>>> Regards,
>>> David
>>>
>>> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li 
>>> wrote:
>>>
>>>> Thanks for the quick response, Smile.
>>>> I don't use window operators or flatmap.
>>>> Here is the core logic of my filter; it only iterates over the filters list.
>>>> Will *rebalance()* cause it?
>>>>
>>>> Thanks again.
>>>> Best regards
>>>> Rainie
>>>>
>>>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>>
>>>> matchedRecordsStream =
>>>> eventStream
>>>> .rebalance()
>>>> .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>>>   public void processElement(
>>>>       T element,
>>>>       ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>>>       Collector<SplitterIntermediateRecord<T>> collector) {
>>>>     for (StreamFilter filter : filters) {
>>>>       if (filter.match(element)) {
>>>>         SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>>>         SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>>>             substreamConfig.getKafkaCluster(),
>>>>             substreamConfig.getKafkaTopic(),
>>>>             substreamConfig.getCutoverKafkaTopic(),
>>>>             substreamConfig.getCutoverTimestampInMs(),
>>>>             element);
>>>>         collector.collect(result);
>>>>       }
>>>>     }
>>>>   }
>>>> })
>>>> .name("Process-" + eventClass.getSimpleName());
>>>>
>>>>
>>>> On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:
>>>>
>>>>> Hi Rainie,
>>>>>
>>>>> Could you please provide more information about your processing logic?
>>>>> Do you use window operators?
>>>>> If there's no time-based operator in your logic, late arrival data
>>>>> won't be
>>>>> dropped by default and there might be something wrong with your flat
>>>>> map or
>>>>> filter operator. Otherwise, you can use sideOutputLateData() to get
>>>>> the late
>>>>> data of the window and have a look at them. See [1] for more
>>>>> information
>>>>> about sideOutputLateData().
>>>>>
>>>>> [1].
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>
>>>>> Regards,
>>>>> Smile
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>


Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Rainie Li
Thanks Yun and David.
There were some tasks that got restarted. We configured the restart policy
and the job didn't fail.
Will task restart cause data loss?

Thanks
Rainie
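The restart policy mentioned above is typically configured either on the execution environment or in flink-conf.yaml; a minimal sketch of the config-file form, with placeholder counts and delays:

```yaml
# flink-conf.yaml (values are placeholders)
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```

With checkpointing enabled, a task restart by itself should not lose data; as noted elsewhere in this thread, loss appears when Kafka transactions time out before recovery completes.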


On Mon, Mar 8, 2021 at 10:42 AM David Anderson  wrote:

> Rainie,
>
> Were there any failures/restarts, or is this discrepancy observed without
> any disruption to the processing?
>
> Regards,
> David
>
> On Mon, Mar 8, 2021 at 10:14 AM Rainie Li  wrote:
>
>> Thanks for the quick response, Smile.
>> I don't use window operators or flatmap.
>> Here is the core logic of my filter; it only iterates over the filters list.
>> Will *rebalance()* cause it?
>>
>> Thanks again.
>> Best regards
>> Rainie
>>
>> SingleOutputStreamOperator<SplitterIntermediateRecord<T>>
>> matchedRecordsStream =
>> eventStream
>> .rebalance()
>> .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
>>   public void processElement(
>>       T element,
>>       ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
>>       Collector<SplitterIntermediateRecord<T>> collector) {
>>     for (StreamFilter filter : filters) {
>>       if (filter.match(element)) {
>>         SubstreamConfig substreamConfig = filter.getSubstreamConfig();
>>         SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
>>             substreamConfig.getKafkaCluster(),
>>             substreamConfig.getKafkaTopic(),
>>             substreamConfig.getCutoverKafkaTopic(),
>>             substreamConfig.getCutoverTimestampInMs(),
>>             element);
>>         collector.collect(result);
>>       }
>>     }
>>   }
>> })
>> .name("Process-" + eventClass.getSimpleName());
>>
>>
>> On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:
>>
>>> Hi Rainie,
>>>
>>> Could you please provide more information about your processing logic?
>>> Do you use window operators?
>>> If there's no time-based operator in your logic, late arrival data won't
>>> be
>>> dropped by default and there might be something wrong with your flat map
>>> or
>>> filter operator. Otherwise, you can use sideOutputLateData() to get the
>>> late
>>> data of the window and have a look at them. See [1] for more information
>>> about sideOutputLateData().
>>>
>>> [1].
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> Regards,
>>> Smile
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Rainie Li
Thanks for the quick response, Smile.
I don't use window operators or flatMap.
Here is the core logic of my filter; it only iterates over the filters list.
Will *rebalance()* cause it?

Thanks again.
Best regards
Rainie

SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
eventStream
.rebalance()
.process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
  public void processElement(
      T element,
      ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
      Collector<SplitterIntermediateRecord<T>> collector) {
    for (StreamFilter filter : filters) {
      if (filter.match(element)) {
        SubstreamConfig substreamConfig = filter.getSubstreamConfig();
        SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
            substreamConfig.getKafkaCluster(),
            substreamConfig.getKafkaTopic(),
            substreamConfig.getCutoverKafkaTopic(),
            substreamConfig.getCutoverTimestampInMs(),
            element);
        collector.collect(result);
      }
    }
  }
})
.name("Process-" + eventClass.getSimpleName());


On Mon, Mar 8, 2021 at 1:03 AM Smile  wrote:

> Hi Rainie,
>
> Could you please provide more information about your processing logic?
> Do you use window operators?
> If there's no time-based operator in your logic, late arrival data won't be
> dropped by default and there might be something wrong with your flat map or
> filter operator. Otherwise, you can use sideOutputLateData() to get the
> late
> data of the window and have a look at them. See [1] for more information
> about sideOutputLateData().
>
> [1].
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
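The side-output idea Smile describes can be illustrated with plain Java. This is a toy sketch with hypothetical names, not the Flink API: in a real job you would use an `OutputTag` with `window.sideOutputLateData()` as in the linked docs. It only shows the routing concept — events behind the current watermark go to a separate "late" output instead of being dropped silently.

```java
import java.util.ArrayList;
import java.util.List;

public class LateEventSplit {
    // Toy illustration of routing late events to a side output:
    // event times at or after the watermark go to the main output,
    // event times behind the watermark go to the "late" side output.
    static List<List<Long>> split(long watermark, List<Long> eventTimes) {
        List<Long> main = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long t : eventTimes) {
            if (t >= watermark) {
                main.add(t);
            } else {
                late.add(t);
            }
        }
        return List.of(main, late);
    }

    public static void main(String[] args) {
        List<List<Long>> out = split(100L, List.of(90L, 100L, 110L));
        System.out.println(out.get(0)); // on-time events
        System.out.println(out.get(1)); // late events
    }
}
```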
>
> Regards,
> Smile
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Flink application has slightly data loss using Processing Time

2021-03-08 Thread Rainie Li
Hello Flink Community,

Our Flink application is on v1.9. The basic logic of this application is to
consume one large Kafka topic, filter on some fields, and then produce data
to a new Kafka topic.
After comparing the original Kafka topic's count with the generated Kafka
topic's count on the same field using a Presto query, we see slight data
loss (around 1.37220156e-7 per hour).
The original Kafka topic collects data from mobile devices, so it can
have late-arrival events. That's why we use processing time, since order
does not matter.

This job uses processing time; any idea what could potentially cause
this data loss?
Also if flink is using processing time, what is the default time window?
Will the default time window cause it?

Appreciated for any suggestions.
Thanks
Best regards
Rainie


Re: Flink application kept restarting

2021-03-04 Thread Rainie Li
Thanks Julian, I appreciate your help.
I will check the task manager logs and decide which approach to pick.

Best regards
Rainie


On Thu, Mar 4, 2021 at 3:04 PM Jaffe, Julian 
wrote:

> Hey Rainie,
>
>
>
> Kafka internally attempts to retry topic metadata fetches if possible. If
> you think the root cause was just due to network congestion or the like,
> you might want to look into increasing `request.timeout.ms`. Because of
> the internal retry attempts, however, this exception usually means that the
> client was unable to connect to the brokers in a more permanent way. The
> usual culprits are SSL topics with plaintext consumers or vice versa, SSL
> topics where the client was unable to fetch the necessary credentials,
> incorrect bootstrap servers configs, and cluster maintenance. Since your
> job was running successfully for a long time before crashing, I’d check the
> task manager logs for the period just before the error for any warnings or
> errors from Kafka to see if they shed light. If you’re dynamically loading
> SSL certs or using cnames to provide stable bootstrap server identifiers,
> it can be non-obvious if your config changed mid-operation, but oftentimes
> some part of the stack will log a warning message or a swallowed exception.
> If you have verbose logging, you might also be logging the Kafka connection
> properties each time they’re regenerated.
>
>
>
> TL;DR:
>
>
>
> If you think the issue is due to high latency in communicating with your
> Kafka cluster, increase your configured timeouts.
>
>
>
> If you think the issue is due to (possibly temporary) dynamic config
> changes, check your error handling. Retrying the fetch from the Flink side
> will only help if the cause of the time is very transient.
>
>
>
> Julian
>
>
>
> *From: *Rainie Li 
> *Date: *Thursday, March 4, 2021 at 1:49 PM
> *To: *"matth...@ververica.com" 
> *Cc: *user , Chesnay Schepler 
> *Subject: *Re: Flink application kept restarting
>
>
>
> Hi Matthias,
>
>
>
> Do you have any suggestions to handle timeout issues when fetching data
> from a Kafka topic?
>
> I am thinking of adding a retry logic into flink job, not sure if this is
> the right direction.
>
>
>
> Thanks again
>
> Best regards
>
> Rainie
>
>
>
> On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl 
> wrote:
>
> Hi Rainie,
>
> in general buffer pools being destroyed usually mean that some other
> exception occurred that caused the task to fail and in the process of
> failure handling the operator-related network buffer is destroyed. That
> causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your
> case. It looks like you had some timeout problem while fetching data from a
> Kafka topic.
>
>
>
> Matthias
>
>
>
> On Tue, Mar 2, 2021 at 10:39 AM Rainie Li  wrote:
>
> Thanks for checking, Matthias.
>
>
>
> I have another flink job which failed last weekend with the same buffer
> pool destroyed error. This job is also running version 1.9.
>
> Here is the error I found from the task manager log. Any suggestion what
> is the root cause and how to fix it?
>
>
>
> 2021-02-28 00:54:45,943 WARN
>  org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> while canceling task.
> java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at
> com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> a

Re: Flink application kept restarting

2021-03-04 Thread Rainie Li
Hi Matthias,

Do you have any suggestions to handle timeout issues when fetching data
from a Kafka topic?
I am thinking of adding retry logic to the Flink job; not sure if this is
the right direction.

Thanks again
Best regards
Rainie

On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl 
wrote:

> Hi Rainie,
> in general buffer pools being destroyed usually mean that some other
> exception occurred that caused the task to fail and in the process of
> failure handling the operator-related network buffer is destroyed. That
> causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your
> case. It looks like you had some timeout problem while fetching data from a
> Kafka topic.
>
> Matthias
>
> On Tue, Mar 2, 2021 at 10:39 AM Rainie Li  wrote:
>
>> Thanks for checking, Matthias.
>>
>> I have another flink job which failed last weekend with the same buffer
>> pool destroyed error. This job is also running version 1.9.
>> Here is the error I found from the task manager log. Any suggestion what
>> is the root cause and how to fix it?
>>
>> 2021-02-28 00:54:45,943 WARN
>>  org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
>> while canceling task.
>> java.lang.RuntimeException: Buffer pool is destroyed.
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at
>> com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> --
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)

Re: Flink application kept restarting

2021-03-03 Thread Rainie Li
I see.
Thank you for the explanation.

Best regards
Rainie

On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl 
wrote:

> Hi Rainie,
> in general buffer pools being destroyed usually mean that some other
> exception occurred that caused the task to fail and in the process of
> failure handling the operator-related network buffer is destroyed. That
> causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your
> case. It looks like you had some timeout problem while fetching data from a
> Kafka topic.
>
> Matthias
>
> On Tue, Mar 2, 2021 at 10:39 AM Rainie Li  wrote:
>
>> Thanks for checking, Matthias.
>>
>> I have another Flink job that failed last weekend with the same "buffer
>> pool destroyed" error. This job is also running version 1.9.
>> Here is the error I found in the task manager log. Any suggestions on what
>> the root cause is and how to fix it?
>>
>> 2021-02-28 00:54:45,943 WARN
>>  org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
>> while canceling task.
>> java.lang.RuntimeException: Buffer pool is destroyed.
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at
>> com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> --
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.

Re: Flink application kept restarting

2021-03-02 Thread Rainie Li
(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:86)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired
while fetching topic metadata
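The root cause above is a client-side Kafka metadata timeout. One mitigation worth trying is to pass more generous timeout settings in the consumer Properties handed to the Kafka consumer. A sketch only — the broker address, group id, and timeout values below are illustrative assumptions, not recommendations:

```java
import java.util.Properties;

public class KafkaTimeoutProps {
    // Build consumer properties with more generous timeouts; the broker
    // address and group id are hypothetical placeholders.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-consumer-group");
        // Give the client more time before "Timeout expired while fetching
        // topic metadata" is raised.
        props.setProperty("request.timeout.ms", "60000");
        props.setProperty("default.api.timeout.ms", "120000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("default.api.timeout.ms"));
    }
}
```

These are standard Kafka consumer settings, so they can be passed unchanged to the FlinkKafkaConsumer constructor that accepts a Properties object.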


Thanks again!
Best regards
Rainie

On Mon, Mar 1, 2021 at 4:38 AM Matthias Pohl  wrote:

> Another question: the timeout of 48 hours sounds strange. Some other
> system should have noticed the connection problem much earlier, assuming
> you have a reasonably low heartbeat interval configured.
>
> Matthias
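For reference, the heartbeat settings in question live in flink-conf.yaml; the values below are, to my knowledge, the defaults in this Flink version:

```yaml
# flink-conf.yaml — a lost JM/TM connection should surface within
# roughly heartbeat.timeout, not after 48 hours
heartbeat.interval: 10000   # ms between heartbeat requests (default)
heartbeat.timeout: 50000    # ms without a heartbeat before the peer is marked lost (default)
```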
>
> On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl 
> wrote:
>
>> Thanks for providing this information, Rainie. Are other issues
>> documented in the logs besides the TimeoutException in the JM logs that
>> you already shared? For now, it looks like there was a connection
>> problem between the TaskManager and the JobManager that caused the shutdown
>> of the operator, resulting in the NetworkBufferPool being destroyed. In
>> this scenario I would expect other failures to occur besides the ones you
>> shared.
>>
>> Best,
>> Matthias
>>
>> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li  wrote:
>>
>>> Thank you Mattias.
>>> It’s version 1.9.
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl 
>>> wrote:
>>>
>>>> Hi Rainie,
>>>> the network buffer pool was destroyed for some reason. This happens
>>>> when the NettyShuffleEnvironment gets closed which is triggered when an
>>>> operator is cleaned up, for instance. Maybe, the timeout in the metric
>>>> system caused this. But I'm not sure how this is connected. I'm gonna add
>>>> Chesnay to this conversation hoping that he can give more insights.
>>>>
>>>> If I may ask: What Flink version are you using?
>>>>
>>>> Thanks,
>>>> Matthias
>>>>
>>>>
>>>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Our Flink application kept restarting, and it made lots of RPC calls to
>>>>> a dependency service.
>>>>

Re: Flink application kept restarting

2021-02-26 Thread Rainie Li
Thank you Mattias.
It’s version 1.9.

Best regards
Rainie

On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl 
wrote:

> Hi Rainie,
> the network buffer pool was destroyed for some reason. This happens when
> the NettyShuffleEnvironment gets closed which is triggered when an operator
> is cleaned up, for instance. Maybe, the timeout in the metric system caused
> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
> conversation hoping that he can give more insights.
>
> If I may ask: What Flink version are you using?
>
> Thanks,
> Matthias
>
>
> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li  wrote:
>
>> Hi All,
>>
>> Our Flink application kept restarting, and it made lots of RPC calls to a
>> dependency service.
>>
>> *We saw this exception from failed task manager log: *
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>> Could not forward element to next operator
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> at
>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>> at
>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>> at
>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operato

Flink application kept restarting

2021-02-25 Thread Rainie Li
Hi All,

Our Flink application kept restarting, and it made lots of RPC calls to a
dependency service.

*We saw this exception from failed task manager log: *
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
at
com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
at
com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 23 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
at
org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
at

Re: Flink job finished unexpected

2021-02-24 Thread Rainie Li
I see, I will check tm log.
Thank you Arvid.

Best regards
Rainie

On Wed, Feb 24, 2021 at 5:27 AM Arvid Heise  wrote:

> Hi Rainie,
>
> there are two probably causes:
> * Network instabilities
> * Taskmanager died, then you can further dig in the taskmanager logs for
> errors right before that time.
>
> In both cases, Flink should restart the job with the correct restart
> policies if configured.
>
> On Sat, Feb 20, 2021 at 10:07 PM Rainie Li  wrote:
>
>> Hello,
>>
>> I launched a job with a larger load on a Hadoop YARN cluster.
>> The job finished after running for 5 hours. I didn't find any error in the
>> JobManager log besides this connection exception.
>>
>>
>>
>>
>>
>> 2021-02-20 13:20:14,110 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/10.1.57.146:48368] failed with java.io.IOException: Connection reset by peer
>> 2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@host:35241] has failed, address is now gated for [50] ms. Reason: [Disassociated]
>> 2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host:39493] has failed, address is now gated for [50] ms. Reason: [Disassociated]
>> 2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@host:38481] has failed, address is now gated for [50] ms. Reason: [Disassociated]
>>
>> Any idea what caused the job to finish like this, and how to resolve it?
>> Any suggestions are appreciated.
>>
>> Thanks
>> Best regards
>> Rainie
>>
>


Flink job finished unexpected

2021-02-20 Thread Rainie Li
Hello,

I launched a job with a larger load on a Hadoop YARN cluster.
The job finished after running for 5 hours. I didn't find any error in the
JobManager log besides this connection exception.





2021-02-20 13:20:14,110 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/10.1.57.146:48368] failed with java.io.IOException: Connection reset by peer
2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@host:35241] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@host:39493] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2021-02-20 13:20:14,110 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@host:38481] has failed, address is now gated for [50] ms. Reason: [Disassociated]

Any idea what caused the job to finish like this, and how to resolve it?
Any suggestions are appreciated.

Thanks
Best regards
Rainie


Re: Flink app cannot restart

2020-07-23 Thread Rainie Li
1,330 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
Custom Source  (1/60) switched from RUNNING to CANCELING.

Best regards
Rainie

On Wed, Jul 22, 2020 at 7:19 PM Yang Wang  wrote:

> Could you check whether the JobManager was also running on the
> lost Yarn NodeManager?
> If it was, you need to configure "yarn.application-attempts" to a
> value bigger than 1.
>
>
> BTW, the logs you provided are not Yarn NodeManager logs. If you could
> provide the full JobManager log, it would help a lot.
>
>
>
> Best,
> Yang
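The setting Yang mentions goes in flink-conf.yaml; note that YARN itself caps it via yarn.resourcemanager.am.max-attempts in yarn-site.xml. A minimal sketch:

```yaml
# flink-conf.yaml — allow YARN to restart the JobManager (ApplicationMaster)
# container after a NodeManager loss
yarn.application-attempts: 2
```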
>
> Rainie Li  于2020年7月22日周三 下午3:54写道:
>
>> Hi Flink help,
>>
>> I am new to Flink.
>> I am investigating one Flink app that cannot restart when we lose the YARN
>> node manager (tc.yarn.rm.cluster.NumActiveNMs=0), while other Flink apps
>> can restart automatically.
>>
>> *Here is job's restartPolicy setting:*
>>
>> *env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000,
>> org.apache.flink.api.common.time.Time.seconds(30)));*
>>
>> *Here is Job Manager log:*
>>
>> 2020-07-15 20:26:27,831 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job switched 
>> from state RUNNING to FAILING.
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>  Connection unexpectedly closed by remote task manager. This might indicate 
>> that the remote task manager was lost.
>>
>> at 
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
>>
>> at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>> *Here is some yarn node manager log:*
>>
>> 2020-07-15 20:57:11.927858: I tensorflow/cc/saved_model/reader.cc:31] 
>> Reading SavedModel from
>>
>> 2020-07-15 20:57:11.928419: I tensorflow/cc/saved_model/reader.cc:54] 
>> Reading meta graph with tags
>>
>> 2020-07-15 20:57:11.928923: I 
>> tensorflow/core/platf

Flink APP restart policy not working

2020-07-22 Thread Rainie Li
Hi all,
I'm new to Flink. Sometimes a Flink app has a restartPolicy configured but still fails to restart — how should I troubleshoot this?

*Job’s restartPolicy:*

*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000,
org.apache.flink.api.common.time.Time.seconds(30)));*

*Job Manager log:*

2020-07-15 20:26:27,831 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
switched from state RUNNING to FAILING.

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager. This might
indicate that the remote task manager was lost.

at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)

at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)

at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)

at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)

at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)

at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)

at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)

at java.lang.Thread.run(Thread.java:748)


*yarn node manager log:*

2020-07-15 20:57:11.927858: I tensorflow/cc/saved_model/reader.cc:31]
Reading SavedModel from

2020-07-15 20:57:11.928419: I tensorflow/cc/saved_model/reader.cc:54]
Reading meta graph with tags

2020-07-15 20:57:11.928923: I
tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports
instructions that this TensorFlow binary was not compiled to use:
SSE4.1 SSE4.2 AVX AVX2 FMA

2020-07-15 20:57:11.935924: I tensorflow/cc/saved_model/loader.cc:162]
Restoring SavedModel bundle.

2020-07-15 20:57:11.939271: I tensorflow/cc/saved_model/loader.cc:138]
Running MainOp with key saved_model_main_op on SavedModel bundle.

2020-07-15 20:57:11.944583: I tensorflow/cc/saved_model/loader.cc:259]
SavedModel load for tags; Status: success. Took 16732 microseconds.

2020-07-15 20:58:13.356004: F
tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot
register 2 metrics with the same name:
/tensorflow/cc/saved_model/load_attempt_count


Thanks
Rainie


Flink app cannot restart

2020-07-22 Thread Rainie Li
Hi Flink help,

I am new to Flink.
I am investigating one Flink app that cannot restart when we lose the YARN node
manager (tc.yarn.rm.cluster.NumActiveNMs=0), while other Flink apps can
restart automatically.

*Here is job's restartPolicy setting:*

*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000,
org.apache.flink.api.common.time.Time.seconds(30)));*
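The same fixed-delay strategy can also be configured cluster-wide in flink-conf.yaml instead of in code; the values below mirror the snippet above. Note that a restart strategy only restarts tasks while the JobManager itself survives — if the JobManager's node is lost, recovery additionally depends on YARN application attempts and HA configuration:

```yaml
# flink-conf.yaml equivalent of the fixedDelayRestart(1000, 30s) call above
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1000
restart-strategy.fixed-delay.delay: 30 s
```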

*Here is Job Manager log:*

2020-07-15 20:26:27,831 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
switched from state RUNNING to FAILING.

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager. This might
indicate that the remote task manager was lost.

at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1429)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:947)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)


*Here are some YARN node manager logs:*

2020-07-15 20:57:11.927858: I tensorflow/cc/saved_model/reader.cc:31]
Reading SavedModel from

2020-07-15 20:57:11.928419: I tensorflow/cc/saved_model/reader.cc:54]
Reading meta graph with tags

2020-07-15 20:57:11.928923: I
tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports
instructions that this TensorFlow binary was not compiled to use:
SSE4.1 SSE4.2 AVX AVX2 FMA

2020-07-15 20:57:11.935924: I tensorflow/cc/saved_model/loader.cc:162]
Restoring SavedModel bundle.

2020-07-15 20:57:11.939271: I tensorflow/cc/saved_model/loader.cc:138]
Running MainOp with key saved_model_main_op on SavedModel bundle.

2020-07-15 20:57:11.944583: I tensorflow/cc/saved_model/loader.cc:259]
SavedModel load for tags; Status: success. Took 16732 microseconds.

2020-07-15 20:58:13.356004: F
tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot
register 2 metrics with the same name:
/tensorflow/cc/saved_model/load_attempt_count


Any idea why this app's restartPolicy doesn't work?
Thanks
Best regards
Rainie
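[Editor's note on the restart question above: in Flink 1.9, automatic job restarts are governed by Flink's configured restart strategy, not by any YARN-level restartPolicy. A minimal flink-conf.yaml sketch, assuming a fixed-delay strategy; the attempt count and delay values are illustrative only:]

# Restart the job up to 3 times, waiting 10 s between attempts.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

[If checkpointing is enabled and no strategy is set, Flink defaults to fixed-delay with Integer.MAX_VALUE attempts, so an explicitly configured strategy may be limiting the retries.]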


Re: Flink yarn session exception

2020-07-16 Thread Rainie Li
Okay, thanks!

On Thu, Jul 16, 2020 at 5:32 PM 忝忝向仧 <153488...@qq.com> wrote:

> You can check whether the jars in your lib directory match what the official docs require.
>
>
>
> Sent from my iPhone
>
>
> -- Original message --
> From: Rainie Li
> Date: Fri, Jul 17, 2020 1:06 AM
> To: user-zh
> Subject: Re: Flink yarn session exception
>
>
>
> Thanks. I set these env variables:
>
> export JAVA_HOME=/usr/lib/jvm/java-8-oracle
> export PATH=$JAVA_HOME/bin:$PATH
> export HADOOP_HOME=/usr/local/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
> export HADOOP_CLASSPATH=`hadoop classpath`
> export FLINK_CONF_DIR=/etc/flink-1.9.1/conf
> export FLINK_LOG_DIR=/home/rainieli/
>
> Is anything wrong with these?
>
>
> On Thu, Jul 16, 2020 at 1:12 AM Paul Lam wrote:
>
>  The log states it quite clearly: the Hadoop libs are missing from the classpath. You can refer to this doc [1] to configure your environment.
> 
>  1.
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
>  
>  Best,
>  Paul Lam
> 
>   On Jul 16, 2020, at 15:46, Rainie Li wrote:
>  
>   Hi all, I'm new to Flink and am using Flink 1.9.1.
>   The Flink app cannot run and the app log shows an error. What might be causing this? Thanks.
>  
>   2020-06-16 17:06:21,921 WARN
> org.apache.flink.client.cli.CliFrontend
> 
> 
> - Could not load CLI class
>   org.apache.flink.yarn.cli.FlinkYarnSessionCli.
>   java.lang.NoClassDefFoundError:
>   org/apache/hadoop/yarn/exceptions/YarnException
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:264)
>   at
>  
> 
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
>   at
>  
> 
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
>   at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
>   Caused by: java.lang.ClassNotFoundException:
>   org.apache.hadoop.yarn.exceptions.YarnException
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 5 more
>   2020-06-16 17:06:21,980 INFO
> org.apache.flink.core.fs.FileSystem
> 
> 
> - Hadoop is not in the classpath/dependencies. The
>  extended
>   set of supported File Systems via Hadoop is not available.
> 
> 


Flink yarn session exception

2020-07-16 Thread Rainie Li
Hi all, I'm new to Flink and am using Flink 1.9.1.
The Flink app cannot run and the app log shows an error. What might be causing this? Thanks.

2020-06-16 17:06:21,921 WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more
2020-06-16 17:06:21,980 INFO  org.apache.flink.core.fs.FileSystem
- Hadoop is not in the classpath/dependencies. The extended
set of supported File Systems via Hadoop is not available.
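[Editor's note: a quick sanity check for this kind of NoClassDefFoundError is to inspect the classpath Flink will see and confirm the YARN jars are on it. A sketch; the jar paths below are made-up stand-ins, and in practice CP would come from `hadoop classpath`:]

```shell
# Stand-in classpath; in a real environment use: CP=$(hadoop classpath)
CP="/usr/local/hadoop/share/hadoop/yarn/hadoop-yarn-api-2.8.5.jar:/usr/local/hadoop/share/hadoop/common/hadoop-common-2.8.5.jar"
# List entries one per line and look for the YARN jars that provide
# org.apache.hadoop.yarn.exceptions.YarnException.
echo "$CP" | tr ':' '\n' | grep 'hadoop-yarn' || echo "YARN jars missing"
```

[If nothing matches, exporting HADOOP_CLASSPATH before starting the Flink CLI, as the thread above suggests, is the usual fix.]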


Re: flink app crashed

2020-07-15 Thread Rainie Li
: null
... (some serverset info here)

Thanks
Best regards
Rainie

On Wed, Jul 15, 2020 at 12:45 PM Rainie Li  wrote:

> Thank you, Jesse.
>
> Here are more log info:
>
> 2020-07-15 18:19:36,456 INFO  org.apache.flink.client.cli.CliFrontend
>   -
> 
> 2020-07-15 18:19:36,460 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, localhost
> 2020-07-15 18:19:36,460 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-07-15 18:19:36,460 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.heap.size, 1024m
> 2020-07-15 18:19:36,460 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.heap.size, 1024m
> 2020-07-15 18:19:36,460 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-07-15 18:19:36,460 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: parallelism.default, 1
> 2020-07-15 18:19:36,461 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-07-15 18:19:36,463 WARN  org.apache.flink.client.cli.CliFrontend
>   - Could not load CLI class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> java.lang.NoClassDefFoundError:
> org/apache/hadoop/yarn/exceptions/YarnException
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.yarn.exceptions.YarnException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 5 more
> 2020-07-15 18:19:36,519 INFO  org.apache.flink.core.fs.FileSystem
>   - Hadoop is not in the classpath/dependencies. The
> extended set of supported File Systems via Hadoop is not available.
> 2020-07-15 18:19:36,647 INFO
>  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
> create Hadoop Security Module because Hadoop cannot be found in the
> Classpath.
> 2020-07-15 18:19:36,658 INFO
>  org.apache.flink.runtime.security.SecurityUtils   - Cannot
> install HadoopSecurityContext because Hadoop cannot be found in the
> Classpath.
>
>
> Best regards
> Rainie
>
> On Wed, Jul 15, 2020 at 11:49 AM Jesse Lord  wrote:
>
>> Hi Rainie,
>>
>>
>>
>> I am relatively new to flink, but I suspect that your error is somewhere
>> else in the log. I have found most of my problems by doing a search for the
>> word “error” or “exception”. Since all of these log lines are at the info
>> level, they are probably not highlighting any real issues. If you send more
>> of the log or find an error line that might help others debug.
>>
>>
>>
>> Thanks,
>>
>> Jesse
>>
>>
>>
>> *From: *Rainie Li 
>> *Date: *Wednesday, July 15, 2020 at 10:54 AM
>> *To: *"user@flink.apache.org" 
>> *Subject: *flink app crashed
>>
>>
>>
>> Hi All,
>>
>>
>>
>> I am new to Flink. Any idea why the Flink app's Job Manager is stuck? Here is the
>> bottom part of the Job Manager log. Any suggestions will be appreciated.
>>
>> 2020-07-15 16:49:52,749 INFO
>> org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint
>> for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
>> akka://flink/user/dispatcher .
>>
>> 2020-07-15 16:49:52,759 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>
>> 2020-07-15 16:49:52,759 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Starting ZooKeeperLead

Re: flink app crashed

2020-07-15 Thread Rainie Li
Thank you, Jesse.

Here are more log info:

2020-07-15 18:19:36,456 INFO  org.apache.flink.client.cli.CliFrontend
-

2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, localhost
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.size, 1024m
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.size, 1024m
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-07-15 18:19:36,460 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2020-07-15 18:19:36,461 INFO
 org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-07-15 18:19:36,463 WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more
2020-07-15 18:19:36,519 INFO  org.apache.flink.core.fs.FileSystem
- Hadoop is not in the classpath/dependencies. The extended
set of supported File Systems via Hadoop is not available.
2020-07-15 18:19:36,647 INFO
 org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
create Hadoop Security Module because Hadoop cannot be found in the
Classpath.
2020-07-15 18:19:36,658 INFO
 org.apache.flink.runtime.security.SecurityUtils   - Cannot
install HadoopSecurityContext because Hadoop cannot be found in the
Classpath.


Best regards
Rainie

On Wed, Jul 15, 2020 at 11:49 AM Jesse Lord  wrote:

> Hi Rainie,
>
>
>
> I am relatively new to flink, but I suspect that your error is somewhere
> else in the log. I have found most of my problems by doing a search for the
> word “error” or “exception”. Since all of these log lines are at the info
> level, they are probably not highlighting any real issues. If you send more
> of the log or find an error line that might help others debug.
>
>
>
> Thanks,
>
> Jesse
>
>
>
> *From: *Rainie Li 
> *Date: *Wednesday, July 15, 2020 at 10:54 AM
> *To: *"user@flink.apache.org" 
> *Subject: *flink app crashed
>
>
>
> Hi All,
>
>
>
> I am new to Flink. Any idea why the Flink app's Job Manager is stuck? Here is the
> bottom part of the Job Manager log. Any suggestions will be appreciated.
>
> 2020-07-15 16:49:52,749 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint
> for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/dispatcher .
>
> 2020-07-15 16:49:52,759 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>
> 2020-07-15 16:49:52,759 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
>
> 2020-07-15 16:49:52,762 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
>
> 2020-07-15 16:49:52,790 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher
> /user/dispatcher was granted leadership with fencing token
>
> 2020-07-15 16:49:52,791 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Recovering all
> persisted jobs.
>
> 2020-07-15 16:49:52,931 INFO
> org.

flink app crashed

2020-07-15 Thread Rainie Li
Hi All,

I am new to Flink. Any idea why the Flink app's Job Manager is stuck? Here is the
bottom part of the Job Manager log. Any suggestions will be appreciated.
2020-07-15 16:49:52,749 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService - Starting RPC endpoint
for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/dispatcher .
2020-07-15 16:49:52,759 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2020-07-15 16:49:52,759 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2020-07-15 16:49:52,762 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
Starting ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
2020-07-15 16:49:52,790 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Dispatcher
/user/dispatcher was granted leadership with fencing token
2020-07-15 16:49:52,791 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Recovering all
persisted jobs.
2020-07-15 16:49:52,931 INFO
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing
over to rm1
2020-07-15 16:49:53,014 INFO org.apache.flink.yarn.YarnResourceManager -
Recovered 0 containers from previous attempts ([]).
2020-07-15 16:49:53,018 INFO
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl - Upper
bound of the thread pool size is 500
2020-07-15 16:49:53,020 INFO
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy -
yarn.client.max-cached-nodemanagers-proxies : 0
2020-07-15 16:49:53,021 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
Starting ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
2020-07-15 16:49:53,042 INFO org.apache.flink.yarn.YarnResourceManager -
ResourceManager akka.tcp://flink@cluster-dev-001/user/resourcemanager was
granted leadership with fencing token
2020-07-15 16:49:53,046 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl -
Starting the SlotManager.
2020-07-15 16:50:52,217 INFO org.apache.kafka.clients.Metadata - Cluster
ID: FZrfSqHiTpaZwEzIRYkCLQ


Thanks
Best regards
Rainie
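[Editor's note: the /leader/... paths in the log above come from ZooKeeper-based high availability. A minimal flink-conf.yaml sketch of that setup, with placeholder hostnames and paths:]

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha/
high-availability.cluster-id: /flink-cluster-dev

[With this enabled, the dispatcher and resource manager each acquire a leader lock in ZooKeeper before serving, which is why the JobManager log shows the ZooKeeperLeaderElectionService starting before job recovery.]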