Ottomata has submitted this change and it was merged.

Change subject: Update camus-partition-checker
......................................................................


Update camus-partition-checker

Correct Configuration to use hdfs conf.
Add parameters for hdfs conf files.
Add a parameter to choose which camus run date to check, instead of always checking the last run.
Add default logging to console.
Add a dryRun mode that only prints information without creating flag files.

Change-Id: I3e4ada59211d1d316c4be8182d68bb3ca7d4bc95
---
M 
refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
M 
refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
M 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
M 
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
4 files changed, 90 insertions(+), 26 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git 
a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
 
b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
index c65236e..333d46d 100644
--- 
a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
+++ 
b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
@@ -1,16 +1,12 @@
 package org.wikimedia.analytics.refinery.camus
 
 import com.linkedin.camus.etl.kafka.common.EtlKey
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.io.{NullWritable, SequenceFile, Writable, 
WritableComparable}
 
 
-class CamusStatusReader() {
 
-  // Only using HDFS, so no need for proper hadoop config
-  val config: Configuration = new Configuration
-  val fs: FileSystem = FileSystem.get(config)
+class CamusStatusReader(fs: FileSystem) {
 
   /**
    * Reads EtlKeys from a sequence file
@@ -18,7 +14,7 @@
    * @return the read EtlKey sequence
    */
   def readEtlKeys(path: Path): Seq[EtlKey] = {
-    val reader = new SequenceFile.Reader(fs, path, config)
+    val reader = new SequenceFile.Reader(fs, path, fs.getConf)
     val key: WritableComparable[_] = 
reader.getKeyClass.newInstance.asInstanceOf[WritableComparable[_]]
     val value: Writable = NullWritable.get
     // Would have liked to be fully functionnal ...
diff --git 
a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
 
b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
index 9b5141f..acafdd4 100644
--- 
a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
+++ 
b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
@@ -13,7 +13,7 @@
   val mostRecentRunFolder = "2015-10-02-08-00-07"
   val wrongFolder = "wrong-folder"
   val fs = FileSystem.get(new Configuration)
-  val cr = new CamusStatusReader
+  val cr = new CamusStatusReader(fs)
 
   "A CamusStatusReader" should "read EtlKey values in offset-m-XXXXX sequence 
file" in {
 
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
index bfd52c0..847e693 100644
--- 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
@@ -3,20 +3,27 @@
 import java.io.FileInputStream
 import java.util.Properties
 
-import com.github.nscala_time.time.Imports._
 import com.linkedin.camus.etl.kafka.CamusJob
 import com.linkedin.camus.etl.kafka.common.EtlKey
 import com.linkedin.camus.etl.kafka.mapred.{EtlInputFormat, 
EtlMultiOutputFormat}
 import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.log4j.Logger
-import org.joda.time.{DateTime, Hours}
+import org.apache.log4j.{LogManager, Logger}
+import org.joda.time.{Hours, DateTimeZone, DateTime}
 import org.wikimedia.analytics.refinery.camus.CamusStatusReader
 import scopt.OptionParser
+import com.github.nscala_time.time.Imports._
 
 /**
- * Created by jo on 9/25/15.
+ * Class marking checking camus runs based on a camus.properties file.
+ * It flags hdfs imported data for fully imported hours.
+ *
+ * command example (replace [*] with * in classpath - hack to prevent scala 
comment issue):
+ * java -Dlog4j.configuration=file:///home/joal/code/log4j_console.properties \
+ *      -cp 
"/home/joal/code/analytics-refinery-source/refinery-job/target/refinery-job-0.0.21-SNAPSHOT.jar:/usr/lib/spark/lib/[*]:/usr/lib/hadoop/[*]:/usr/lib/hadoop-hdfs/[*]:/usr/lib/hadoop/lib/[*]:/usr/share/java/[*]"
 \
+ *      org.wikimedia.analytics.refinery.job.CamusPartitionChecker -c 
/home/joal/camus.test.import.properties
+ *
  */
 object CamusPartitionChecker {
 
@@ -24,9 +31,10 @@
   val WHITELIST_TOPICS = EtlInputFormat.KAFKA_WHITELIST_TOPIC
   val PARTITION_BASE_PATH = EtlMultiOutputFormat.ETL_DESTINATION_PATH
 
-  // Only using HDFS, no need for proper config
-  val fs = FileSystem.get(new Configuration)
-  val camusReader: CamusStatusReader = new CamusStatusReader
+
+  // Dummy values, to be set with configuration in main
+  var fs: FileSystem = FileSystem.get(new Configuration)
+  var camusReader: CamusStatusReader = new CamusStatusReader(fs)
   val props: Properties = new Properties
   val log: Logger = Logger.getLogger(CamusPartitionChecker.getClass)
 
@@ -44,7 +52,7 @@
     val oldestNextHour = new DateTime(t1 , 
DateTimeZone.UTC).hourOfDay.roundCeilingCopy
     val youngestPreviousHour = new DateTime(t2, 
DateTimeZone.UTC).hourOfDay.roundFloorCopy
     for (h <- 0 to Hours.hoursBetween(oldestNextHour, 
youngestPreviousHour).getHours ) yield {
-      val fullHour = oldestNextHour + h.hours
+      val fullHour: DateTime = oldestNextHour + h.hours
       (fullHour.year.get, fullHour.monthOfYear.get, fullHour.dayOfMonth.get, 
fullHour.hourOfDay.get)
     }
   }
@@ -90,6 +98,7 @@
   }
 
   def flagFullyImportedPartitions(flag: String,
+                                  dryRun: Boolean,
                                   topicsAndHours: Map[String, Seq[(Int, Int, 
Int, Int)]]): Unit = {
     for ((topic, hours) <- topicsAndHours) {
       for ((year, month, day, hour) <- hours) {
@@ -98,7 +107,11 @@
         val partitionPath: Path = new Path(dir)
         if (fs.exists(partitionPath) && fs.isDirectory(partitionPath)) {
           val flagPath = new Path(s"${dir}/${flag}")
-          fs.create(flagPath)
+          if (! dryRun) {
+            fs.create(flagPath)
+            log.info(s"Flag created: ${dir}/${flag}")
+          } else
+            log.info(s"DryRun - Flag would have been created: ${dir}/${flag}")
         } else
           throw new IllegalStateException(
             s"Error on topic ${topic} - Partition folder ${partitionPath} is 
missing, can't be flagged.")
@@ -107,7 +120,11 @@
   }
 
   case class Params(camusPropertiesFilePath: String = "",
-                     flag: String = "_IMPORTED")
+                    datetimeToCheck: Option[String] = None,
+                    hadoopCoreSitePath: String = 
"/etc/hadoop/conf/core-site.xml",
+                    hadoopHdfsSitePath: String = 
"/etc/hadoop/conf/hdfs-site.xml",
+                    flag: String = "_IMPORTED",
+                    dryRun: Boolean = false)
 
   val argsParser = new OptionParser[Params]("Camus Checker") {
     head("Camus partition checker", "")
@@ -119,29 +136,80 @@
       p.copy(camusPropertiesFilePath = x)
     } text ("Camus configuration properties file path.")
 
-    opt[String]('f', "flag") optional() action { (x, p) =>
+    opt[String]('d', "datetimeToCheck") optional() valueName 
("yyyy-mm-dd-HH-MM-SS") action { (x, p) =>
+      p.copy(datetimeToCheck = Some(x))
+    } text ("Datetime camus run to check (must be present in history folder) - 
Default to most recent run.")
+
+    opt[String]("hadoop-core-site-file") optional() valueName ("<path>") 
action { (x, p) =>
+      p.copy(hadoopCoreSitePath = x)
+    } text ("Hadoop core-site.xml file path for configuration.")
+
+    opt[String]("hadoop-hdfs-site-file") optional() valueName ("<path>") 
action { (x, p) =>
+      p.copy(hadoopHdfsSitePath = x)
+    } text ("Hadoop hdfs-site.xml file path for configuration.")
+
+    opt[String]("flag") optional() action { (x, p) =>
       p.copy(flag = x)
     } validate { f =>
       if ((! f.isEmpty) && (f.matches("_[a-zA-Z0-9-_]+"))) success else 
failure("Incorrect flag file name")
     } text ("Flag file to be used (defaults to '_IMPORTED'.")
+
+    opt[Unit]("dry-run") optional() action { (_, p) =>
+      p.copy(dryRun = true)
+    } text ("Only print check result and if flag files would have been 
created.")
+  }
+
+  def isLog4JConfigured():Boolean = {
+    if (Logger.getRootLogger.getAllAppenders.hasMoreElements)
+      return true
+    val loggers = LogManager.getCurrentLoggers
+    while (loggers.hasMoreElements)
+      if 
(loggers.nextElement.asInstanceOf[Logger].getAllAppenders.hasMoreElements)
+        return true
+    return false
   }
 
   def main(args: Array[String]): Unit = {
+    if (! isLog4JConfigured)
+      org.apache.log4j.BasicConfigurator.configure
+
     argsParser.parse(args, Params()) match {
       case Some (params) => {
         try {
+          log.info("Loading hadoop configuration.")
+          val conf: Configuration = new Configuration()
+          conf.addResource(new Path(params.hadoopCoreSitePath))
+          conf.addResource(new Path(params.hadoopHdfsSitePath))
+          fs = FileSystem.get(conf)
+          camusReader = new CamusStatusReader(fs)
+
           log.info("Loading camus properties file.")
           props.load(new FileInputStream(params.camusPropertiesFilePath))
 
-          log.info("Getting camus most recent run from history folder.")
-          val mostRecentCamusRun = camusReader.mostRecentRun (
-            new Path (props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH)))
+          val camusPathToCheck: Path = {
+            val history_folder = 
props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH)
+            if (params.datetimeToCheck.isEmpty) {
+              log.info("Getting camus most recent run from history folder.")
+              camusReader.mostRecentRun(new Path(history_folder))
+            } else {
+              val p = new 
Path(props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH) + "/" + 
params.datetimeToCheck.get)
+              if (fs.isDirectory(p)) {
+                log.info("Set job to given datetime to check.")
+                p
+              } else {
+                log.error("The given datetime to check is not a folder in 
camus history.")
+                null
+              }
+            }
+          }
+          if (null == camusPathToCheck)
+            System.exit(1)
 
           log.info("Checking job correctness and computing partitions to flag 
as imported.")
-          val topicsAndHours = getTopicsAndHoursToFlag(mostRecentCamusRun)
+          val topicsAndHours = getTopicsAndHoursToFlag(camusPathToCheck)
 
-          log.info("Flag imported partitions.")
-          flagFullyImportedPartitions(params.flag, topicsAndHours)
+          log.info("Job is correct, flag imported partitions.")
+          flagFullyImportedPartitions(params.flag, params.dryRun, 
topicsAndHours)
 
           log.info("Done.")
         } catch {
diff --git 
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
index f5bfb91..0909ee8 100644
--- 
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
+++ 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
@@ -167,7 +167,7 @@
     // correct partition base path config
     
CamusPartitionChecker.props.setProperty(CamusPartitionChecker.PARTITION_BASE_PATH,
 "/tmp/testcamus/")
 
-    CamusPartitionChecker.flagFullyImportedPartitions("_TESTFLAG", 
Map("testtopic" -> Seq((2015, 10, 2, 8))))
+    CamusPartitionChecker.flagFullyImportedPartitions("_TESTFLAG", false, 
Map("testtopic" -> Seq((2015, 10, 2, 8))))
 
     d.list should not be empty
     d.list.toSeq.map(_.toString()) should contain 
("/tmp/testcamus/testtopic/hourly/2015/10/02/08/_TESTFLAG")
@@ -184,7 +184,7 @@
     
CamusPartitionChecker.props.setProperty(CamusPartitionChecker.PARTITION_BASE_PATH,
 "/tmp/testcamus/")
 
     intercept[IllegalStateException] {
-      CamusPartitionChecker.flagFullyImportedPartitions("_TESTFLAG", 
Map("testtopic" -> Seq((2015, 10, 2, 8))))
+      CamusPartitionChecker.flagFullyImportedPartitions("_TESTFLAG", false, 
Map("testtopic" -> Seq((2015, 10, 2, 8))))
     }
   }
 

-- 
To view, visit https://gerrit.wikimedia.org/r/247847
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I3e4ada59211d1d316c4be8182d68bb3ca7d4bc95
Gerrit-PatchSet: 5
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Madhuvishy <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to