eyalbenivri closed pull request #6: Amaterasu 15 URL: https://github.com/apache/incubator-amaterasu/pull/6
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/build.gradle b/common/build.gradle index c63c5a3..0b6cf8a 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -43,6 +43,7 @@ dependencies { compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0' compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9' compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.9' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4' // currently we have to use this specific mesos version to prevent from // clashing with spark diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala index 6f14c2d..8a44019 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala @@ -16,8 +16,9 @@ */ package org.apache.amaterasu.common.execution.actions -import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel -import org.apache.amaterasu.common.execution.actions.NotificationType.NotificationType +import NotificationLevel.NotificationLevel +import NotificationType.NotificationType +import com.fasterxml.jackson.annotation.JsonProperty abstract class Notifier { @@ -33,22 +34,22 @@ abstract class Notifier { object NotificationType extends Enumeration { type NotificationType = Value - val success = Value("success") - val error = Value("error") - val info = Value("info") + val success: NotificationType.Value = Value("success") + val error: NotificationType.Value = Value("error") + val info: 
NotificationType.Value = Value("info") } object NotificationLevel extends Enumeration { type NotificationLevel = Value - val execution = Value("execution") - val code = Value("code") - val none = Value("none") + val execution: NotificationLevel.Value = Value("execution") + val code: NotificationLevel.Value = Value("code") + val none: NotificationLevel.Value = Value("none") } -case class Notification(line: String, - msg: String, - notType: NotificationType, - notLevel: NotificationLevel) +case class Notification(@JsonProperty("line") line: String, + @JsonProperty("msg") msg: String, + @JsonProperty("notType") notType: NotificationType, + @JsonProperty("notLevel") notLevel: NotificationLevel) diff --git a/executor/build.gradle b/executor/build.gradle index a081759..30076d4 100644 --- a/executor/build.gradle +++ b/executor/build.gradle @@ -65,6 +65,8 @@ dependencies { compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5' compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5' compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5' + compile group: 'org.apache.activemq', name: 'activemq-client', version: '5.15.2' + compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0' compile project(':common') compile project(':amaterasu-sdk') diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala new file mode 100644 index 0000000..2f73b71 --- /dev/null +++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala @@ -0,0 +1,72 @@ +package org.apache.amaterasu.executor.common.executors + +import javax.jms.{DeliveryMode, MessageProducer, Session} + + +import net.liftweb.json._ +import net.liftweb.json.Serialization.write +import org.apache.activemq.ActiveMQConnectionFactory +import 
org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType, Notifier} +import org.apache.amaterasu.common.logging.Logging + +class ActiveNotifier extends Notifier with Logging { + + var producer: MessageProducer = _ + var session: Session = _ + + implicit val formats = DefaultFormats + + override def info(message: String): Unit = { + + log.info(message) + + val notification = Notification("", message, NotificationType.info, NotificationLevel.execution) + val notificationJson = write(notification) + val msg = session.createTextMessage(notificationJson) + producer.send(msg) + + } + + override def success(line: String): Unit = { + + log.info(s"successfully executed line: $line") + + val notification = Notification(line, "", NotificationType.success, NotificationLevel.code) + val notificationJson = write(notification) + val msg = session.createTextMessage(notificationJson) + producer.send(msg) + + } + + override def error(line: String, message: String): Unit = { + + log.error(s"Error executing line: $line message: $message") + + val notification = Notification(line, message, NotificationType.error, NotificationLevel.code) + val notificationJson = write(notification) + val msg = session.createTextMessage(notificationJson) + producer.send(msg) + + } +} + +object ActiveNotifier extends Logging { + def apply(address: String): ActiveNotifier = { + + // setting up activeMQ connection + val connectionFactory = new ActiveMQConnectionFactory(address) + val connection = connectionFactory.createConnection() + connection.start() + val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val destination = session.createTopic("JOB.REPORT") + val producer = session.createProducer(destination) + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) + + // creating notifier + val notifier = new ActiveNotifier + notifier.session = session + notifier.producer = producer + + notifier + } +} \ No newline at end of file diff --git 
a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala index e99acfe..a091c1b 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala @@ -28,33 +28,33 @@ class MesosNotifier(driver: ExecutorDriver) extends Notifier with Logging { private val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) - override def success(line: String) = { + override def success(line: String): Unit = { log.info(s"successfully executed line: $line") - val notification = new Notification(line, "", NotificationType.success, NotificationLevel.code) + val notification = Notification(line, "", NotificationType.success, NotificationLevel.code) val msg = mapper.writeValueAsBytes(notification) driver.sendFrameworkMessage(msg) } - override def error(line: String, message: String) = { + override def error(line: String, message: String): Unit = { log.error(s"Error executing line: $line message: $message") - val notification = new Notification(line, message, NotificationType.error, NotificationLevel.code) + val notification = Notification(line, message, NotificationType.error, NotificationLevel.code) val msg = mapper.writeValueAsBytes(notification) driver.sendFrameworkMessage(msg) } - override def info(message: String) = { + override def info(message: String): Unit = { log.info(message) - val notification = new Notification("", message, NotificationType.info, NotificationLevel.execution) + val notification = Notification("", message, NotificationType.info, NotificationLevel.execution) val msg = mapper.writeValueAsBytes(notification) driver.sendFrameworkMessage(msg) diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala 
b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala index 05637cb..d437778 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala @@ -8,7 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData} import org.apache.amaterasu.common.logging.Logging -import org.apache.amaterasu.executor.common.executors.ProvidersFactory +import org.apache.amaterasu.executor.common.executors.{ActiveNotifier, ProvidersFactory} import org.apache.hadoop.net.NetUtils import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.SparkContext @@ -53,12 +53,6 @@ class ActionsExecutor extends Logging { //s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(gson.toJson(taskData), "UTF-8")}' '${URLEncoder.encode(gson.toJson(execData), "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}'" object ActionsExecutorLauncher extends App with Logging { - def urlses(cl: ClassLoader): Array[java.net.URL] = cl match { - case null => Array() - case u: java.net.URLClassLoader => u.getURLs() ++ urlses(cl.getParent) - case _ => urlses(cl.getParent) - } - val hostName = InetAddress.getLocalHost.getHostName log.info(s"Hostname resolved to: $hostName") @@ -67,14 +61,11 @@ object ActionsExecutorLauncher extends App with Logging { log.info("Starting actions executor") - val urls = urlses(getClass.getClassLoader) - - log.info("Current classpath is:") - log.info(urls.mkString("\n")) - val jobId = this.args(0) val master = this.args(1) val actionName = this.args(2) + val notificationsAddress = this.args(6) + log.info("parsing task data") val taskData = mapper.readValue(URLDecoder.decode(this.args(3), "UTF-8"), classOf[TaskData]) log.info("parsing 
executor data") @@ -89,7 +80,7 @@ object ActionsExecutorLauncher extends App with Logging { log.info("Setup executor") val baos = new ByteArrayOutputStream() - val notifier = new YarnNotifier(new YarnConfiguration()) + val notifier = ActiveNotifier(notificationsAddress) log.info("Setup notifier") actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties") diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala index 1fb2d85..9261080 100755 --- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala +++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala @@ -57,7 +57,7 @@ class SparkScalaRunner(var env: Environment, def interpretSources(source: Source, actionName: String, exports: Map[String, String]): Unit = { notifier.info(s"================= started action $actionName =================") - notifier.info(s"exports is: $exports") + //notifier.info(s"exports is: $exports") for (line <- source.getLines()) { @@ -86,8 +86,8 @@ class SparkScalaRunner(var env: Environment, val resultName = interpreter.prevRequestList.last.termNames.last - notifier.info(s" result name ${resultName.toString}") - notifier.info(s" exist in exports: ${exports.contains(resultName.toString)}") + //notifier.info(s" result name ${resultName.toString}") + //notifier.info(s" exist in exports: ${exports.contains(resultName.toString)}") if (exports.contains(resultName.toString)) { @@ -99,7 +99,7 @@ class SparkScalaRunner(var env: Environment, case ds: Dataset[_] => log.debug(s"persisting DataFrame: $resultName") val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")""" - notifier.info(writeLine) + 
//notifier.info(writeLine) val writeResult = interpreter.interpret(writeLine) if (writeResult != Results.Success) { val err = outStream.toString diff --git a/leader/build.gradle b/leader/build.gradle index 9ecd17c..429f072 100644 --- a/leader/build.gradle +++ b/leader/build.gradle @@ -41,12 +41,12 @@ dependencies { compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0' compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0' - compile group: 'org.apache.curator', name: 'curator-framework', version: '2.9.1' + compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1' compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4' - compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.3' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.3' + compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.3' + compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.3' compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908' @@ -61,6 +61,9 @@ dependencies { compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6' compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2' compile group: 'org.reflections', name: 'reflections', version: '0.9.11' + compile group: 
'org.apache.activemq', name: 'activemq-broker', version: '5.15.3' + compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0' + runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3' testCompile project(':common') testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14" @@ -68,7 +71,9 @@ dependencies { testCompile 'junit:junit:4.11' testCompile 'org.scalatest:scalatest_2.11:3.0.2' testCompile 'org.scala-lang:scala-library:2.11.8' - testCompile group: 'org.apache.curator', name: 'curator-test', version: '2.9.1' + testCompile( 'org.apache.curator:curator-test:2.9.1'){ + exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' + } } diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java index e085d6e..5a8665c 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java @@ -16,17 +16,18 @@ */ package org.apache.amaterasu.leader.yarn; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.amaterasu.common.configuration.ClusterConfig; import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory; +import org.apache.amaterasu.leader.utilities.ActiveReportListener; import org.apache.amaterasu.sdk.FrameworkSetupProvider; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -37,14 +38,23 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; +import 
org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.barriers.DistributedBarrier; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.*; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.*; +import static java.lang.System.exit; + public class Client { private final static Logger LOGGER = LoggerFactory.getLogger(Client.class); @@ -63,17 +73,12 @@ private LocalResource setLocalResourceFromPath(Path path) throws IOException { return fileResource; } - public void run(JobOpts opts, String[] args) throws Exception { + private void run(JobOpts opts, String[] args) throws Exception { + LogManager.resetConfiguration(); ClusterConfig config = new ClusterConfig(); config.load(new FileInputStream(opts.home + "/amaterasu.properties")); -// conf.addResource(new Path("/etc/hadoop/conf/core-site.xml")); -// conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml")); -// conf.addResource(new Path("/etc/hadoop/conf/yarn-site.xml")); -// conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName()); -// conf.set("fs.file.impl", LocalFileSystem.class.getName()); - // Create yarnClient YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); @@ -85,10 +90,10 @@ public void run(JobOpts opts, String[] args) throws Exception { app = yarnClient.createApplication(); } catch (YarnException e) { LOGGER.error("Error initializing yarn application with yarn client.", e); - System.exit(1); + exit(1); } catch (IOException e) { LOGGER.error("Error initializing yarn application with yarn client.", e); - System.exit(2); + exit(2); } // Setup jars on hdfs @@ -96,7 +101,7 @@ public void run(JobOpts opts, String[] args) throws Exception { fs = FileSystem.get(conf); } catch (IOException e) { LOGGER.error("Eror 
creating HDFS client isntance.", e); - System.exit(3); + exit(3); } Path jarPath = new Path(config.YARN().hdfsJarsPath()); Path jarPathQualified = fs.makeQualified(jarPath); @@ -108,6 +113,7 @@ public void run(JobOpts opts, String[] args) throws Exception { newId = "--new-job-id " + appContext.getApplicationId().toString() + "-" + UUID.randomUUID().toString(); } + List<String> commands = Collections.singletonList( "env AMA_NODE=" + System.getenv("AMA_NODE") + " " + "$JAVA_HOME/bin/java" + @@ -116,8 +122,8 @@ public void run(JobOpts opts, String[] args) throws Exception { " org.apache.amaterasu.leader.yarn.ApplicationMaster " + joinStrings(args) + newId + - "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout " + - "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" ); @@ -130,6 +136,7 @@ public void run(JobOpts opts, String[] args) throws Exception { if (!fs.exists(jarPathQualified)) { File home = new File(opts.home); fs.mkdirs(jarPathQualified); + for (File f : home.listFiles()) { fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), jarPathQualified); } @@ -152,7 +159,10 @@ public void run(JobOpts opts, String[] args) throws Exception { } } catch (IOException e) { LOGGER.error("Error uploading ama folder to HDFS.", e); - System.exit(3); + exit(3); + } catch (NullPointerException ne) { + LOGGER.error("No files in home dir.", ne); + exit(4); } @@ -177,7 +187,7 @@ public void run(JobOpts opts, String[] args) throws Exception { log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties"))); } catch (IOException e) { LOGGER.error("Error initializing yarn local resources.", e); - System.exit(4); + exit(4); } // set local resource on master container @@ -210,14 +220,29 @@ public void run(JobOpts opts, String[] args) throws Exception { LOGGER.info("Submitting application {}", 
appId); try { yarnClient.submitApplication(appContext); + } catch (YarnException e) { LOGGER.error("Error submitting application.", e); - System.exit(6); + exit(6); } catch (IOException e) { LOGGER.error("Error submitting application.", e); - System.exit(7); + exit(7); } + CuratorFramework client = CuratorFrameworkFactory.newClient(config.zk(), + new ExponentialBackoffRetry(1000, 3)); + client.start(); + + String newJobId = newId.replace("--new-job-id ", ""); + System.out.println("===> /" + newJobId + "-report-barrier"); + DistributedBarrier reportBarrier = new DistributedBarrier(client, "/" + newJobId + "-report-barrier"); + reportBarrier.setBarrier(); + reportBarrier.waitOnBarrier(); + + String address = new String( client.getData().forPath("/" + newJobId + "/broker")); + System.out.println("===> " + address); + setupReportListener(address); + ApplicationReport appReport = null; YarnApplicationState appState; @@ -226,13 +251,14 @@ public void run(JobOpts opts, String[] args) throws Exception { appReport = yarnClient.getApplicationReport(appId); } catch (YarnException e) { LOGGER.error("Error getting application report.", e); - System.exit(8); + exit(8); } catch (IOException e) { LOGGER.error("Error getting application report.", e); - System.exit(9); + exit(9); } appState = appReport.getYarnApplicationState(); if (isAppFinished(appState)) { + exit(0); break; } //LOGGER.info("Application not finished ({})", appReport.getProgress()); @@ -240,17 +266,13 @@ public void run(JobOpts opts, String[] args) throws Exception { Thread.sleep(100); } catch (InterruptedException e) { LOGGER.error("Interrupted while waiting for job completion.", e); - System.exit(137); + exit(137); } } while (!isAppFinished(appState)); LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport.getFinalApplicationStatus(), appReport.getFinishTime()); } - private static void copyDirRecursive(){ - - } - private boolean isAppFinished(YarnApplicationState appState) { 
return appState == YarnApplicationState.FINISHED || appState == YarnApplicationState.KILLED || @@ -276,16 +298,32 @@ private static String joinStrings(String[] str) { } + private void setupReportListener(String address) throws JMSException { + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(address); + Connection conn = cf.createConnection(); + conn.start(); + + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //TODO: move to a const in common + Topic destination = session.createTopic("JOB.REPORT"); + + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new ActiveReportListener()); + + } + private void setupAppMasterEnv(Map<String, String> appMasterEnv) { Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name(), - ApplicationConstants.Environment.PWD.$() + File.separator + "*"); + ApplicationConstants.Environment.PWD.$() + File.separator + "*", File.pathSeparator); for (String c : conf.getStrings( YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name(), - c.trim()); + c.trim(), File.pathSeparator); } } } \ No newline at end of file diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala index 4f61cec..f540997 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala @@ -27,8 +27,8 @@ trait Action extends Logging { var actionPath: String = _ var actionId: String = _ - var data: ActionData = null - var client: CuratorFramework = null + var data: ActionData = _ + var client: CuratorFramework = _ def execute(): Unit diff --git 
a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala new file mode 100644 index 0000000..e24d979 --- /dev/null +++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/ActiveReportListener.scala @@ -0,0 +1,56 @@ +package org.apache.amaterasu.leader.utilities + +import javax.jms.{Message, MessageListener, TextMessage} + +import net.liftweb.json._ +import net.liftweb.json.JsonDSL._ +import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType} + +class ActiveReportListener extends MessageListener { + + implicit val formats = DefaultFormats + + override def onMessage(message: Message): Unit = { + message match { + case tm: TextMessage => + try { + val notification = parseNot(parse(tm.getText)) + printNotification(notification) + + } catch { + case e: Exception => println(e.getMessage) + } + case _ => println("===> Unknown message") + } + } + + private def parseNot(json: JValue): Notification = Notification( + (json \ "line").asInstanceOf[JString].values, + (json \ "msg").asInstanceOf[JString].values, + NotificationType.withName((json \ "notType" \ "name").asInstanceOf[JString].values), + NotificationLevel.withName((json \ "notLevel" \ "name").asInstanceOf[JString].values) + ) + + + private def printNotification(notification: Notification): Unit = { + + var color = Console.WHITE + + notification.notType match { + + case NotificationType.info => + color = Console.WHITE + println(s"$color${Console.BOLD}===> ${notification.msg} ${Console.RESET}") + case NotificationType.success => + color = Console.GREEN + println(s"$color${Console.BOLD}===> ${notification.line} ${Console.RESET}") + case NotificationType.error => + color = Console.RED + println(s"$color${Console.BOLD}===> ${notification.line} ${Console.RESET}") + println(s"$color${Console.BOLD}===> ${notification.msg} ${Console.RESET}") + + } + + } 
+} + diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala index 9844d07..65efecc 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala @@ -17,21 +17,24 @@ package org.apache.amaterasu.leader.yarn import java.io.{File, FileInputStream, InputStream} -import java.net.URLEncoder +import java.net.{InetAddress, ServerSocket, URLEncoder} import java.nio.ByteBuffer import java.util import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} +import javax.jms.Session -import com.google.gson.Gson +import org.apache.activemq.broker.BrokerService +import org.apache.activemq.ActiveMQConnectionFactory import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory import org.apache.amaterasu.leader.execution.{JobLoader, JobManager} -import org.apache.amaterasu.leader.utilities.{Args, DataLoader} +import org.apache.amaterasu.leader.utilities.{ActiveReportListener, Args, DataLoader} +import org.apache.curator.framework.recipes.barriers.DistributedBarrier import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.retry.ExponentialBackoffRetry -import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.ApplicationConstants @@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync} import org.apache.hadoop.yarn.conf.YarnConfiguration import 
org.apache.hadoop.yarn.security.AMRMTokenIdentifier import org.apache.hadoop.yarn.util.{ConverterUtils, Records} +import org.apache.zookeeper.CreateMode import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -54,7 +58,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { var capability: Resource = _ - private val MAX_ATTEMPTS_PER_TASK = 3 log.info("ApplicationMaster start") private var jobManager: JobManager = _ @@ -67,7 +70,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { private var propPath: String = "" private var props: InputStream = _ private var jarPath: Path = _ - private var version: String = "" private var executorPath: Path = _ private var executorJar: LocalResource = _ private var propFile: LocalResource = _ @@ -75,24 +77,30 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { private var nmClient: NMClientAsync = _ private var allocListener: YarnRMCallbackHandler = _ private var rmClient: AMRMClientAsync[ContainerRequest] = _ + private var address: String = _ private val containersIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]() - private val gson: Gson = new Gson() + private val host: String = InetAddress.getLocalHost.getHostName + private val broker: BrokerService = new BrokerService() def setLocalResourceFromPath(path: Path): LocalResource = { + val stat = fs.getFileStatus(path) val fileResource = Records.newRecord(classOf[LocalResource]) + fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path)) fileResource.setSize(stat.getLen) fileResource.setTimestamp(stat.getModificationTime) 
fileResource.setType(LocalResourceType.FILE) fileResource.setVisibility(LocalResourceVisibility.PUBLIC) fileResource + } def execute(arguments: Args): Unit = { + log.info(s"started AM with args $arguments") propPath = System.getenv("PWD") + "/amaterasu.properties" @@ -105,13 +113,24 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { config = ClusterConfig(props) - try { initJob(arguments) } catch { - case e: Exception => log.error("error initielzing ", e.getMessage) + case e: Exception => log.error("error initializing ", e.getMessage) } + // now that the job was initiated, the curator client is started and we can + // register the broker's address + client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.jobId}/broker") + client.setData().forPath(s"/${jobManager.jobId}/broker", address.getBytes) + + // once the broker is registered, we can remove the barrier so clients can connect + log.info(s"/${jobManager.jobId}-report-barrier") + val barrier = new DistributedBarrier(client, s"/${jobManager.jobId}-report-barrier") + barrier.removeBarrier() + + setupMessaging(jobManager.jobId) + log.info(s"Job ${jobManager.jobId} initiated with ${jobManager.registeredActions.size} actions") jarPath = new Path(config.YARN.hdfsJarsPath) @@ -155,6 +174,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { this.capability.setMemory(Math.min(config.taskMem, 1024)) this.capability.setVirtualCores(1) + while (!jobManager.outOfActions) { val actionData = jobManager.getNextActionData if (actionData != null) { @@ -165,6 +185,22 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { log.info("Finished asking for containers") } + private def setupMessaging(jobId: String): Unit = { + + val cf = new ActiveMQConnectionFactory(address) + val conn = cf.createConnection() + conn.start() + + val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE) + //TODO: move to a const in common + val 
destination = session.createTopic("JOB.REPORT") + + val consumer = session.createConsumer(destination) + consumer.setMessageListener(new ActiveReportListener) + + } + + private def askContainer(actionData: ActionData): Unit = { actionsBuffer.add(actionData) @@ -217,7 +253,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { "-Dscala.usejavacp=true " + "-Dhdp.version=2.6.1.0-129 " + "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " + - s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(taskData, "UTF-8")}' '${URLEncoder.encode(execData, "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}' " + + s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(taskData, "UTF-8")}' '${URLEncoder.encode(execData, "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}' '$address' " + s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " + s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr " ) @@ -228,7 +264,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { ctx.setCommands(commands) ctx.setTokens(allTokens) - var resources = mutable.Map[String, LocalResource]( + val resources = mutable.Map[String, LocalResource]( "executor.jar" -> executorJar, "amaterasu.properties" -> propFile, // TODO: Nadav/Eyal all of these should move to the executor resource setup @@ -355,6 +391,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { } def initJob(args: Args): Unit = { + this.env = args.env this.branch = args.branch try { @@ -398,7 +435,8 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { } } -object ApplicationMaster extends App { +object ApplicationMaster extends App with Logging { + val parser = Args.getParser parser.parse(args, Args()) match { @@ -406,9 +444,20 @@ object ApplicationMaster extends App { case Some(arguments: Args) => val appMaster = new 
ApplicationMaster() + appMaster.address = s"tcp://${appMaster.host}:$generatePort" + appMaster.broker.addConnector(appMaster.address) + appMaster.broker.start() + + log.info(s"broker started with address ${appMaster.address}") appMaster.execute(arguments) case None => } + private def generatePort: Int = { + val socket = new ServerSocket(0) + val port = socket.getLocalPort + socket.close() + port + } } diff --git a/leader/src/main/scripts/log4j.properties b/leader/src/main/scripts/log4j.properties index 6b40036..c5e965f 100644 --- a/leader/src/main/scripts/log4j.properties +++ b/leader/src/main/scripts/log4j.properties @@ -7,4 +7,4 @@ log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n -log4j.logger.org.reflections=OFF \ No newline at end of file +log4j.logger.reflections.Reflections=OFF \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services