Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/405800 )
Change subject: [WIP] Add configurable transform function to JSONRefine
......................................................................
[WIP] Add configurable transform function to JSONRefine
Bug: T185237
Change-Id: If1272f7d354e94a0a140f71a9135389131c8a1eb
---
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
M refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
A refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
4 files changed, 145 insertions(+), 36 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/00/405800/1
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
new file mode 100644
index 0000000..7cfe209
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
@@ -0,0 +1,32 @@
+package org.wikimedia.analytics.refinery.core
+
+import scala.reflect.runtime.universe
+
+object ReflectUtils {
+
+    /**
+      * Given a fully qualified String package.ObjectName and String method name, this
+      * function will return a scala.reflect.runtime.universe.MethodMirror that can be
+      * used for calling the method on the object.  Note that a MethodMirror is not a
+      * direct reference to the actual method, and as such does not have compile-time
+      * type and signature checking.  You must ensure that you call the method with
+      * exactly the same arguments and types that the method expects, or you will get
+      * a runtime exception.
+      *
+      * @param moduleName Fully qualified name for an object, e.g. org.wikimedia.analytics.refinery.core.DeduplicateEventLogging
+      * @param methodName Name of method in the object.  Default: "apply".
+      * @return a MethodMirror that can be used to invoke the method on the object
+      */
+    def getStaticMethodMirror(moduleName: String, methodName: String = "apply"): universe.MethodMirror = {
+        val mirror = universe.runtimeMirror(getClass.getClassLoader)
+        val module = mirror.staticModule(moduleName)
+        val method = module.typeSignature.member(universe.newTermName(methodName)).asMethod
+        val methodMirror = mirror.reflect(mirror.reflectModule(module).instance).reflectMethod(method)
+        if (!methodMirror.symbol.isMethod || !methodMirror.symbol.isStatic) {
+            throw new RuntimeException(
+                s"Cannot get static method for $moduleName.$methodName, it is not a static method"
+            )
+        }
+        methodMirror
+    }
+
+}
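
(For context: a minimal sketch of how getStaticMethodMirror is meant to be
called. The ExampleTransform object below is hypothetical and exists only to
illustrate the reflective lookup; it is not part of this change.)

    import org.wikimedia.analytics.refinery.core.ReflectUtils

    // Hypothetical object; any object with a matching method would do.
    object ExampleTransform {
        def apply(s: String): String = s.toUpperCase
    }

    // Look up ExampleTransform.apply by its fully qualified name.
    val applyMirror = ReflectUtils.getStaticMethodMirror("ExampleTransform")

    // MethodMirror.apply is untyped (Any* => Any): arguments and the return
    // type are checked only at runtime, so the result must be cast, and a
    // signature mismatch throws a runtime exception.
    val upper = applyMirror("hello").asInstanceOf[String]  // "HELLO"
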
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
index 8369104..dfb415c 100644
--- a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
@@ -4,9 +4,6 @@
import scala.util.control.Exception.{allCatch, ignoring}
-import org.apache.hadoop.fs.Path
-
-
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
import org.apache.spark.sql.SQLContext
@@ -19,7 +16,6 @@
// This allows us to use these types with an extended API
// that includes schema merging and Hive DDL statement generation.
import SparkSQLHiveExtensions._
-
/**
* Converts arbitrary JSON to Hive Parquet by 'evolving' the Hive table to
@@ -66,29 +62,37 @@
* Reads inputPath as JSON data, creates or alters tableName in Hive to match the inferred
* schema of the input JSON data, and then inserts the data into the table.
*
-      * @param hiveContext       Spark HiveContext
+      * @param hiveContext        Spark HiveContext
*
-      * @param inputPath         Path to JSON data
+      * @param inputPath          Path to JSON data
*
*
-      * @param partition         HivePartition.  This helper class contains
-      *                          database and table name, as well as external location
-      *                          and partition keys and values.
+      * @param partition          HivePartition.  This helper class contains
+      *                           database and table name, as well as external location
+      *                           and partition keys and values.
*
-      * @param isSequenceFile    If true, inputPath is expected to contain JSON in
-      *                          Hadoop Sequence Files, else JSON in text files.
+      * @param isSequenceFile     If true, inputPath is expected to contain JSON in
+      *                           Hadoop Sequence Files, else JSON in text files.
*
-      * @param doneCallback      Function to call after a successful run
+      * @param doneCallback       Function to call after a successful run
*
-      * @return                  The number of records refined
+      * @param transformFunction  Function to do any last minute DataFrame transformations.
+      *                           DO NOT return a DataFrame with a different schema.
+      *                           You should use this function to do things like
+      *                           redacting or de-duplicating records in your DataFrame,
+      *                           but you should not change the schema.
+      *
+      * @return                   The number of records inserted into the HivePartition.
*/
def apply(
hiveContext: HiveContext,
inputPath: String,
partition: HivePartition,
isSequenceFile: Boolean,
- doneCallback: () => Unit
+ doneCallback: () => Unit,
+        transformFunction: Option[(DataFrame, HivePartition) => DataFrame] = None
): Long = {
+
// Set this so we can partition by fields in the DataFrame.
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
@@ -164,18 +168,25 @@
// but, that's ok. We're only inserting into Hive now, and Spark HiveContext will do the
// lower casing for us.
+        // If defined, apply the transformFunction to do any final modifications to the
+        // output DataFrame (de-duplication, anonymizing, etc.)
+        val outputDf: DataFrame = transformFunction match {
+            case None     => mergedSchemaDf
+            case Some(fn) => fn(mergedSchemaDf, partition)
+        }
log.info(
s"""Inserting into `${partition.tableName}` DataFrame with
schema:\n
- |${mergedSchemaDf.schema.treeString} for partition
$partition""".stripMargin
+ |${outputDf.schema.treeString} for partition
$partition""".stripMargin
)
+
// Insert data into Hive table.
// TODO parameterize "overwrite" to allow "append"?
- mergedSchemaDf.write.mode("overwrite")
+ outputDf.write.mode("overwrite")
.partitionBy(partitionNames:_*)
.insertInto(partition.tableName)
-        // Return the String path of the external partition we just inserted into.
val partitionPath = hivePartitionPath(hiveContext, partition)
log.info(
s"Finished inserting into `${partition.tableName}` DataFrame, " +
diff --git a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
new file mode 100644
index 0000000..f34bd9d
--- /dev/null
+++ b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
@@ -0,0 +1,37 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.scalatest.{FlatSpec, Matchers}
+
+object TestObject {
+ def apply(s: String): String = {
+ "apply " + s
+ }
+
+ def otherMethod(i: Int): Int = {
+ i + 10
+ }
+}
+
+class TestReflectUtils extends FlatSpec with Matchers {
+
+    behavior of "ReflectUtils"
+
+ it should "Lookup object by name with apply method name" in {
+ val methodMirror = ReflectUtils.getStaticMethodMirror(
+ "org.wikimedia.analytics.refinery.core.TestObject"
+ )
+        methodMirror.symbol.fullName should equal("org.wikimedia.analytics.refinery.core.TestObject.apply")
+ }
+
+ it should "Lookup object by name with any method name" in {
+ val methodMirror = ReflectUtils.getStaticMethodMirror(
+ "org.wikimedia.analytics.refinery.core.TestObject", "otherMethod"
+ )
+        methodMirror.symbol.fullName should equal("org.wikimedia.analytics.refinery.core.TestObject.otherMethod")
+ }
+
+ it should "MethodMirror returned should be callable" in {
+ val methodMirror = ReflectUtils.getStaticMethodMirror(
+ "org.wikimedia.analytics.refinery.core.TestObject"
+ )
+ methodMirror("TEST") should equal ("apply TEST")
+ }
+}
\ No newline at end of file
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
index 32d7957..f82bfdf 100644
--- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
@@ -7,30 +7,30 @@
import scala.util.matching.Regex
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
-
import org.joda.time.Hours
import org.joda.time.format.DateTimeFormatter
import com.github.nscala_time.time.Imports._
-
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.DataFrame
+import org.wikimedia.analytics.refinery.core.{HivePartition, ReflectUtils, SparkJsonToHive, Utilities}
-import org.wikimedia.analytics.refinery.core.{HivePartition, SparkJsonToHive, Utilities}
// TODO: ERROR Hive: Table otto not found: default.otto table not found ???
// TODO: support append vs overwrite?
// TODO: Hive Table Locking?
+
/**
* Looks for hourly input partition directories with JSON data that need refinement,
* and refines them into Hive Parquet tables using SparkJsonToHive.
*/
object JsonRefine {
-
private val log = LogManager.getLogger("JsonRefine")
    private val iso8601DateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
+
/**
* Config class for CLI argument parser using scopt
*/
@@ -45,8 +45,9 @@
        inputPathDateTimeFormat: DateTimeFormatter = DateTimeFormat.forPattern("'hourly'/yyyy/MM/dd/HH"),
tableWhitelistRegex: Option[Regex] = None,
tableBlacklistRegex: Option[Regex] = None,
+ transformFunction: String = "",
doneFlag: String = "_REFINED",
- failureFlag: String = "_REFINE_FAILED",
+// failureFlag: String = "_REFINE_FAILED",
shouldIgnoreFailureFlag: Boolean = false,
parallelism: Option[Int] = None,
compressionCodec: String = "snappy",
@@ -83,7 +84,6 @@
DateTime.parse(s, iso8601DateFormatter)
}
}
-
/**
@@ -190,6 +190,10 @@
p.copy(tableBlacklistRegex = x)
} text "Blacklist regex of table names to skip.\n".stripMargin
+        opt[String]('c', "transform-function") optional() valueName "<fn>" action { (x, p) =>
+            p.copy(transformFunction = x)
+        } text "Fully qualified object name of a transform function to apply to each DataFrame before inserting into Hive.\n".stripMargin
+
        opt[String]('D', "done-flag") optional() valueName "<filename>" action { (x, p) =>
p.copy(doneFlag = x)
} text
@@ -199,15 +203,15 @@
|data has changed, meaning the partition needs to be re-refined.
|Default: _REFINED""".stripMargin.replace("\n", "\n\t") + "\n"
-        opt[String]('X', "failure-flag") optional() valueName "<filename>" action { (x, p) =>
-            p.copy(failureFlag = x)
-        } text
-            """When a partition fails refinement, this file will be created in the
-              |output partition path with the binary timestamp of the input source partition's
-              |modification timestamp. Any partition with this flag will be excluded
-              |from refinement if the input data's modtime hasn't changed. If the
-              |modtime has changed, this will re-attempt refinement anyway.
-              |Default: _REFINE_FAILED""".stripMargin.replace("\n", "\n\t") + "\n"
+//        opt[String]('X', "failure-flag") optional() valueName "<filename>" action { (x, p) =>
+//            p.copy(failureFlag = x)
+//        } text
+//            """When a partition fails refinement, this file will be created in the
+//              |output partition path with the binary timestamp of the input source partition's
+//              |modification timestamp. Any partition with this flag will be excluded
+//              |from refinement if the input data's modtime hasn't changed. If the
+//              |modtime has changed, this will re-attempt refinement anyway.
+//              |Default: _REFINE_FAILED""".stripMargin.replace("\n", "\n\t") + "\n"
opt[Unit]('I', "ignore-failure-flag") optional() action { (_, p) =>
p.copy(shouldIgnoreFailureFlag = true)
@@ -289,6 +293,7 @@
}
+
/**
* Given params, refine all discovered JsonTargets.
*
@@ -321,6 +326,28 @@
params.inputPathPatternCaptureGroups: _*
)
+ log.setLevel(Level.DEBUG)
+
+        // If we are given a transformFunction name, it should be a fully
+        // qualified package.ObjectName of an object with an apply method that
+        // takes a DataFrame and a HivePartition, and returns a DataFrame.
+        val transformFunction = params.transformFunction match {
+            case "" => None
+            case objectName => {
+                val transformMirror = ReflectUtils.getStaticMethodMirror(objectName)
+                // Look up the object's apply method as a reflect MethodMirror, and wrap
+                // it in an anonymous function that has the signature expected by
+                // SparkJsonToHive's transformFunction parameter.
+                val wrapperFn: (DataFrame, HivePartition) => DataFrame = {
+                    case (df, hp) => {
+                        log.info(s"Applying ${transformMirror.receiver} to $hp")
+                        transformMirror(df, hp).asInstanceOf[DataFrame]
+                    }
+                }
+                // SparkJsonToHive takes transformFunction as an Option.
+                Some(wrapperFn)
+            }
+        }
log.info(
s"Looking for JSON targets to refine in ${params.inputBasePath}
between " +
@@ -335,7 +362,7 @@
new Path(params.outputBasePath),
params.databaseName,
params.doneFlag,
- params.failureFlag,
+ "_REFINED_FAILED", //params.failureFlag,
params.inputPathDateTimeFormat,
inputPathRegex,
params.sinceDateTime
@@ -395,7 +422,7 @@
            // next one to use the created table, or ALTER it if necessary.  We don't
            // want multiple CREATEs for the same table to happen in parallel.
if (!params.dryRun)
-                table -> refineJsonTargets(hiveContext, tableTargets.seq)
+                table -> refineJsonTargets(hiveContext, tableTargets.seq, transformFunction)
// If --dry-run was given, don't refine, just map to Successes.
else
table -> tableTargets.seq.map(Success(_))
@@ -524,7 +551,8 @@
*/
def refineJsonTargets(
hiveContext: HiveContext,
- targets: Seq[JsonTarget]
+ targets: Seq[JsonTarget],
+ transformFunction: Option[(DataFrame, HivePartition) => DataFrame]
): Seq[Try[JsonTarget]] = {
targets.map(target => {
log.info(s"Beginning refinement of $target...")
@@ -535,7 +563,8 @@
target.inputPath.toString,
target.partition,
target.inputIsSequenceFile,
- () => target.writeDoneFlag()
+ () => target.writeDoneFlag(),
+ transformFunction
)
log.info(
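
(End to end, a transform would then be selected on the command line by fully
qualified object name.  A hypothetical invocation sketch: the jar name and
the elided flags are illustrative, and only -c/--transform-function is added
by this change. DeduplicateEventLogging is the example object named in the
ReflectUtils docs above.)

    spark-submit --class org.wikimedia.analytics.refinery.job.JsonRefine \
        refinery-job.jar \
        --transform-function org.wikimedia.analytics.refinery.core.DeduplicateEventLogging \
        ...
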
--
To view, visit https://gerrit.wikimedia.org/r/405800
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If1272f7d354e94a0a140f71a9135389131c8a1eb
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>