[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14293502#comment-14293502 ]

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/328#issuecomment-71647585

I'll merge it.

Buggy registration from TaskManager to JobManager
---
Key: FLINK-1352
URL: https://issues.apache.org/jira/browse/FLINK-1352
Project: Flink
Issue Type: Bug
Components: JobManager, TaskManager
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Till Rohrmann
Fix For: 0.9

The JobManager's InstanceManager may refuse the registration attempt from a TaskManager, either because it already has this TaskManager connected or, in the future, because the TaskManager has been blacklisted as unreliable. Upon a refused registration, the instance ID is null to signal the refusal. The TaskManager reacts incorrectly to such responses and assumes that the registration succeeded.

Possible solution: The JobManager sends back a dedicated RegistrationRefused message if the instance manager returns null as the registration result. If the TaskManager receives that before being registered, it knows that the registration response was lost (which should not happen on TCP and would indicate a corrupt connection).

Follow-up question: Does it make sense to have the TaskManager try indefinitely to connect to the JobManager, with an increasing interval (from seconds to minutes)?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
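The proposed fix boils down to never letting a null instance ID travel back to the TaskManager as if it were a success. A minimal, self-contained Scala sketch of that idea, without Akka, follows. `RegistrationProtocolSketch`, `InstanceID`, `RegistrationResponse`, the toy `InstanceManager` and its `registerTaskManager` method, and the refusal reason text are all hypothetical stand-ins, not Flink's actual classes or messages.

```scala
import scala.collection.mutable

object RegistrationProtocolSketch {
  final case class InstanceID(value: String)

  // Explicit response type: success and refusal are distinct messages.
  sealed trait RegistrationResponse
  final case class AcknowledgeRegistration(id: InstanceID) extends RegistrationResponse
  final case class RegistrationRefused(reason: String) extends RegistrationResponse

  /** Toy stand-in for the InstanceManager: as described in the issue,
    * it signals a refused registration by returning null.
    */
  final class InstanceManager {
    private val instances = mutable.Map.empty[String, InstanceID]

    def registerTaskManager(address: String): InstanceID =
      if (instances.contains(address)) null // already connected: refuse
      else {
        val id = InstanceID(s"instance-${instances.size + 1}")
        instances(address) = id
        id
      }
  }

  /** JobManager side: wrap the possibly-null result in an Option and
    * answer with an explicit refusal instead of forwarding null.
    */
  def respond(im: InstanceManager, address: String): RegistrationResponse =
    Option(im.registerTaskManager(address)) match {
      case Some(id) => AcknowledgeRegistration(id)
      case None     => RegistrationRefused(s"TaskManager at $address is already registered")
    }
}
```

Wrapping the result in `Option(...)` forces the caller to handle both outcomes, so the "null means refused" convention cannot leak into the TaskManager.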
[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291940#comment-14291940 ]

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/328#issuecomment-71476676

I updated the PR with the exponential backoff registration strategy. Along the way, I fixed the flaky RecoveryIT case.
[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291708#comment-14291708 ]

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/328#discussion_r23523674

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
+          log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
+            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
+        }
+      }
+    }
-        heartbeatScheduler = Some(context.system.scheduler.schedule(
-          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+    case AlreadyRegistered(id, blobPort) =>
+      if(!registered) {
+        log.warning("The TaskManager {} seems to be already registered at the JobManager {} even " +
+          "though it has not yet finished the registration process.", self.path, sender.path)
-        profiler foreach {
-          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+        finishRegistration(id, blobPort)
+        registered = true
+      } else {
+        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled){
+          log.debug("The TaskManager {} has already been registered at the JobManager {}.",
+            self.path, sender.path)
         }
+      }
-        for (listener <- waitForRegistration) {
-          listener ! RegisteredAtJobManager
-        }
+    case RefuseRegistration(reason) =>
+      if(!registered) {
+        log.error("The registration of task manager {} was refused by the job manager {} " +
+          "because {}.", self.path, jobManagerAkkaURL, reason)
-        waitForRegistration.clear()
+
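The three-way handling in the diff (acknowledged, already registered, refused) can be distilled into a pure state-transition function, which makes its idempotence easy to check outside an actor. This is a hypothetical sketch: the message names mirror those in the diff but carry only a string ID, `State` and `handle` are illustrative, and because the quoted diff is cut off inside the `RefuseRegistration` branch, the assumption that a refused TaskManager shuts down is not taken from the PR.

```scala
object TaskManagerRegistrationSketch {
  sealed trait RegistrationResponse
  final case class AcknowledgeRegistration(instanceId: String) extends RegistrationResponse
  final case class AlreadyRegistered(instanceId: String) extends RegistrationResponse
  final case class RefuseRegistration(reason: String) extends RegistrationResponse

  final case class State(instanceId: Option[String], shutDown: Boolean = false) {
    def registered: Boolean = instanceId.isDefined
  }

  /** Returns the next state plus the log line the TaskManager would emit. */
  def handle(state: State, msg: RegistrationResponse): (State, String) = msg match {
    case AcknowledgeRegistration(id) if !state.registered =>
      (state.copy(instanceId = Some(id)), s"INFO: registered with instance ID $id")
    case AlreadyRegistered(id) if !state.registered =>
      // The JobManager knows us, so an earlier acknowledgement was probably lost.
      (state.copy(instanceId = Some(id)), s"WARN: JobManager already knows this TaskManager as $id")
    case RefuseRegistration(reason) if !state.registered =>
      // Assumption: a refused TaskManager stops (the quoted diff is truncated here).
      (state.copy(shutDown = true), s"ERROR: registration refused because $reason")
    case late =>
      // Any response arriving after registration finished leaves the state unchanged.
      (state, s"DEBUG: ignoring $late, registration already finished")
  }
}
```

Keeping the transition pure means duplicate or late responses can be tested as plain function calls; the actor would only apply the returned state and write the log line.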
[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291703#comment-14291703 ]

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/328#discussion_r23523589

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
...
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
...
+      } else {
+        if (log.isDebugEnabled) {
--- End diff --

You're right Henry, I'll remove it.
[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14289085#comment-14289085 ]

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/328#issuecomment-71177177

Yeah, you're right that exponential backoff was the default behaviour before. I think Stephan's proposal is the best solution. I'll implement it and also change the Akka messages so that, in the future, a TaskManager can be refused by the JobManager if needed.
[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14287259#comment-14287259 ]

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/328#issuecomment-71002640

You are right @hsaputra; that is because I'm not sure which approach is best. In the corresponding JIRA issue I have tried to summarize what I think are the pros and cons of indefinitely many registration tries vs. a limited number of tries, and of a constant pause between tries vs. an increasing pause.

Indefinitely many registration tries:
- Pros: If the JobManager becomes available at some point in time, the TaskManager will definitely connect to it.
- Cons: If the JobManager dies for some reason, the TaskManager will linger around for all eternity or until it is stopped manually.

Limited number of tries:
- Pros: The TaskManager terminates itself after some time.
- Cons: The time interval might be too short for the JobManager to get started.

Constant pause:
- Pros: Relatively quick response time.
- Cons: Causes network traffic until the JobManager has been started.

Increasing pause:
- Pros: Reduces network traffic if the JobManager takes a little longer to start.
- Cons: Might delay the registration process if an attempt just missed the moment the JobManager became available.
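The constant-versus-increasing-pause trade-off can be quantified with a little arithmetic. The hypothetical helper below lists the send times of registration attempts for a given first pause and growth factor (factor 1 gives a constant pause, factor 2 doubles the pause each time), then counts how many messages go out while the JobManager is still down and how long the TaskManager waits after it comes up. The 500 ms constant pause, the 50 ms starting pause, and a JobManager that becomes available after 51,200 ms are illustrative numbers only.

```scala
object RetryScheduleSketch {
  /** Send times (ms after start) of registration attempts up to `horizonMs`.
    * The first attempt goes out at 0; each pause is the previous pause times `factor`.
    */
  def attemptTimes(firstPauseMs: Long, factor: Long, horizonMs: Long): Vector[Long] = {
    require(firstPauseMs > 0 && factor >= 1, "pause must be positive and non-decreasing")
    val times = Vector.newBuilder[Long]
    var t = 0L
    var pause = firstPauseMs
    while (t <= horizonMs) {
      times += t
      t += pause
      pause *= factor
    }
    times.result()
  }

  /** Messages sent while the JobManager is still down. */
  def wastedAttempts(times: Vector[Long], readyAtMs: Long): Int =
    times.count(_ < readyAtMs)

  /** How long after `readyAtMs` the next attempt is sent (None if beyond the horizon). */
  def extraDelay(times: Vector[Long], readyAtMs: Long): Option[Long] =
    times.find(_ >= readyAtMs).map(_ - readyAtMs)
}
```

With these numbers, the constant 500 ms pause sends 103 attempts and its next attempt goes out 300 ms after the JobManager is up, whereas uncapped doubling from 50 ms sends only 11 attempts but then waits another 51,150 ms. That is the traffic-versus-latency split in the pros and cons for constant and increasing pauses.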
[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14288569#comment-14288569 ]

ASF GitHub Bot commented on FLINK-1352:
---

Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/328#issuecomment-71133987

I am not sure that an infinite number of tries is actually bad. It depends on the situation, I guess:
- On YARN, giving up may make sense, because the node then goes back into the pool of available resources.
- On standalone, the machine is there for Flink anyway, so the TaskManager might as well keep trying to offer itself for work. Think of a network partitioning event: after the partitions rejoin, the cluster should work as a whole again.

How about the following: We have a config parameter for how long nodes should attempt to register. YARN could set a timeout (say 2-5 minutes), while by default the timeout is infinite.

Concerning the pause between attempts: Exponential backoff (with a cap) is the common approach (and I think it was the default before). Start with a 50 ms pause, double it on each attempt, and cap it at 1 or 2 minutes or so. If you miss early attempts, the pause will not be long. If you missed all attempts within the first second, you are guaranteed not to wait more than twice as long as you have already waited anyway.

For the sake of transparency and making sure that the states are actually in sync, how about we have three response messages for the registration attempt:
1. Refused (for whatever reason; the message should carry a string that the TM can log)
2. Accepted (with the assigned ID)
3. Already registered (with the assigned ID). The current logic handles this case correctly as well, but this will allow us to log better at the TaskManager and debug problems there much more easily.

Since this is a mechanism which may have weird corner-case behavior, it would be good to know as much as possible about what was happening.
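Stephan's proposal (start at 50 ms, double on every attempt, cap the pause at 1 or 2 minutes, and stop after a configurable total time that defaults to infinity) maps onto a small lazy schedule. In this sketch, `CappedBackoffSketch.pauses` is a hypothetical helper, not a Flink or Akka API; `Duration.Inf` stands in for the proposed infinite default, and the thread does not name the actual configuration key.

```scala
import scala.concurrent.duration._

object CappedBackoffSketch {
  /** Pauses between consecutive registration attempts: start at `initial`, double each
    * time, never exceed `cap`, and stop once the cumulative pause would exceed `maxTotal`.
    * With `maxTotal = Duration.Inf` the iterator is infinite (keep trying forever).
    */
  def pauses(initial: FiniteDuration, cap: FiniteDuration, maxTotal: Duration): Iterator[FiniteDuration] =
    Iterator
      .iterate(initial)(p => if (p * 2L < cap) p * 2L else cap) // doubling, capped
      .scanLeft((Duration.Zero: FiniteDuration, Duration.Zero: FiniteDuration)) {
        case ((total, _), p) => (total + p, p) // (cumulative pause, this pause)
      }
      .drop(1) // scanLeft emits the (0, 0) seed first
      .takeWhile { case (total, _) => total <= maxTotal }
      .map { case (_, p) => p }
}
```

With a YARN-style limit, `pauses(50.millis, 2.minutes, 5.minutes)` doubles up to 102.4 s, would cap the next pause at 2 minutes, and ends after 12 pauses because a 13th would push the total past 5 minutes. With `Duration.Inf` the iterator never ends and settles at the 2-minute cap.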
[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager
[ https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14285733#comment-14285733 ]

ASF GitHub Bot commented on FLINK-1352:
---

Github user Humbedooh commented on the pull request:
https://github.com/apache/flink/pull/328#issuecomment-70853008

one last comment, I think it's fixed now :)