Joal has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/247847

Change subject: Correct camus-partition-checker to use hdfs conf
......................................................................

Correct camus-partition-checker to use hdfs conf

Change-Id: I3e4ada59211d1d316c4be8182d68bb3ca7d4bc95
---
M refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
M refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
3 files changed, 16 insertions(+), 11 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/47/247847/1

diff --git 
a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
 
b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
index c65236e..333d46d 100644
--- 
a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
+++ 
b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
@@ -1,16 +1,12 @@
 package org.wikimedia.analytics.refinery.camus
 
 import com.linkedin.camus.etl.kafka.common.EtlKey
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.io.{NullWritable, SequenceFile, Writable, 
WritableComparable}
 
 
-class CamusStatusReader() {
 
-  // Only using HDFS, so no need for proper hadoop config
-  val config: Configuration = new Configuration
-  val fs: FileSystem = FileSystem.get(config)
+class CamusStatusReader(fs: FileSystem) {
 
   /**
    * Reads EtlKeys from a sequence file
@@ -18,7 +14,7 @@
    * @return the read EtlKey sequence
    */
   def readEtlKeys(path: Path): Seq[EtlKey] = {
-    val reader = new SequenceFile.Reader(fs, path, config)
+    val reader = new SequenceFile.Reader(fs, path, fs.getConf)
     val key: WritableComparable[_] = 
reader.getKeyClass.newInstance.asInstanceOf[WritableComparable[_]]
     val value: Writable = NullWritable.get
     // Would have liked to be fully functionnal ...
diff --git 
a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
 
b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
index 9b5141f..acafdd4 100644
--- 
a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
+++ 
b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
@@ -13,7 +13,7 @@
   val mostRecentRunFolder = "2015-10-02-08-00-07"
   val wrongFolder = "wrong-folder"
   val fs = FileSystem.get(new Configuration)
-  val cr = new CamusStatusReader
+  val cr = new CamusStatusReader(fs)
 
   "A CamusStatusReader" should "read EtlKey values in offset-m-XXXXX sequence 
file" in {
 
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
index bfd52c0..c1854a0 100644
--- 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
@@ -24,9 +24,10 @@
   val WHITELIST_TOPICS = EtlInputFormat.KAFKA_WHITELIST_TOPIC
   val PARTITION_BASE_PATH = EtlMultiOutputFormat.ETL_DESTINATION_PATH
 
-  // Only using HDFS, no need for proper config
-  val fs = FileSystem.get(new Configuration)
-  val camusReader: CamusStatusReader = new CamusStatusReader
+
+  // To be set in main
+  var fs = FileSystem.get(new Configuration)
+  val camusReader: CamusStatusReader = new CamusStatusReader(fs)
   val props: Properties = new Properties
   val log: Logger = Logger.getLogger(CamusPartitionChecker.getClass)
 
@@ -107,7 +108,8 @@
   }
 
   case class Params(camusPropertiesFilePath: String = "",
-                     flag: String = "_IMPORTED")
+                    hadoopCoreSitePath: String = 
"/etc/hadoop/conf/core-site.xml",
+                    flag: String = "_IMPORTED")
 
   val argsParser = new OptionParser[Params]("Camus Checker") {
     head("Camus partition checker", "")
@@ -118,6 +120,10 @@
     opt[String]('c', "camus-properties-file") required() valueName ("<path>") 
action { (x, p) =>
       p.copy(camusPropertiesFilePath = x)
     } text ("Camus configuration properties file path.")
+
+    opt[String]('h', "hadoop-core-site-file") optional() valueName ("<path>") 
action { (x, p) =>
+      p.copy(hadoopCoreSitePath = x)
+    } text ("Hadoop core-site.xml file for configuration.")
 
     opt[String]('f', "flag") optional() action { (x, p) =>
       p.copy(flag = x)
@@ -130,6 +136,9 @@
     argsParser.parse(args, Params()) match {
       case Some (params) => {
         try {
+          log.info("Loading hadoop core-site.xml configuration.")
+          fs.getConf.addResource(new Path(params.hadoopCoreSitePath))
+
           log.info("Loading camus properties file.")
           props.load(new FileInputStream(params.camusPropertiesFilePath))
 

-- 
To view, visit https://gerrit.wikimedia.org/r/247847
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3e4ada59211d1d316c4be8182d68bb3ca7d4bc95
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to