Ottomata has submitted this change and it was merged.
Change subject: Update camus-partition-checker
......................................................................
Update camus-partition-checker
Fix the Hadoop Configuration so that it loads the HDFS conf files.
Add parameters for hdfs conf files.
Add a parameter to choose which camus run datetime to check (defaults to the most recent run).
Add default logging to console.
Add dryRun mode that only prints information, without creating flag files.
Change-Id: I3e4ada59211d1d316c4be8182d68bb3ca7d4bc95
---
M
refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
M
refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
M
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
M
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
4 files changed, 90 insertions(+), 26 deletions(-)
Approvals:
Ottomata: Verified; Looks good to me, approved
diff --git
a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
index c65236e..333d46d 100644
---
a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
+++
b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
@@ -1,16 +1,12 @@
package org.wikimedia.analytics.refinery.camus
import com.linkedin.camus.etl.kafka.common.EtlKey
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io.{NullWritable, SequenceFile, Writable,
WritableComparable}
-class CamusStatusReader() {
- // Only using HDFS, so no need for proper hadoop config
- val config: Configuration = new Configuration
- val fs: FileSystem = FileSystem.get(config)
+class CamusStatusReader(fs: FileSystem) {
/**
* Reads EtlKeys from a sequence file
@@ -18,7 +14,7 @@
* @return the read EtlKey sequence
*/
def readEtlKeys(path: Path): Seq[EtlKey] = {
- val reader = new SequenceFile.Reader(fs, path, config)
+ val reader = new SequenceFile.Reader(fs, path, fs.getConf)
val key: WritableComparable[_] =
reader.getKeyClass.newInstance.asInstanceOf[WritableComparable[_]]
val value: Writable = NullWritable.get
// Would have liked to be fully functionnal ...
diff --git
a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
index 9b5141f..acafdd4 100644
---
a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
+++
b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
@@ -13,7 +13,7 @@
val mostRecentRunFolder = "2015-10-02-08-00-07"
val wrongFolder = "wrong-folder"
val fs = FileSystem.get(new Configuration)
- val cr = new CamusStatusReader
+ val cr = new CamusStatusReader(fs)
"A CamusStatusReader" should "read EtlKey values in offset-m-XXXXX sequence
file" in {
diff --git
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
index bfd52c0..847e693 100644
---
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
+++
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
@@ -3,20 +3,27 @@
import java.io.FileInputStream
import java.util.Properties
-import com.github.nscala_time.time.Imports._
import com.linkedin.camus.etl.kafka.CamusJob
import com.linkedin.camus.etl.kafka.common.EtlKey
import com.linkedin.camus.etl.kafka.mapred.{EtlInputFormat,
EtlMultiOutputFormat}
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.log4j.Logger
-import org.joda.time.{DateTime, Hours}
+import org.apache.log4j.{LogManager, Logger}
+import org.joda.time.{Hours, DateTimeZone, DateTime}
import org.wikimedia.analytics.refinery.camus.CamusStatusReader
import scopt.OptionParser
+import com.github.nscala_time.time.Imports._
/**
- * Created by jo on 9/25/15.
+ * Class marking checking camus runs based on a camus.properties file.
+ * It flags hdfs imported data for fully imported hours.
+ *
+ * command example (replace [*] with * in classpath - hack to prevent scala
comment issue):
+ * java -Dlog4j.configuration=file:///home/joal/code/log4j_console.properties \
+ * -cp
"/home/joal/code/analytics-refinery-source/refinery-job/target/refinery-job-0.0.21-SNAPSHOT.jar:/usr/lib/spark/lib/[*]:/usr/lib/hadoop/[*]:/usr/lib/hadoop-hdfs/[*]:/usr/lib/hadoop/lib/[*]:/usr/share/java/[*]"
\
+ * org.wikimedia.analytics.refinery.job.CamusPartitionChecker -c
/home/joal/camus.test.import.properties
+ *
*/
object CamusPartitionChecker {
@@ -24,9 +31,10 @@
val WHITELIST_TOPICS = EtlInputFormat.KAFKA_WHITELIST_TOPIC
val PARTITION_BASE_PATH = EtlMultiOutputFormat.ETL_DESTINATION_PATH
- // Only using HDFS, no need for proper config
- val fs = FileSystem.get(new Configuration)
- val camusReader: CamusStatusReader = new CamusStatusReader
+
+ // Dummy values, to be set with configuration in main
+ var fs: FileSystem = FileSystem.get(new Configuration)
+ var camusReader: CamusStatusReader = new CamusStatusReader(fs)
val props: Properties = new Properties
val log: Logger = Logger.getLogger(CamusPartitionChecker.getClass)
@@ -44,7 +52,7 @@
val oldestNextHour = new DateTime(t1 ,
DateTimeZone.UTC).hourOfDay.roundCeilingCopy
val youngestPreviousHour = new DateTime(t2,
DateTimeZone.UTC).hourOfDay.roundFloorCopy
for (h <- 0 to Hours.hoursBetween(oldestNextHour,
youngestPreviousHour).getHours ) yield {
- val fullHour = oldestNextHour + h.hours
+ val fullHour: DateTime = oldestNextHour + h.hours
(fullHour.year.get, fullHour.monthOfYear.get, fullHour.dayOfMonth.get,
fullHour.hourOfDay.get)
}
}
@@ -90,6 +98,7 @@
}
def flagFullyImportedPartitions(flag: String,
+ dryRun: Boolean,
topicsAndHours: Map[String, Seq[(Int, Int,
Int, Int)]]): Unit = {
for ((topic, hours) <- topicsAndHours) {
for ((year, month, day, hour) <- hours) {
@@ -98,7 +107,11 @@
val partitionPath: Path = new Path(dir)
if (fs.exists(partitionPath) && fs.isDirectory(partitionPath)) {
val flagPath = new Path(s"${dir}/${flag}")
- fs.create(flagPath)
+ if (! dryRun) {
+ fs.create(flagPath)
+ log.info(s"Flag created: ${dir}/${flag}")
+ } else
+ log.info(s"DryRun - Flag would have been created: ${dir}/${flag}")
} else
throw new IllegalStateException(
s"Error on topic ${topic} - Partition folder ${partitionPath} is
missing, can't be flagged.")
@@ -107,7 +120,11 @@
}
case class Params(camusPropertiesFilePath: String = "",
- flag: String = "_IMPORTED")
+ datetimeToCheck: Option[String] = None,
+ hadoopCoreSitePath: String =
"/etc/hadoop/conf/core-site.xml",
+ hadoopHdfsSitePath: String =
"/etc/hadoop/conf/hdfs-site.xml",
+ flag: String = "_IMPORTED",
+ dryRun: Boolean = false)
val argsParser = new OptionParser[Params]("Camus Checker") {
head("Camus partition checker", "")
@@ -119,29 +136,80 @@
p.copy(camusPropertiesFilePath = x)
} text ("Camus configuration properties file path.")
- opt[String]('f', "flag") optional() action { (x, p) =>
+ opt[String]('d', "datetimeToCheck") optional() valueName
("yyyy-mm-dd-HH-MM-SS") action { (x, p) =>
+ p.copy(datetimeToCheck = Some(x))
+ } text ("Datetime camus run to check (must be present in history folder) -
Default to most recent run.")
+
+ opt[String]("hadoop-core-site-file") optional() valueName ("<path>")
action { (x, p) =>
+ p.copy(hadoopCoreSitePath = x)
+ } text ("Hadoop core-site.xml file path for configuration.")
+
+ opt[String]("hadoop-hdfs-site-file") optional() valueName ("<path>")
action { (x, p) =>
+ p.copy(hadoopHdfsSitePath = x)
+ } text ("Hadoop hdfs-site.xml file path for configuration.")
+
+ opt[String]("flag") optional() action { (x, p) =>
p.copy(flag = x)
} validate { f =>
if ((! f.isEmpty) && (f.matches("_[a-zA-Z0-9-_]+"))) success else
failure("Incorrect flag file name")
} text ("Flag file to be used (defaults to '_IMPORTED'.")
+
+ opt[Unit]("dry-run") optional() action { (_, p) =>
+ p.copy(dryRun = true)
+ } text ("Only print check result and if flag files would have been
created.")
+ }
+
+ def isLog4JConfigured():Boolean = {
+ if (Logger.getRootLogger.getAllAppenders.hasMoreElements)
+ return true
+ val loggers = LogManager.getCurrentLoggers
+ while (loggers.hasMoreElements)
+ if
(loggers.nextElement.asInstanceOf[Logger].getAllAppenders.hasMoreElements)
+ return true
+ return false
}
def main(args: Array[String]): Unit = {
+ if (! isLog4JConfigured)
+ org.apache.log4j.BasicConfigurator.configure
+
argsParser.parse(args, Params()) match {
case Some (params) => {
try {
+ log.info("Loading hadoop configuration.")
+ val conf: Configuration = new Configuration()
+ conf.addResource(new Path(params.hadoopCoreSitePath))
+ conf.addResource(new Path(params.hadoopHdfsSitePath))
+ fs = FileSystem.get(conf)
+ camusReader = new CamusStatusReader(fs)
+
log.info("Loading camus properties file.")
props.load(new FileInputStream(params.camusPropertiesFilePath))
- log.info("Getting camus most recent run from history folder.")
- val mostRecentCamusRun = camusReader.mostRecentRun (
- new Path (props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH)))
+ val camusPathToCheck: Path = {
+ val history_folder =
props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH)
+ if (params.datetimeToCheck.isEmpty) {
+ log.info("Getting camus most recent run from history folder.")
+ camusReader.mostRecentRun(new Path(history_folder))
+ } else {
+ val p = new
Path(props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH) + "/" +
params.datetimeToCheck.get)
+ if (fs.isDirectory(p)) {
+ log.info("Set job to given datetime to check.")
+ p
+ } else {
+ log.error("The given datetime to check is not a folder in
camus history.")
+ null
+ }
+ }
+ }
+ if (null == camusPathToCheck)
+ System.exit(1)
log.info("Checking job correctness and computing partitions to flag
as imported.")
- val topicsAndHours = getTopicsAndHoursToFlag(mostRecentCamusRun)
+ val topicsAndHours = getTopicsAndHoursToFlag(camusPathToCheck)
- log.info("Flag imported partitions.")
- flagFullyImportedPartitions(params.flag, topicsAndHours)
+ log.info("Job is correct, flag imported partitions.")
+ flagFullyImportedPartitions(params.flag, params.dryRun,
topicsAndHours)
log.info("Done.")
} catch {
diff --git
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
index f5bfb91..0909ee8 100644
---
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
+++
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestCamusPartitionChecker.scala
@@ -167,7 +167,7 @@
// correct partition base path config
CamusPartitionChecker.props.setProperty(CamusPartitionChecker.PARTITION_BASE_PATH,
"/tmp/testcamus/")
- CamusPartitionChecker.flagFullyImportedPartitions("_TESTFLAG",
Map("testtopic" -> Seq((2015, 10, 2, 8))))
+ CamusPartitionChecker.flagFullyImportedPartitions("_TESTFLAG", false,
Map("testtopic" -> Seq((2015, 10, 2, 8))))
d.list should not be empty
d.list.toSeq.map(_.toString()) should contain
("/tmp/testcamus/testtopic/hourly/2015/10/02/08/_TESTFLAG")
@@ -184,7 +184,7 @@
CamusPartitionChecker.props.setProperty(CamusPartitionChecker.PARTITION_BASE_PATH,
"/tmp/testcamus/")
intercept[IllegalStateException] {
- CamusPartitionChecker.flagFullyImportedPartitions("_TESTFLAG",
Map("testtopic" -> Seq((2015, 10, 2, 8))))
+ CamusPartitionChecker.flagFullyImportedPartitions("_TESTFLAG", false,
Map("testtopic" -> Seq((2015, 10, 2, 8))))
}
}
--
To view, visit https://gerrit.wikimedia.org/r/247847
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I3e4ada59211d1d316c4be8182d68bb3ca7d4bc95
Gerrit-PatchSet: 5
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Madhuvishy <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits