Hi Joshua,

Sorry for getting back to you so late. Personally, I haven't seen this
problem before. Without more log context I don't think I will be able to
help you. This looks a bit more like an Akka problem than a Flink problem,
to be honest.

One cause could be that akka.remote.flush-wait-on-shutdown is set too low.
But this should only happen when you shut down the remote ActorSystem (JM
ActorSystem) and you should see "Shutdown finished, but flushing might not
have been successful and some messages might have been dropped. Increase
akka.remote.flush-wait-on-shutdown to a larger value to avoid this." in the
logs.
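
As a rough illustration only: in plain Akka this setting is a HOCON duration.
The sketch below parses such a fragment with Typesafe Config, the library Akka
uses for its configuration. The value `10 s` and the object name
`FlushWaitSketch` are arbitrary assumptions, and how the value would reach the
ActorSystem that Flink creates depends on your setup.

```scala
import java.util.concurrent.TimeUnit
import com.typesafe.config.ConfigFactory

// Hypothetical sketch: shows only the shape of the setting,
// not how Flink passes configuration to Akka.
object FlushWaitSketch {
  // "10 s" is an arbitrary example value, not a recommendation.
  val overrides = ConfigFactory.parseString(
    "akka.remote.flush-wait-on-shutdown = 10 s")

  def main(args: Array[String]): Unit = {
    // Read the duration back to confirm the fragment parses as intended.
    val millis = overrides.getDuration(
      "akka.remote.flush-wait-on-shutdown", TimeUnit.MILLISECONDS)
    println(s"flush-wait-on-shutdown = $millis ms")
  }
}
```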

I don't know which two ports you are referring to for the agent. I think it
would also help to share the logs of your agent.


On Mon, Nov 19, 2018 at 1:49 PM Joshua Fan <joshuafat...@gmail.com> wrote:

> Hi, Till and users,
> There is a weird behavior in the ActorSystem shutdown in Akka on our Flink
> platform.
> We use Flink 1.4.2 on YARN as our deploy mode, and we use a long-running
> agent, based on YarnClient, to submit Flink jobs to YARN. Users can
> connect to the agent to submit a job and then disconnect, but the agent is
> always there. So each time a user submits a job, an ActorSystem is
> created, and after the job has been submitted successfully in detached
> mode, the ActorSystem is shut down.
> The weird thing is that an Akka error message always shows up in the JM
> log after 2 days (2 days is Akka's default value for
> quarantine-after-silence), like below.
> 2018-11-19 09:30:34.212 [flink-akka.actor.default-dispatcher-2] ERROR
> akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-5 -
> Association to [akka.tcp://fl...@client01v.xxx:35767] with UID
> [-1757115446] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> In the above, client01v*** is the host node where the agent runs, and
> the error shows up randomly. We trigger a savepoint from the agent
> every half hour, which means an ActorSystem is created and shut down
> each time. But only about 1 in 50 shutdowns raises an error like the one
> above.
> I think it may be related to Akka itself. I checked the Akka code and found
> some clues, as below.
> For the shutdowns that raise no error within two days, the JM log looks
> like this:
> 2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG
> akka.remote.transport.ProtocolStateActor
> flink-akka.remote.default-remote-dispatcher-23 - Association between local
> [tcp://flink@yyyy:29448] and remote [tcp://flink@xxxx:56906] was
> disassociated because the ProtocolStateActor failed: Shutdown
> 2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG
> akka.remote.transport.ProtocolStateActor
> flink-akka.remote.default-remote-dispatcher-23 - Association between local
> [tcp://flink@yyyy:29448] and remote [tcp://flink@xxxx:56906] was
> disassociated because the ProtocolStateActor failed: Shutdown
> 2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG
> akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 -
> Remote system with address [akka.tcp://flink@xxxx:41769] has shut down.
> Address is now gated for 5000 ms, all messages to this address will be
> delivered to dead letters.
> 2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG
> akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 -
> Remote system with address [akka.tcp://flink@xxxx:41769] has shut down.
> Address is now gated for 5000 ms, all messages to this address will be
> delivered to dead letters.
> It seems the remote actor receives the shutdown signal, and the Akka
> messages may flow as follows:
> 1. The agent shuts down the ActorSystem.
> 2. The EndpointReader in the JM receives an AssociationHandle.Shutdown and
> just throws it as a ShutDownAssociation, and the EndpointWriter will
> publishAndThrow the ShutDownAssociation again.
> 3. When the ReliableDeliverySupervisor in the JM gets an AssociationProblem
> reported by the EndpointWriter, it also throws it on.
> 4. When the EndpointManager in the JM gets the ShutDownAssociation
> exception, it stops the actor.
> But for a shutdown that later raises the silence error, the JM log looks
> like this; it seems the remote actor did not receive the shutdown signal:
> 2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] DEBUG
> akka.remote.transport.ProtocolStateActor
> flink-akka.remote.default-remote-dispatcher-14 - Association between local
> [tcp://flink@yyyy:29448] and remote [tcp://flink@xxxx:45103] was
> disassociated because the ProtocolStateActor failed: Unknown
> 2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] DEBUG
> akka.remote.transport.ProtocolStateActor
> flink-akka.remote.default-remote-dispatcher-14 - Association between local
> [tcp://flink@yyyy:29448] and remote [tcp://flink@xxxx:45103] was
> disassociated because the ProtocolStateActor failed: Unknown
> 2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-akka.remote.default-remote-dispatcher-20 - Association with remote
> system [akka.tcp://flink@xxxx:35767] has failed, address is now gated for
> [5000] ms. Reason: [Disassociated]
> 2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-akka.remote.default-remote-dispatcher-20 - Association with remote
> system [akka.tcp://flink@xxxx:35767] has failed, address is now gated for
> [5000] ms. Reason: [Disassociated]
> The Akka messages may flow as follows, I guess:
> 1. The agent shuts down the ActorSystem.
> 2. The EndpointReader in the JM receives an AssociationHandle.Unknown
> instead of AssociationHandle.Shutdown, so the EndpointReader stops, and the
> EndpointWriter gets a Terminated message and throws an
> EndpointDisassociatedException.
> 3. The ReliableDeliverySupervisor treats the
> EndpointDisassociatedException as a NonFatal exception, does some
> handling, and should eventually stop, but I think it may not stop.
> See the code here:
> case NonFatal(e) ⇒
>       val causedBy = if (e.getCause == null) "" else s"Caused by: [${e.getCause.getMessage}]"
>       log.warning(
>         "Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}",
>         remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy)
>       uidConfirmed = false // Need confirmation of UID again
>       if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
>         bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
>       context.become(gated(writerTerminated = false, earlyUngateRequested = false))
>       currentHandle = None
>       context.parent ! StoppedReading(self)
>       Stop
> But the silence error message would be raised via goToIdle:
> private def goToIdle(): Unit = {
>     if (maxSilenceTimer.isEmpty)
>       maxSilenceTimer = Some(context.system.scheduler.scheduleOnce(settings.QuarantineSilentSystemTimeout, self, TooLongIdle))
>     context.become(idle)
>   }
> and after two days:
>  def idle: Receive = {
>     case IsIdle ⇒ sender() ! Idle
>     case s: Send ⇒
>       writer = createWriter()
>       // Resending will be triggered by the incoming GotUid message after the connection finished
>       handleSend(s)
>       goToActive()
>     case AttemptSysMsgRedelivery ⇒
>       if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) {
>         writer = createWriter()
>         // Resending will be triggered by the incoming GotUid message after the connection finished
>         goToActive()
>       }
>     case TooLongIdle ⇒
>       throw new HopelessAssociation(localAddress, remoteAddress, uid,
>         new TimeoutException("Remote system has been silent for too long. " +
>           s"(more than ${settings.QuarantineSilentSystemTimeout.toUnit(TimeUnit.HOURS)} hours)"))
>     case EndpointWriter.FlushAndStop ⇒ context.stop(self)
>     case EndpointWriter.StopReading(w, replyTo) ⇒
>       replyTo ! EndpointWriter.StoppedReading(w)
>     case Ungate ⇒ // ok, not gated
>   }
> You can see that TooLongIdle raises the error message in the end. So
> maybe the actor should stop but somehow goes to the idle state instead.
> OK, I tried my best to explain the message flow, and I hope I succeeded.
> Here are my questions:
> 1. Why are most of the shutdowns OK while a few raise an error?
> 2. Are there two connections between the actors, one for data and the other
> for system messages? As the log shows, there are two ports on
> the agent side.
> Thank you for your patience; it is such a long post.
> If I didn't make the situation clear enough, please let me know.
> Thank you all.
> Yours
> Joshua
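
The two message flows described in the quoted mail can be summarized as a toy
state machine. This is a minimal sketch in plain Scala, not Akka's
implementation: every name in it (`SupervisorSketch`, `PeerShutdown`,
`PeerDisassociated`, and so on) is hypothetical, and the `Gated` to `Idle`
transition encodes the guess from the mail that the supervisor ends up idle
instead of stopping.

```scala
// Toy model of the two flows described in the quoted mail.
// NOT Akka code; all names are made up for illustration.
object SupervisorSketch {
  sealed trait State
  case object Active extends State
  case object Gated extends State
  case object Idle extends State
  case object Stopped extends State     // endpoint stopped after a clean shutdown
  case object Quarantined extends State // "silent for too long" error raised

  sealed trait Event
  case object PeerShutdown extends Event      // disassociation reason: Shutdown
  case object PeerDisassociated extends Event // disassociation reason: Unknown
  case object Ungate extends Event            // gate timer fired (5000 ms in the logs)
  case object TooLongIdle extends Event       // silence timer fired (48 hours in the logs)

  // One transition; events that do not apply leave the state unchanged.
  def step(state: State, event: Event): State = (state, event) match {
    case (Active, PeerShutdown)      => Stopped
    case (Active, PeerDisassociated) => Gated
    case (Gated, Ungate)             => Idle // assumption: nothing left to resend
    case (Idle, TooLongIdle)         => Quarantined
    case (other, _)                  => other
  }

  // Replay a sequence of events starting from Active.
  def run(events: Seq[Event]): State =
    events.foldLeft(Active: State)(step)
}
```

Under these assumptions, `run(Seq(PeerDisassociated, Ungate, TooLongIdle))`
ends in `Quarantined`, while any sequence that starts with `PeerShutdown` ends
in `Stopped`, which matches the "1 in 50" pattern only if the disassociation
reason occasionally arrives as Unknown.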

