Joal has uploaded a new change for review. https://gerrit.wikimedia.org/r/247847
Change subject: Correct camus-partition-checker to use hdfs conf ...................................................................... Correct camus-partition-checker to use hdfs conf Change-Id: I3e4ada59211d1d316c4be8182d68bb3ca7d4bc95 --- M refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala M refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala 3 files changed, 16 insertions(+), 11 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/47/247847/1 diff --git a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala index c65236e..333d46d 100644 --- a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala +++ b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala @@ -1,16 +1,12 @@ package org.wikimedia.analytics.refinery.camus import com.linkedin.camus.etl.kafka.common.EtlKey -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.io.{NullWritable, SequenceFile, Writable, WritableComparable} -class CamusStatusReader() { - // Only using HDFS, so no need for proper hadoop config - val config: Configuration = new Configuration - val fs: FileSystem = FileSystem.get(config) +class CamusStatusReader(fs: FileSystem) { /** * Reads EtlKeys from a sequence file @@ -18,7 +14,7 @@ * @return the read EtlKey sequence */ def readEtlKeys(path: Path): Seq[EtlKey] = { - val reader = new SequenceFile.Reader(fs, path, config) + val reader = new SequenceFile.Reader(fs, path, fs.getConf) val key: WritableComparable[_] = reader.getKeyClass.newInstance.asInstanceOf[WritableComparable[_]] val value: Writable = NullWritable.get // Would have liked to be fully functionnal ... diff --git a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala index 9b5141f..acafdd4 100644 --- a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala +++ b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala @@ -13,7 +13,7 @@ val mostRecentRunFolder = "2015-10-02-08-00-07" val wrongFolder = "wrong-folder" val fs = FileSystem.get(new Configuration) - val cr = new CamusStatusReader + val cr = new CamusStatusReader(fs) "A CamusStatusReader" should "read EtlKey values in offset-m-XXXXX sequence file" in { diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala index bfd52c0..c1854a0 100644 --- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala +++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala @@ -24,9 +24,10 @@ val WHITELIST_TOPICS = EtlInputFormat.KAFKA_WHITELIST_TOPIC val PARTITION_BASE_PATH = EtlMultiOutputFormat.ETL_DESTINATION_PATH - // Only using HDFS, no need for proper config - val fs = FileSystem.get(new Configuration) - val camusReader: CamusStatusReader = new CamusStatusReader + + // To be set in main + var fs = FileSystem.get(new Configuration) + val camusReader: CamusStatusReader = new CamusStatusReader(fs) val props: Properties = new Properties val log: Logger = Logger.getLogger(CamusPartitionChecker.getClass) @@ -107,7 +108,8 @@ } case class Params(camusPropertiesFilePath: String = "", - flag: String = "_IMPORTED") + hadoopCoreSitePath: String = "/etc/hadoop/conf/core-site.xml", + flag: String = "_IMPORTED") val argsParser = new OptionParser[Params]("Camus Checker") { head("Camus partition checker", "") @@ -118,6 +120,10 @@ opt[String]('c', "camus-properties-file") required() valueName ("<path>") action { (x, p) => p.copy(camusPropertiesFilePath = x) } text ("Camus configuration properties file path.") + + opt[String]('h', "hadoop-core-site-file") optional() valueName ("<path>") action { (x, p) => + p.copy(hadoopCoreSitePath = x) + } text ("Hadoop core-site.xml file for configuration.") opt[String]('f', "flag") optional() action { (x, p) => p.copy(flag = x) @@ -130,6 +136,9 @@ argsParser.parse(args, Params()) match { case Some (params) => { try { + log.info("Loading hadoop core-site.xml configuration.") + fs.getConf.addResource(new Path(params.hadoopCoreSitePath)) + log.info("Loading camus properties file.") props.load(new FileInputStream(params.camusPropertiesFilePath)) -- To view, visit https://gerrit.wikimedia.org/r/247847 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I3e4ada59211d1d316c4be8182d68bb3ca7d4bc95 Gerrit-PatchSet: 1 Gerrit-Project: analytics/refinery/source Gerrit-Branch: master Gerrit-Owner: Joal <j...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits