[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24653915 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging { log.error(t, Could not properly unregister job {} form the library cache., jobID) } } - - private def checkJavaVersion(): Unit = { -if (System.getProperty(java.version).substring(0, 3).toDouble 1.7) { - log.warning(Warning: Flink is running with Java 6. + -Java 6 is not maintained any more by Oracle or the OpenJDK community. + -Flink currently supports Java 6, but may not in future releases, + - due to the unavailability of bug fixes security patched.) -} - } } object JobManager { + import ExecutionMode._ + val LOG = LoggerFactory.getLogger(classOf[JobManager]) + val FAILURE_RETURN_CODE = 1 + val JOB_MANAGER_NAME = jobmanager val EVENT_COLLECTOR_NAME = eventcollector val ARCHIVE_NAME = archive val PROFILER_NAME = profiler def main(args: Array[String]): Unit = { + +// startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG, JobManager) -val (configuration, executionMode, listeningAddress) = parseArgs(args) +checkJavaVersion() - if(SecurityUtils.isSecurityEnabled) { +val (configuration: Configuration, + executionMode: ExecutionMode, + listeningAddress: Option[(String, Int)]) = +try { + parseArgs(args) +} +catch { + case t: Throwable = { +LOG.error(t.getMessage(), t) +System.exit(FAILURE_RETURN_CODE) +null --- End diff -- We have to, to silence the compiler, since it does not know that there is no return from `System.exit()` and it is not possible to assign `Nothing` to the triplet value. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24653930 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging { log.error(t, Could not properly unregister job {} form the library cache., jobID) } } - - private def checkJavaVersion(): Unit = { -if (System.getProperty(java.version).substring(0, 3).toDouble 1.7) { - log.warning(Warning: Flink is running with Java 6. + -Java 6 is not maintained any more by Oracle or the OpenJDK community. + -Flink currently supports Java 6, but may not in future releases, + - due to the unavailability of bug fixes security patched.) -} - } } object JobManager { + import ExecutionMode._ + val LOG = LoggerFactory.getLogger(classOf[JobManager]) + val FAILURE_RETURN_CODE = 1 + val JOB_MANAGER_NAME = jobmanager val EVENT_COLLECTOR_NAME = eventcollector val ARCHIVE_NAME = archive val PROFILER_NAME = profiler def main(args: Array[String]): Unit = { + +// startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG, JobManager) -val (configuration, executionMode, listeningAddress) = parseArgs(args) +checkJavaVersion() - if(SecurityUtils.isSecurityEnabled) { +val (configuration: Configuration, + executionMode: ExecutionMode, + listeningAddress: Option[(String, Int)]) = +try { + parseArgs(args) +} +catch { + case t: Throwable = { +LOG.error(t.getMessage(), t) +System.exit(FAILURE_RETURN_CODE) +null + } +} + +try { + if (SecurityUtils.isSecurityEnabled) { LOG.info(Security is enabled. Starting secure JobManager.) 
SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] { override def run(): Unit = { -start(configuration, executionMode, listeningAddress) +runJobManager(configuration, executionMode, listeningAddress) } }) } else { -start(configuration, executionMode, listeningAddress) +runJobManager(configuration, executionMode, listeningAddress) + } +} +catch { + case t: Throwable = { +LOG.error(Failed to start JobManager., t) +System.exit(FAILURE_RETURN_CODE) } +} } - def start(configuration: Configuration, executionMode: ExecutionMode, -listeningAddress : Option[(String, Int)]): Unit = { -val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress) -startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem) + def runJobManager(configuration: Configuration, +executionMode: ExecutionMode, +listeningAddress: Option[(String, Int)]) : Unit = { + +LOG.info(Starting JobManager) +LOG.debug(Starting JobManager actor system) -if(executionMode.equals(LOCAL)){ - TaskManager.startActorWithConfiguration(, configuration, -localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) +val jobManagerSystem = try { + AkkaUtils.createActorSystem(configuration, listeningAddress) } +catch { + case t: Throwable = { +if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) { + val cause = t.getCause() + if (cause != null t.getCause().isInstanceOf[java.net.BindException]) { +val address = listeningAddress match { + case Some((host, port)) = host + : + port + case None = unknown +} -jobManagerSystem.awaitTermination() +throw new Exception(Unable to create JobManager at address + address + : + cause.getMessage(), t) + } +} +throw new Exception(Could not create JobManager actor system, t) + } +} + +try { + LOG.debug(Starting JobManager actor) + + startActor(configuration, jobManagerSystem) + + if(executionMode.equals(LOCAL)){ +LOG.info(Starting embedded TaskManager for JobManager's LOCAL mode execution) + 
+TaskManager.startActorWithConfiguration(, configuration, + localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) + } + + jobManagerSystem.awaitTermination() +} +
[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24657012 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging { log.error(t, Could not properly unregister job {} form the library cache., jobID) } } - - private def checkJavaVersion(): Unit = { -if (System.getProperty(java.version).substring(0, 3).toDouble 1.7) { - log.warning(Warning: Flink is running with Java 6. + -Java 6 is not maintained any more by Oracle or the OpenJDK community. + -Flink currently supports Java 6, but may not in future releases, + - due to the unavailability of bug fixes security patched.) -} - } } object JobManager { + import ExecutionMode._ + val LOG = LoggerFactory.getLogger(classOf[JobManager]) + val FAILURE_RETURN_CODE = 1 + val JOB_MANAGER_NAME = jobmanager val EVENT_COLLECTOR_NAME = eventcollector val ARCHIVE_NAME = archive val PROFILER_NAME = profiler def main(args: Array[String]): Unit = { + +// startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG, JobManager) -val (configuration, executionMode, listeningAddress) = parseArgs(args) +checkJavaVersion() - if(SecurityUtils.isSecurityEnabled) { +val (configuration: Configuration, + executionMode: ExecutionMode, + listeningAddress: Option[(String, Int)]) = +try { + parseArgs(args) +} +catch { + case t: Throwable = { +LOG.error(t.getMessage(), t) +System.exit(FAILURE_RETURN_CODE) +null --- End diff -- I'll leave it as it is. Looks simpler to me ;-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/385#issuecomment-74236977 I think you have to add ```archive ! PoisonPill``` to the ```postStop``` method of the JobManager to shut down the archive actor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24602575 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -653,11 +693,87 @@ object JobManager { (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) } - def startActor(configuration: Configuration)(implicit actorSystem: ActorSystem): ActorRef = { -startActor(Props(classOf[JobManager], configuration)) + /** + * Create the job manager members as (instanceManager, scheduler, libraryCacheManager, + * archiverProps, accumulatorManager, profiler, defaultExecutionRetries, + * delayBetweenRetries, timeout) + * + * @param configuration The configuration from which to parse the config values. + * @return The members for a default JobManager. + */ + def createJobManagerComponents(configuration: Configuration) : +(InstanceManager, FlinkScheduler, BlobLibraryCacheManager, + Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, Int) = { + +val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) + +val (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) = + parseConfiguration(configuration) + +val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount) + +val profilerProps: Option[Props] = if (profilingEnabled) { + Some(Props(classOf[JobManagerProfiler])) +} else { + None +} + +val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount)) + +var blobServer: BlobServer = null +var instanceManager: InstanceManager = null +var scheduler: FlinkScheduler = null +var libraryCacheManager: BlobLibraryCacheManager = null + +try { + blobServer = new BlobServer(configuration) + instanceManager = new InstanceManager() + scheduler = new FlinkScheduler() + libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval) + + instanceManager.addInstanceListener(scheduler) +} 
+catch { + case t: Throwable = { +if (libraryCacheManager != null) { + libraryCacheManager.shutdown() +} +if (scheduler != null) { + scheduler.shutdown() +} +if (instanceManager != null) { + instanceManager.shutdown() +} +if (blobServer != null) { + blobServer.shutdown() +} +throw t; + } +} + +(instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, + profilerProps, executionRetries, delayBetweenRetries, timeout, archiveCount) + } + + def startActor(configuration: Configuration, actorSystem: ActorSystem): ActorRef = { + +val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, + profilerProps, executionRetries, delayBetweenRetries, + timeout, _) = createJobManagerComponents(configuration) + +val profiler: Option[ActorRef] = + profilerProps.map( props = actorSystem.actorOf(props, PROFILER_NAME) ) --- End diff -- Nice functional style :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24601897 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging { log.error(t, Could not properly unregister job {} form the library cache., jobID) } } - - private def checkJavaVersion(): Unit = { -if (System.getProperty(java.version).substring(0, 3).toDouble 1.7) { - log.warning(Warning: Flink is running with Java 6. + -Java 6 is not maintained any more by Oracle or the OpenJDK community. + -Flink currently supports Java 6, but may not in future releases, + - due to the unavailability of bug fixes security patched.) -} - } } object JobManager { + import ExecutionMode._ + val LOG = LoggerFactory.getLogger(classOf[JobManager]) + val FAILURE_RETURN_CODE = 1 + val JOB_MANAGER_NAME = jobmanager val EVENT_COLLECTOR_NAME = eventcollector val ARCHIVE_NAME = archive val PROFILER_NAME = profiler def main(args: Array[String]): Unit = { + +// startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG, JobManager) -val (configuration, executionMode, listeningAddress) = parseArgs(args) +checkJavaVersion() - if(SecurityUtils.isSecurityEnabled) { +val (configuration: Configuration, + executionMode: ExecutionMode, + listeningAddress: Option[(String, Int)]) = +try { + parseArgs(args) +} +catch { + case t: Throwable = { +LOG.error(t.getMessage(), t) +System.exit(FAILURE_RETURN_CODE) +null --- End diff -- Do we need the ```null``` expression after the ```System.exit```? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1529] [jobmanager] Improve error handli...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24601755 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -149,7 +134,7 @@ Actor with ActorLogMessages with ActorLogging { override def receiveWithLogMessages: Receive = { case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) = - val taskManager = sender + val taskManager = sender() --- End diff -- This does not work with older Akka versions, which we are using with the Hadoop-2.0.0-alpha profile. I think in older Akka versions `sender` is a val. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---