[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager

2015-01-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14293502#comment-14293502
 ] 

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71647585
  
I'll merge it.


 Buggy registration from TaskManager to JobManager
 -

 Key: FLINK-1352
 URL: https://issues.apache.org/jira/browse/FLINK-1352
 Project: Flink
  Issue Type: Bug
  Components: JobManager, TaskManager
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Till Rohrmann
 Fix For: 0.9


 The JobManager's InstanceManager may refuse the registration attempt from a 
 TaskManager, because it has this TaskManager already connected or, in the 
 future, because the TaskManager has been blacklisted as unreliable.
 Upon refused registration, the instance ID is null, to signal the refused 
 registration. The TaskManager reacts incorrectly to such messages, assuming 
 successful registration.
 Possible solution: JobManager sends back a dedicated RegistrationRefused 
 message if the instance manager returns null as the registration result. If 
 the TaskManager receives that before being registered, it knows that the 
 registration response was lost (which should not happen on TCP and would 
 indicate a corrupt connection).
 Follow-up question: Does it make sense to have the TaskManager try 
 indefinitely to connect to the JobManager, with an increasing interval (from 
 seconds to minutes)?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager

2015-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291940#comment-14291940
 ] 

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71476676
  
I updated the PR with the exponential backoff registration strategy. Along the 
way, I fixed the flaky RecoveryIT case.




[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager

2015-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291708#comment-14291708
 ] 

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/328#discussion_r23523674
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
 
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
 
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
 
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
 
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
 
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
 
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
+          log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
+            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
+        }
+      }
+    }
 
-        heartbeatScheduler = Some(context.system.scheduler.schedule(
-          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+    case AlreadyRegistered(id, blobPort) =>
+      if(!registered) {
+        log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
+          "though it has not yet finished the registration process.", self.path, sender.path)
 
-        profiler foreach {
-          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+        finishRegistration(id, blobPort)
+        registered = true
+      } else {
+        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled){
+          log.debug("The TaskManager {} has already been registered at the JobManager {}.",
+            self.path, sender.path)
         }
+      }
 
-        for (listener <- waitForRegistration) {
-          listener ! RegisteredAtJobManager
-        }
+    case RefuseRegistration(reason) =>
+      if(!registered) {
+        log.error("The registration of task manager {} was refused by the job manager {} " +
+          "because {}.", self.path, jobManagerAkkaURL, reason)
 
-        waitForRegistration.clear()
+

[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager

2015-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291703#comment-14291703
 ] 

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/328#discussion_r23523589
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
 
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
 
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
 
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
 
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
 
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
 
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
--- End diff --

You're right Henry, I'll remove it.




[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager

2015-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14289085#comment-14289085
 ] 

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71177177
  
Yeah, you're right that the exponential backoff was the default behaviour 
before. I think Stephan's proposal is the best solution. I'll implement it and 
also change the Akka messages so that in the future a TaskManager can be 
refused by the JobManager, if needed.




[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager

2015-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14287259#comment-14287259
 ] 

ASF GitHub Bot commented on FLINK-1352:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71002640
  
You are right @hsaputra, because I'm not sure which approach is the best. 
In the corresponding JIRA issue I have tried to summarize what I think are the 
pros and cons of two choices: indefinitely many registration tries vs. a 
limited number of tries, and a constant pause between tries vs. an increasing 
pause.

Indefinitely many registration tries:
Pros: If the JobManager becomes available at some point in time, then the 
TaskManager will definitely connect to it.
Cons: If the JobManager dies for some reason, then the TaskManager will 
linger around for all eternity or until it is stopped manually.

Limited number of tries:
Pros: The TaskManager will terminate itself after some time.
Cons: The time interval might be too short for the JobManager to get started.

Constant pause:
Pros: Relatively quick response time.
Cons: Causes network traffic until the JobManager has been started.

Increasing pause:
Pros: Reduces network traffic if the JobManager takes a little bit 
longer to start.
Cons: Might delay the registration process if one interval was just missed.
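
To make the constant vs. increasing pause trade-off concrete, here is a minimal simulation sketch. It is not code from the PR: the object name, the 500 ms starting pause, and the 10 second JobManager startup time are assumptions chosen for illustration only.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of Flink: counts registration attempts until
// the JobManager becomes reachable, for a given pause strategy.
object RetryPauseComparison {
  /** Returns (attempts sent, time of the first attempt that can succeed). */
  def simulate(
      availableAt: FiniteDuration,
      initialPause: FiniteDuration,
      nextPause: FiniteDuration => FiniteDuration): (Int, FiniteDuration) = {
    var time: FiniteDuration = Duration.Zero
    var pause = initialPause
    var attempts = 1 // the first attempt is sent at time zero
    while (time < availableAt) {
      time += pause
      pause = nextPause(pause)
      attempts += 1
    }
    (attempts, time)
  }

  // Assumed scenario: the JobManager is reachable 10 seconds after startup.
  val constant: (Int, FiniteDuration) = simulate(10.seconds, 500.millis, p => p)
  val doubling: (Int, FiniteDuration) = simulate(10.seconds, 500.millis, p => p * 2L)
}
```

Under these assumptions the constant pause sends 21 attempts and registers at 10 seconds, while the doubling pause sends only 6 attempts but registers at 15.5 seconds, because the attempt at 7.5 seconds just missed the JobManager.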




[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager

2015-01-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14288569#comment-14288569
 ] 

ASF GitHub Bot commented on FLINK-1352:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-71133987
  
I am not sure that the infinite number of tries is actually bad. This sort 
of depends on the situation, I guess:
  - On YARN, giving up may make sense, because the node will then go back 
into the pool of available resources.
  - On standalone, the node will be there for Flink anyway, so the TaskManager 
might as well keep trying to offer itself for work. Think of a network 
partitioning event: after the partitions have re-joined, the cluster should 
work as a whole again.

How about the following: we add a config parameter for how long nodes should 
attempt to register. YARN could set a timeout (say 2-5 minutes), while by 
default, the timeout is infinite.
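
A minimal sketch of how such a parameter could be modelled, assuming the configured value arrives as an optional string such as "5 minutes". The object and method names are hypothetical and not the actual Flink configuration API.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not the actual Flink configuration code.
object RegistrationTimeout {
  /** No configured value means "keep trying forever". */
  def maxRegistrationDuration(configured: Option[String]): Duration =
    configured.map(s => Duration(s)).getOrElse(Duration.Inf)

  /** True once the time spent on registration attempts exceeds the limit. */
  def shouldGiveUp(elapsed: FiniteDuration, max: Duration): Boolean =
    elapsed > max
}
```

With Duration.Inf as the default, shouldGiveUp never returns true for a standalone setup, while a YARN deployment could pass a finite value such as Some("5 minutes").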

Concerning the attempt pause: Having attempts with exponential backoff (and 
a cap) is the common thing (and I think it was the default before). Start with 
a 50ms pause, double it on each attempt, and cap it at 1 or 2 minutes or so. If 
you miss early attempts, the pause will not be long. If you missed all 
attempts within the first second, you are guaranteed not to wait more than 
twice as long as you have already waited.
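
The capped backoff described above could be sketched as follows. The 50 ms start comes from the paragraph; the 2 minute cap is one of the two values suggested there, and the object and member names are hypothetical.

```scala
import scala.concurrent.duration._

// Hypothetical sketch of exponential backoff with a cap; names are illustrative.
object CappedBackoff {
  val InitialPause: FiniteDuration = 50.millis
  val MaxPause: FiniteDuration = 2.minutes // assumed cap ("1 or 2 minutes or so")

  /** Doubles the pause, but never beyond MaxPause. */
  def nextPause(current: FiniteDuration): FiniteDuration =
    (current * 2L).min(MaxPause)

  /** Infinite sequence of pauses: 50 ms, 100 ms, 200 ms, ... capped at MaxPause. */
  def pauses: Iterator[FiniteDuration] =
    Iterator.iterate(InitialPause)(nextPause)
}
```

Before the cap is reached, the next pause is only 50 ms longer than the sum of all earlier pauses (for example 800 ms after 50 + 100 + 200 + 400 = 750 ms), which is why a missed attempt never costs much more than the time already spent waiting.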

For the sake of transparency and making sure that the states are actually 
in sync: How about we have three response messages for the registration attempt:
  1. Refused (for whatever reason, the message should have a string that 
the TM can log)
  2. Accepted (with the assigned ID)
  3. Already registered (with the assigned ID) - The current logic handles 
this correctly as well, but this will allow us to log better at the TaskManager 
and debug problems there much better. Since this is a mechanism which may have 
weird corner-case behavior, it would be good to know as much as possible about 
what was happening.
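
The three responses listed above could be modelled roughly like this. It is a sketch rather than the PR's actual Akka messages: the type names, the use of String for the instance ID, and the log texts are assumptions, and the handler is a pure function instead of an actor so that it can be read in isolation.

```scala
// Hypothetical message types; the real PR defines its own Akka messages.
object RegistrationMessages {
  sealed trait RegistrationResponse
  final case class Refused(reason: String) extends RegistrationResponse
  final case class Accepted(instanceId: String) extends RegistrationResponse
  final case class AlreadyRegistered(instanceId: String) extends RegistrationResponse

  /** Returns the new "registered" flag and the line the TaskManager would log. */
  def handle(registered: Boolean, response: RegistrationResponse): (Boolean, String) =
    response match {
      case Accepted(id) if !registered =>
        (true, s"Registered at JobManager with instance ID $id.")
      case Accepted(id) =>
        (true, s"Ignoring duplicate acknowledgement for instance ID $id.")
      case AlreadyRegistered(id) if !registered =>
        (true, s"JobManager already knows instance ID $id; an earlier acknowledgement was probably lost.")
      case AlreadyRegistered(id) =>
        (true, s"Ignoring late AlreadyRegistered message for instance ID $id.")
      case Refused(reason) if !registered =>
        (false, s"Registration refused: $reason")
      case Refused(reason) =>
        (true, s"Ignoring refusal received after registration: $reason")
    }
}
```

Keeping the three cases distinct means a lost acknowledgement shows up in the TaskManager log as its own event instead of looking like a normal registration.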





[jira] [Commented] (FLINK-1352) Buggy registration from TaskManager to JobManager

2015-01-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14285733#comment-14285733
 ] 

ASF GitHub Bot commented on FLINK-1352:
---

Github user Humbedooh commented on the pull request:

https://github.com/apache/flink/pull/328#issuecomment-70853008
  
one last comment, I think it's fixed now :)

