[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

2015-02-13 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/385#discussion_r24653915
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
 log.error(t, Could not properly unregister job {} form the 
library cache., jobID)
 }
   }
-
-  private def checkJavaVersion(): Unit = {
-if (System.getProperty(java.version).substring(0, 3).toDouble  1.7) 
{
-  log.warning(Warning: Flink is running with Java 6.  +
-Java 6 is not maintained any more by Oracle or the OpenJDK 
community.  +
-Flink currently supports Java 6, but may not in future releases, 
+
- due to the unavailability of bug fixes security patched.)
-}
-  }
 }
 
 object JobManager {
+  
   import ExecutionMode._
+
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
+
   val FAILURE_RETURN_CODE = 1
+
   val JOB_MANAGER_NAME = jobmanager
   val EVENT_COLLECTOR_NAME = eventcollector
   val ARCHIVE_NAME = archive
   val PROFILER_NAME = profiler
 
   def main(args: Array[String]): Unit = {
+
+// startup checks and logging
 EnvironmentInformation.logEnvironmentInfo(LOG, JobManager)
-val (configuration, executionMode, listeningAddress) = parseArgs(args)
+checkJavaVersion()
 
-  if(SecurityUtils.isSecurityEnabled) {
+val (configuration: Configuration,
+ executionMode: ExecutionMode,
+ listeningAddress:  Option[(String, Int)]) =
+try {
+  parseArgs(args)
+}
+catch {
+  case t: Throwable = {
+LOG.error(t.getMessage(), t)
+System.exit(FAILURE_RETURN_CODE)
+null
--- End diff --

We have to, in order to silence the compiler: it does not know that 
`System.exit()` never returns, and it is not possible to assign `Nothing` to 
the triplet value.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

2015-02-13 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/385#discussion_r24653930
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
 log.error(t, Could not properly unregister job {} form the 
library cache., jobID)
 }
   }
-
-  private def checkJavaVersion(): Unit = {
-if (System.getProperty(java.version).substring(0, 3).toDouble  1.7) 
{
-  log.warning(Warning: Flink is running with Java 6.  +
-Java 6 is not maintained any more by Oracle or the OpenJDK 
community.  +
-Flink currently supports Java 6, but may not in future releases, 
+
- due to the unavailability of bug fixes security patched.)
-}
-  }
 }
 
 object JobManager {
+  
   import ExecutionMode._
+
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
+
   val FAILURE_RETURN_CODE = 1
+
   val JOB_MANAGER_NAME = jobmanager
   val EVENT_COLLECTOR_NAME = eventcollector
   val ARCHIVE_NAME = archive
   val PROFILER_NAME = profiler
 
   def main(args: Array[String]): Unit = {
+
+// startup checks and logging
 EnvironmentInformation.logEnvironmentInfo(LOG, JobManager)
-val (configuration, executionMode, listeningAddress) = parseArgs(args)
+checkJavaVersion()
 
-  if(SecurityUtils.isSecurityEnabled) {
+val (configuration: Configuration,
+ executionMode: ExecutionMode,
+ listeningAddress:  Option[(String, Int)]) =
+try {
+  parseArgs(args)
+}
+catch {
+  case t: Throwable = {
+LOG.error(t.getMessage(), t)
+System.exit(FAILURE_RETURN_CODE)
+null
+  }
+}
+
+try {
+  if (SecurityUtils.isSecurityEnabled) {
 LOG.info(Security is enabled. Starting secure JobManager.)
 SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
   override def run(): Unit = {
-start(configuration, executionMode, listeningAddress)
+runJobManager(configuration, executionMode, listeningAddress)
   }
 })
   } else {
-start(configuration, executionMode, listeningAddress)
+runJobManager(configuration, executionMode, listeningAddress)
+  }
+}
+catch {
+  case t: Throwable = {
+LOG.error(Failed to start JobManager., t)
+System.exit(FAILURE_RETURN_CODE)
   }
+}
   }
 
-  def start(configuration: Configuration, executionMode: ExecutionMode,
-listeningAddress : Option[(String, Int)]): Unit = {
-val jobManagerSystem = AkkaUtils.createActorSystem(configuration, 
listeningAddress)
 
-startActor(Props(new JobManager(configuration) with 
WithWebServer))(jobManagerSystem)
+  def runJobManager(configuration: Configuration,
+executionMode: ExecutionMode,
+listeningAddress: Option[(String, Int)]) : Unit = {
+
+LOG.info(Starting JobManager)
+LOG.debug(Starting JobManager actor system)
 
-if(executionMode.equals(LOCAL)){
-  TaskManager.startActorWithConfiguration(, configuration,
-localAkkaCommunication = false, localTaskManagerCommunication = 
true)(jobManagerSystem)
+val jobManagerSystem = try {
+  AkkaUtils.createActorSystem(configuration, listeningAddress)
 }
+catch {
+  case t: Throwable = {
+if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
+  val cause = t.getCause()
+  if (cause != null  
t.getCause().isInstanceOf[java.net.BindException]) {
+val address = listeningAddress match {
+  case Some((host, port)) = host + : + port
+  case None = unknown
+}
 
-jobManagerSystem.awaitTermination()
+throw new Exception(Unable to create JobManager at address  
+ address + :  + cause.getMessage(), t)
+  }
+}
+throw new Exception(Could not create JobManager actor system, t)
+  }
+}
+
+try {
+  LOG.debug(Starting JobManager actor)
+
+  startActor(configuration, jobManagerSystem)
+
+  if(executionMode.equals(LOCAL)){
+LOG.info(Starting embedded TaskManager for JobManager's LOCAL 
mode execution)
+
+TaskManager.startActorWithConfiguration(, configuration,
+  localAkkaCommunication = false, localTaskManagerCommunication = 
true)(jobManagerSystem)
+  }
+
+  jobManagerSystem.awaitTermination()
+}
+

[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

2015-02-13 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/385#discussion_r24657012
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
 log.error(t, Could not properly unregister job {} form the 
library cache., jobID)
 }
   }
-
-  private def checkJavaVersion(): Unit = {
-if (System.getProperty(java.version).substring(0, 3).toDouble  1.7) 
{
-  log.warning(Warning: Flink is running with Java 6.  +
-Java 6 is not maintained any more by Oracle or the OpenJDK 
community.  +
-Flink currently supports Java 6, but may not in future releases, 
+
- due to the unavailability of bug fixes security patched.)
-}
-  }
 }
 
 object JobManager {
+  
   import ExecutionMode._
+
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
+
   val FAILURE_RETURN_CODE = 1
+
   val JOB_MANAGER_NAME = jobmanager
   val EVENT_COLLECTOR_NAME = eventcollector
   val ARCHIVE_NAME = archive
   val PROFILER_NAME = profiler
 
   def main(args: Array[String]): Unit = {
+
+// startup checks and logging
 EnvironmentInformation.logEnvironmentInfo(LOG, JobManager)
-val (configuration, executionMode, listeningAddress) = parseArgs(args)
+checkJavaVersion()
 
-  if(SecurityUtils.isSecurityEnabled) {
+val (configuration: Configuration,
+ executionMode: ExecutionMode,
+ listeningAddress:  Option[(String, Int)]) =
+try {
+  parseArgs(args)
+}
+catch {
+  case t: Throwable = {
+LOG.error(t.getMessage(), t)
+System.exit(FAILURE_RETURN_CODE)
+null
--- End diff --

I'll leave it as it is. Looks simpler to me ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

2015-02-13 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/385#issuecomment-74236977
  
I think you have to add ```archive ! PoisonPill``` to the ```postStop``` 
method of the JobManager to shut down the archive actor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

2015-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/385#discussion_r24602575
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -653,11 +693,87 @@ object JobManager {
 (archiveCount, profilingEnabled, cleanupInterval, executionRetries, 
delayBetweenRetries)
   }
 
-  def startActor(configuration: Configuration)(implicit actorSystem: 
ActorSystem): ActorRef = {
-startActor(Props(classOf[JobManager], configuration))
+  /**
+   * Create the job manager members as (instanceManager, scheduler, 
libraryCacheManager,
+   *  archiverProps, accumulatorManager, profiler, 
defaultExecutionRetries,
+   *  delayBetweenRetries, timeout)
+   *
+   * @param configuration The configuration from which to parse the config 
values.
+   * @return The members for a default JobManager.
+   */
+  def createJobManagerComponents(configuration: Configuration) :
+(InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
+  Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, 
Int) = {
+
+val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
+
+val (archiveCount, profilingEnabled, cleanupInterval, 
executionRetries, delayBetweenRetries) =
+  parseConfiguration(configuration)
+
+val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
+
+val profilerProps: Option[Props] = if (profilingEnabled) {
+  Some(Props(classOf[JobManagerProfiler]))
+} else {
+  None
+}
+
+val accumulatorManager: AccumulatorManager = new 
AccumulatorManager(Math.min(1, archiveCount))
+
+var blobServer: BlobServer = null
+var instanceManager: InstanceManager = null
+var scheduler: FlinkScheduler = null
+var libraryCacheManager: BlobLibraryCacheManager = null
+
+try {
+  blobServer = new BlobServer(configuration)
+  instanceManager = new InstanceManager()
+  scheduler = new FlinkScheduler()
+  libraryCacheManager = new BlobLibraryCacheManager(blobServer, 
cleanupInterval)
+
+  instanceManager.addInstanceListener(scheduler)
+}
+catch {
+  case t: Throwable = {
+if (libraryCacheManager != null) {
+  libraryCacheManager.shutdown()
+}
+if (scheduler != null) {
+  scheduler.shutdown()
+}
+if (instanceManager != null) {
+  instanceManager.shutdown()
+}
+if (blobServer != null) {
+  blobServer.shutdown()
+}
+throw t;
+  }
+}
+
+(instanceManager, scheduler, libraryCacheManager, archiveProps, 
accumulatorManager,
+  profilerProps, executionRetries, delayBetweenRetries, timeout, 
archiveCount)
+  }
+
+  def startActor(configuration: Configuration, actorSystem: ActorSystem): 
ActorRef = {
+
+val (instanceManager, scheduler, libraryCacheManager, archiveProps, 
accumulatorManager,
+  profilerProps, executionRetries, delayBetweenRetries,
+  timeout, _) = createJobManagerComponents(configuration)
+
+val profiler: Option[ActorRef] =
+ profilerProps.map( props = actorSystem.actorOf(props, 
PROFILER_NAME) )
--- End diff --

Nice functional style :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

2015-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/385#discussion_r24601897
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
 log.error(t, Could not properly unregister job {} form the 
library cache., jobID)
 }
   }
-
-  private def checkJavaVersion(): Unit = {
-if (System.getProperty(java.version).substring(0, 3).toDouble  1.7) 
{
-  log.warning(Warning: Flink is running with Java 6.  +
-Java 6 is not maintained any more by Oracle or the OpenJDK 
community.  +
-Flink currently supports Java 6, but may not in future releases, 
+
- due to the unavailability of bug fixes security patched.)
-}
-  }
 }
 
 object JobManager {
+  
   import ExecutionMode._
+
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
+
   val FAILURE_RETURN_CODE = 1
+
   val JOB_MANAGER_NAME = jobmanager
   val EVENT_COLLECTOR_NAME = eventcollector
   val ARCHIVE_NAME = archive
   val PROFILER_NAME = profiler
 
   def main(args: Array[String]): Unit = {
+
+// startup checks and logging
 EnvironmentInformation.logEnvironmentInfo(LOG, JobManager)
-val (configuration, executionMode, listeningAddress) = parseArgs(args)
+checkJavaVersion()
 
-  if(SecurityUtils.isSecurityEnabled) {
+val (configuration: Configuration,
+ executionMode: ExecutionMode,
+ listeningAddress:  Option[(String, Int)]) =
+try {
+  parseArgs(args)
+}
+catch {
+  case t: Throwable = {
+LOG.error(t.getMessage(), t)
+System.exit(FAILURE_RETURN_CODE)
+null
--- End diff --

Do we need the ```null``` expression after the ```System.exit```?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...

2015-02-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/385#discussion_r24601755
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -149,7 +134,7 @@ Actor with ActorLogMessages with ActorLogging {
 
   override def receiveWithLogMessages: Receive = {
 case RegisterTaskManager(connectionInfo, hardwareInformation, 
numberOfSlots) =
-  val taskManager = sender
+  val taskManager = sender()
--- End diff --

This does not work with older Akka versions, which we are using with the 
Hadoop-2.0.0-alpha profile. I think in older Akka versions ```sender``` is a val.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---