Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/346291 )

Change subject: JsonRefine: refine arbitrary JSON datasets into Parquet-backed Hive tables
......................................................................


JsonRefine: refine arbitrary JSON datasets into Parquet-backed Hive tables

Given a number of config parameters, this job looks for JSON datasets matching
partition patterns and datetime formats, and determines which of the existing
input JSON partition directories need to be refined.  Partitions that don't exist
in the configured output tables, and partitions whose input data has been
modified since the previous successful refinement, are slated for refinement.
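
The selection rule above can be sketched in plain Scala (the `RefineTarget` shape and its field names here are illustrative, not part of this change):

```scala
// Hypothetical sketch of the partition-selection rule described above:
// a partition is slated for refinement if its output does not exist yet,
// or if its input data was modified after the last successful refinement.
case class RefineTarget(
    inputPath: String,
    outputExists: Boolean,
    inputModifiedTime: Long,      // epoch millis of last input write
    lastRefinedTime: Option[Long] // None if never refined
) {
    def needsRefinement: Boolean =
        !outputExists || lastRefinedTime.forall(inputModifiedTime > _)
}

val targets = Seq(
    RefineTarget("/data/Edit/2017/05/04/12", outputExists = false, 1000L, None),
    RefineTarget("/data/Edit/2017/05/04/13", outputExists = true, 2000L, Some(1500L)),
    RefineTarget("/data/Edit/2017/05/04/14", outputExists = true, 1000L, Some(1500L))
)
val slated = targets.filter(_.needsRefinement).map(_.inputPath)
println(slated) // the first two targets need refinement, the third is up to date
```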

This uses SparkJsonToHive to merge existing Hive table schemas with
those inferred by Spark from the JSON data.

If schemas cannot be merged, that refinement will fail, but this will not
cause the entire JsonRefine job to fail.  Reports about what has
succeeded and what has failed are output and optionally emailed.
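
As a rough, self-contained illustration of the merge behavior (plain name-to-type maps stand in for the Spark StructType merge in SparkSQLHiveExtensions; `mergeSchemas` is a hypothetical helper, not code from this change):

```scala
// Simplified stand-in for the schema merge: new fields are added,
// matching fields must agree on type, and a type conflict fails
// that single refinement with an IllegalStateException.
import scala.collection.immutable.ListMap

def mergeSchemas(
    table: ListMap[String, String],
    input: ListMap[String, String]
): ListMap[String, String] = {
    input.foldLeft(table) { case (merged, (name, dataType)) =>
        merged.get(name) match {
            case Some(t) if t != dataType =>
                throw new IllegalStateException(
                    s"Type of field $name changed from $t to $dataType")
            case Some(_) => merged                      // same type: keep as-is
            case None    => merged + (name -> dataType) // new field: append
        }
    }
}

val tableSchema = ListMap("id" -> "bigint", "action" -> "string")
val inputSchema = ListMap("action" -> "string", "comment" -> "string")
println(mergeSchemas(tableSchema, inputSchema))
// adds the new `comment` field while keeping the existing fields and their order
```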

Note:
  The Edit schema is a bust.  The CREATE TABLE statement generated by this logic works fine directly in Hive, but via Spark it fails:

  Failure(java.lang.Exception: Failed refinement of EventLogging Edit (year=2017,month=05,day=04,hour=12) -> otto.Edit (/user/otto/external/eventlogging2/Edit). Original exception: org.apache.spark.sql.catalyst.util.DataTypeException: Unsupported dataType: struct<action:string,action.abort.mechanism:string,action.abort.timing:bigint,action.abort.type:string,action.init.mechanism:string,action.init.timing:bigint,action.init.type:string,action.ready.timing:bigint,action.saveAttempt.timing:bigint,action.saveFailure.message:string,action.saveFailure.timing:bigint,action.saveFailure.type:string,action.saveIntent.timing:bigint,action.saveSuccess.timing:bigint,editingSessionId:string,editor:string,integration:string,mediawiki.version:string,page.id:bigint,page.ns:bigint,page.revid:bigint,page.title:string,platform:string,user.class:string,user.editCount:bigint,user.id:bigint,version:bigint>. If you have a struct and a field name of it has any special characters, please use backticks (`) to quote that field name, e.g. `x+y`. Please note that backtick itself is not supported in a field name.)

  This error comes from Spark reading the schema out of Hive directly, when calling hiveContext.table(tableName), where the table has crazy struct fields with dots in the name.  Hive doesn't seem to mind.
  I don't think we can fix this, and as such, the EventLogging Analytics Edit table will not be refineable by Spark.

  mediawiki_page_properties_change also seems to not work, due to the variable object property types.
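
For reference, this is the kind of backtick quoting Hive needs for struct fields with dots in their names (a toy stand-in for the quoting this change does when rendering struct type strings; the helper name is illustrative):

```scala
// Toy illustration of quoting struct field names that contain dots,
// as Hive requires (e.g. `page.id`), when rendering a struct type string.
def quotedStructTypeString(fields: Seq[(String, String)]): String =
    fields.map { case (name, dataType) => s"`$name`:$dataType" }
        .mkString("struct<", ",", ">")

println(quotedStructTypeString(Seq("page.id" -> "bigint", "page.title" -> "string")))
// struct<`page.id`:bigint,`page.title`:string>
```

As the note above says, the quoted DDL is accepted by Hive directly, but Spark still fails when it reads such a schema back out of the metastore.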

Otherwise, this is working great for both EventLogging Analytics tables, and for EventBus style data.

Bug: T161924
Change-Id: Ieb2c3a99501623d71fa58ac7dfb6734cb809096f
---
M refinery-core/pom.xml
M refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/HivePartition.scala
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkSQLHiveExtensions.scala
A refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestHivePartition.scala
A refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestSparkSQLHiveExtensions.scala
M refinery-job/pom.xml
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestJsonRefine.scala
10 files changed, 2,448 insertions(+), 1 deletion(-)

Approvals:
  Ottomata: Verified
  Joal: Looks good to me, approved



diff --git a/refinery-core/pom.xml b/refinery-core/pom.xml
index ce6e27b..247582c 100644
--- a/refinery-core/pom.xml
+++ b/refinery-core/pom.xml
@@ -76,10 +76,50 @@
         </dependency>
 
         <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.10</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.nscala-time</groupId>
+            <artifactId>nscala-time_2.10</artifactId>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>12.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
             <version>0.9.7</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+
+        <dependency>
+            <groupId>javax.mail</groupId>
+            <artifactId>mail</artifactId>
+            <version>1.4.7</version>
+        </dependency>
+
+
     </dependencies>
 
     <build>
diff --git a/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java b/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java
index 9f99ea4..6dbdd2a 100644
--- a/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java
+++ b/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java
@@ -16,8 +16,15 @@
 
 package org.wikimedia.analytics.refinery.core;
 
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
 /**
@@ -109,4 +116,36 @@
 
 }
 
+
+    public static void sendEmail(
+        String smtpHost,
+        String smtpPort,
+        String fromEmail,
+        String[] toEmails,
+        String subject,
+        String body
+    ) {
+        Properties props = new Properties();
+        props.put("mail.smtp.host", smtpHost);
+        props.put("mail.smtp.auth", "false");
+        props.put("mail.smtp.port", smtpPort);
+
+        Session session = Session.getDefaultInstance(props);
+
+        try {
+            MimeMessage message = new MimeMessage(session);
+            message.setFrom(new InternetAddress(fromEmail));
+            for (String email : toEmails) {
+                message.addRecipient(Message.RecipientType.TO, new InternetAddress(email));
+            }
+            message.setSubject(subject);
+            message.setText(body);
+
+            Transport.send(message);
+        }
+        catch (MessagingException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/HivePartition.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/HivePartition.scala
new file mode 100644
index 0000000..01a83bb
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/HivePartition.scala
@@ -0,0 +1,157 @@
+package org.wikimedia.analytics.refinery.core
+
+import scala.collection.immutable.ListMap
+import scala.util.matching.Regex
+
+
+/**
+  * Represents a full Hive table partition.
+  * Note: Keeping the order of partition parameters is important to be able to insert
+  * data in the right sequence. Hence the ListMap type of the 'partitions' parameter.
+  *
+  * @param database   Hive database
+  * @param t          Hive table name (this will be normalized and exposed as `table`)
+  * @param location   Hive table LOCATION path
+  * @param partitions ListMap of partition keys -> partition values.
+  */
+case class HivePartition(
+    database: String,
+    private val t: String,
+    location: String,
+    partitions: ListMap[String, String] = ListMap()
+) {
+    val table: String = HivePartition.normalize(t)
+    val tableName: String = s"$database.$table"
+
+    /**
+      * Seq of partition keys
+      */
+    val keys: Seq[String] = partitions.keys.toSeq
+
+    /**
+      * A string suitable for use in Hive QL partition operations,
+      * e.g. year=2017,month=7,day=12,hour=0
+      */
+    val hiveQL: String = {
+        partitions.map { case (k: String, v: String) => {
+            v match {
+                // If the value looks like a number, strip leading 0s
+                case n if n.forall(_.isDigit) => (k, n.replaceFirst("^0+(?!$)", ""))
+                // Else the value should be a string, then quote it.
+                case s => (k, s""""$s"""")
+            }
+        }}
+        .map(p => s"${p._1}=${p._2}").mkString(",")
+    }
+
+    /**
+      * A relative (LOCATION-less) Hive partition path string.
+      * This is how Hive creates partition directories.
+      */
+    val relativePath: String = {
+        partitions.map { case (k: String, v: String) => {
+            v match {
+                // If the value looks like a number, strip leading 0s
+                case n if n.forall(_.isDigit) => (k, n.replaceFirst("^0+(?!$)", ""))
+                // Else the value should be a string, no need to quote in path.
+                case s => (k, s)
+            }
+        }}
+        .map(p => s"${p._1}=${p._2}").mkString("/")
+    }
+
+    /**
+      * Absolute path to this Hive table partition.
+      */
+    val path: String = location + "/" + relativePath
+
+    /**
+      * Get a partition value by key
+      * @param key partition key
+      * @return
+      */
+    def get(key: String): Option[String] = {
+        partitions.get(key)
+    }
+
+    /**
+      * True if there are actual partition values defined.
+      * @return
+      */
+    def nonEmpty: Boolean = {
+        partitions.nonEmpty
+    }
+
+    override def toString: String = {
+        s"$table ($hiveQL)"
+    }
+}
+
+
+/**
+  * Companion object with helper constructor via apply.
+  * This allows construction of a HivePartition via a String,
+  * rather than a ListMap directly.
+  */
+object HivePartition {
+    /**
+      * Normalizes a table name, replacing hyphens with underscores.
+      *
+      * @param s table name to normalize
+      * @return normalized table name
+      */
+    def normalize(s: String): String = {
+        s.replace("-", "_")
+    }
+
+    /**
+      * This helper constructor will use an extractor regex to get the table and partitions
+      * out of a path to extract from, and return a new HivePartition.
+      *
+      * @param database         Hive database
+      * @param baseLocation     Base location to where this table will be stored.  Final table
+      *                         location will be baseLocation/table, where table is the extracted
+      *                         and normalized table name.
+      * @param pathToExtract    path string from which extractorRegex will get table and partitions
+      * @param extractorRegex   regex that captures table and partitions from pathToExtract
+      * @return
+      */
+    def apply(
+        database: String,
+        baseLocation: String,
+        pathToExtract: String,
+        extractorRegex: Regex
+    ): HivePartition = {
+        // Get a ListMap of all named groups captured from extractorRegex
+        val capturedKeys = captureListMap(pathToExtract, extractorRegex)
+        // "table" MUST be an extracted key.
+        val table = normalize(capturedKeys("table"))
+        // The hive table location is baseLocation/table
+        val location = baseLocation + "/" + table
+        // The partitions are any other key=value groups captured by the regex.
+        val partitions = capturedKeys - "table"
+
+        new HivePartition(database, table, location, partitions)
+    }
+
+    /**
+      * This will extract the matched regex groupNames and their captured values
+      * into a ListMap.  ListMap is used so that order is preserved.
+      *
+      * @param  s                   String to match
+      * @param  regex               Regex with named capture groups
+      * @throws RuntimeException    if regex does not match s
+      * @return
+      */
+    private def captureListMap(s: String, regex: Regex): ListMap[String, String] = {
+        val m = regex.findFirstMatchIn(s).getOrElse(
+            throw new RuntimeException(
+                s"Regex $regex did not match $s when attempting to extract 
capture group keys"
+            )
+        )
+
+        m.groupNames.foldLeft(ListMap[String, String]()) {
+            case (currMap, (name: String)) => currMap ++ ListMap(name -> m.group(name))
+        }
+    }
+
+}
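
The named-capture-group extraction that HivePartition.captureListMap performs above can be exercised with plain Scala, independent of the rest of this change (the regex and paths here are made up for illustration):

```scala
import scala.collection.immutable.ListMap
import scala.util.matching.Regex

// Extract named capture groups from a path into an ordered map,
// the same way HivePartition.captureListMap does above.
val extractor = new Regex(
    """.*/([^/]+)/(\d{4})/(\d{2})""",
    "table", "year", "month"
)

def capture(s: String, regex: Regex): ListMap[String, String] = {
    val m = regex.findFirstMatchIn(s).getOrElse(
        sys.error(s"Regex $regex did not match $s"))
    // ListMap preserves the group order, which is what partition insertion needs.
    m.groupNames.foldLeft(ListMap[String, String]()) {
        (acc, name) => acc + (name -> m.group(name))
    }
}

val captured = capture("/data/raw/Edit/2017/05", extractor)
println(captured("table")) // Edit
```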
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
new file mode 100644
index 0000000..8369104
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
@@ -0,0 +1,413 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.apache.log4j.LogManager
+
+import scala.util.control.Exception.{allCatch, ignoring}
+
+import org.apache.hadoop.fs.Path
+
+
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.functions.lit
+
+// Import implicit StructType and StructField conversions.
+// This allows us to use these types with an extended API
+// that includes schema merging and Hive DDL statement generation.
+import SparkSQLHiveExtensions._
+
+
+/**
+  * Converts arbitrary JSON to Hive Parquet by 'evolving' the Hive table to
+  * add new fields as they are encountered in the JSON data.
+  *
+  * Usage:
+  *
+  *     SparkJsonToHive(
+  *         hiveContext,
+  *         "/path/to/input/data/MyCoolEvent/2017/07/01/00",
+  *         HivePartition(
+  *             "mydatabase", "mytable", "/path/to/external/mytable",
+  *             ListMap("year" -> 2017, "month" -> "07", "day" -> "30", "hour" 
-> "00)
+  *         )
+  *      )
+  *
+  * If mydatabase.mytable does not exist in Hive, it will be created based on the JSON
+  * records and Spark schemas inferred by merging all fields from all records found in
+  * the inputPath.
+  *
+  * Later, after more JSON data has been imported, you can run:
+  *
+  *    SparkJsonToHive(
+  *         hiveContext,
+  *         "/path/to/input/data/MyCoolEvent/2017/07/01/01",
+  *         HivePartition(
+  *             "mydatabase", "mytable", "/path/to/external/mytable",
+  *             ListMap("year" -> 2017, "month" -> "07", "day" -> "30", "hour" 
-> "01")
+  *         )
+  *    )
+  *
+  * If any new fields are encountered in this new input data, the now existing
+  * mydatabase.mytable table in Hive will be altered to include these fields.
+  *
+  * If any type changes are encountered between the existing Hive table and the
+  * new input data, an IllegalStateException will be thrown.
+  *
+  */
+object SparkJsonToHive {
+    private val log = LogManager.getLogger("SparkJsonToHive")
+
+
+    /**
+      * Reads inputPath as JSON data, creates or alters tableName in Hive to match the inferred
+      * schema of the input JSON data, and then inserts the data into the table.
+      *
+      * @param hiveContext      Spark HiveContext
+      *
+      * @param inputPath        Path to JSON data
+      *
+      *
+      * @param partition        HivePartition.  This helper class contains
+      *                         database and table name, as well as external location
+      *                         and partition keys and values.
+      *
+      * @param isSequenceFile   If true, inputPath is expected to contain JSON in
+      *                         Hadoop Sequence Files, else JSON in text files.
+      *
+      * @param doneCallback     Function to call after a successful run
+      *
+      * @return                 The number of records refined
+      */
+    def apply(
+        hiveContext: HiveContext,
+        inputPath: String,
+        partition: HivePartition,
+        isSequenceFile: Boolean,
+        doneCallback: () => Unit
+    ): Long = {
+        // Set this so we can partition by fields in the DataFrame.
+        hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+
+        // Read the JSON data into a DataFrame and
+        // add constant value partition columns.
+        val inputDf = dataFrameWithHivePartitions(
+            readJsonDataFrame(hiveContext.asInstanceOf[SQLContext], inputPath, isSequenceFile),
+            partition
+        )
+
+        // Grab the partition name keys to use for Hive partitioning.
+        val partitionNames = partition.keys
+
+        try {
+            // This will create the Hive table based on inputDf.schema if it doesn't yet exist,
+            // or it will alter an existing table, adding fields so that it matches inputDf.schema.
+            // When comparing and running DDL statements, all fields will be lowercased and
+            // made nullable.
+            prepareHiveTable(
+                hiveContext,
+                inputDf.schema,
+                partition.tableName,
+                partition.location,
+                partitionNames
+            )
+        } catch {
+            case e: IllegalStateException => {
+                log.fatal(
+                    s"""Failed preparing Hive table ${partition.tableName} with
+                       |input schema:\n${inputDf.schema.treeString}""".stripMargin, e)
+                throw e
+            }
+        }
+
+        // We need to re-read the JSON data with table schema merged with a schema
+        // for all fields in the JSON data.  This will allow us to be sure that
+        // A. fields we read in from JSON are in the same order as the Hive table,
+        // and B. that fields not present in the JSON are set to null.
+        //
+        // NOTE: It should be possible to use inputDf with fields reordered to match
+        // the Hive table schema field ordering exactly, rather than having to
+        // re-read the JSON data a second time with the merged schema.  I haven't
+        // been able to successfully do this, mainly because I didn't figure out how to use
+        // the Spark DataFrame / StructType APIs to fully reorder and add fields.  We'd have
+        // to recursively reorder and add missing fields to inputDf.
+        // If we can do this, then it should be possible to abstract away the 'JSON-ness' of
+        // this code, and have it work with any DataFrame, since we could pre-load
+        // the DataFrame before passing it to this object.  We wouldn't need the inputPath
+        // or original data format (JSON) in order to infer the schema.
+
+
+        val table = hiveContext.table(partition.tableName)
+
+        // normalize=false means that top level fields will not be lowercased, but
+        // they will always be made nullable.  We need this because we are about to use
+        // this schema to re-read the JSON data, and here, case matters.  Any fields that
+        // are in inputDf must have the same case in the schema we use to read the JSON.
+        val nonNormalizedSchema = table.schema.merge(inputDf.schema, normalize=false)
+
+        val mergedSchemaDf = dataFrameWithHivePartitions(
+            // re-read the JSON data with the merged non normalized schema.  Any fields in this
+            // schema that are not in the data (i.e. they are in the table, but not in this JSON),
+            // will automatically be set to null by Spark.
+            readJsonDataFrame(
+                hiveContext.asInstanceOf[SQLContext],
+                inputPath,
+                isSequenceFile,
+                Option(nonNormalizedSchema)
+            ),
+            partition
+        )
+        // mergedSchemaDf will still have non-normalized (e.g. with capital letter) field names,
+        // but, that's ok.  We're only inserting into Hive now, and Spark HiveContext will do the
+        // lower casing for us.
+
+        log.info(
+            s"""Inserting into `${partition.tableName}` DataFrame with 
schema:\n
+               |${mergedSchemaDf.schema.treeString} for partition 
$partition""".stripMargin
+        )
+        // Insert data into Hive table.
+        // TODO parameterize "overwrite" to allow "append"?
+        mergedSchemaDf.write.mode("overwrite")
+            .partitionBy(partitionNames:_*)
+            .insertInto(partition.tableName)
+
+
+        // Find and log the path of the external partition we just inserted into.
+        val partitionPath = hivePartitionPath(hiveContext, partition)
+        log.info(
+            s"Finished inserting into `${partition.tableName}` DataFrame, " +
+            s"wrote to external location $partitionPath."
+        )
+
+        // call doneCallback
+        doneCallback()
+
+        mergedSchemaDf.count()
+    }
+
+    /**
+      * Reads a JSON data set out of path.  If isSequenceFile is true, the data should
+      * be in Hadoop Sequence file format, instead of text.
+      *
+      * @param sqlContext       Spark SQLContext
+      *
+      * @param path             Path to JSON data
+      *
+      * @param isSequenceFile   If true (default), path is expected to contain Hadoop
+      *                         Sequence Files with JSON record strings as values, else
+      *                         just JSON text files.
+      *
+      * @param schema           Optional schema to use for reading data.  If not given,
+      *                         Spark's JSON reading logic will infer the schema by passing over
+      *                         all data and examining fields in each record.
+      *
+      * @return
+      */
+    def readJsonDataFrame(
+        sqlContext: SQLContext,
+        path: String,
+        isSequenceFile: Boolean = true,
+        schema: Option[StructType] = None
+    ): DataFrame = {
+        // If we have a schema, then use it to read JSON data,
+        // else just infer schemas while reading.
+        val dfReader = if (schema.isDefined) {
+            sqlContext.read.schema(schema.get)
+        }
+        else {
+            sqlContext.read
+        }
+
+        if (isSequenceFile) {
+            // Load DataFrame from JSON data in path
+            dfReader.json(
+                // Expect data to be SequenceFiles with JSON strings as values.
+                sqlContext.sparkContext.sequenceFile[Long, String](path).map(t => t._2)
+            )
+        }
+        else {
+            dfReader.json(path)
+        }
+    }
+
+
+    /**
+      * If tableName does not exist in Hive, this will create it with the schema newSchema.
+      * Else, it will attempt to alter the table to add any fields in newSchema that are
+      * not already in the Hive table.  If any fields change types, this will throw an
+      * exception.
+      *
+      * @param hiveContext      Spark HiveContext
+      *
+      * @param newSchema        Spark schema representing the schema of the
+      *                         table to be created or altered.
+      *
+      * @param tableName        Fully qualified (dotted) Hive table name.
+      *
+      * @param locationPath     Path to external table data.
+      *
+      * @param partitionNames   List of partition names.  These must be present as
+      *                         fields in the newSchema and/or Hive table.
+      *
+      * @return
+      */
+    def prepareHiveTable(
+        hiveContext: HiveContext,
+        newSchema: StructType,
+        tableName: String,
+        locationPath: String = "",
+        partitionNames: Seq[String] = Seq.empty
+    ): Boolean = {
+        val ddlStatements = getDDLStatements(
+            hiveContext, newSchema, tableName, locationPath, partitionNames
+        )
+
+        // CREATE or ALTER the Hive table if we have a change to make.
+        if (ddlStatements.nonEmpty) {
+            ddlStatements.foreach { (s) =>
+                log.info(s"Running Hive DDL statement:\n$s")
+                ignoring(classOf[AlreadyExistsException]) {
+                    hiveContext.sql(s)
+                }
+            }
+            // Refresh Spark's metadata about this Hive table.
+            hiveContext.refreshTable(tableName)
+            true
+        }
+        else
+            false
+    }
+
+
+    /**
+      * If tableName does not exist in Hive, this will return a single element Seq
+      * with a Hive CREATE TABLE statement to create a table that represents newSchema.
+      * Else, if tableName does exist, it will return a Seq of Hive ALTER TABLE statements
+      * required to update the table schema to match any new fields found in newSchema.
+      *
+      * @param hiveContext      Spark HiveContext
+      *
+      * @param newSchema        Spark schema representing the schema of the
+      *                         table to be created or altered.
+      *
+      * @param tableName        Fully qualified (dotted) Hive table name.
+      *
+      * @param locationPath     Path to external table data.
+      *
+      * @param partitionNames   List of partition names.  These must be present as
+      *                         fields in the newSchema and/or Hive table.
+      *
+      * @return
+      */
+    def getDDLStatements(
+        hiveContext: HiveContext,
+        newSchema: StructType,
+        tableName: String,
+        locationPath: String = "",
+        partitionNames: Seq[String] = Seq.empty
+    ): Iterable[String] = {
+        // If the Hive table doesn't exist, get CREATE DDL to create it like newSchema.
+        if (!hiveTableExists(hiveContext, tableName)) {
+            Seq(newSchema.hiveCreateDDL(tableName, locationPath, partitionNames))
+        }
+        // Else get ALTER DDL statements to alter the existing table to add fields from newSchema.
+        else {
+            val tableSchema = hiveContext.table(tableName).schema
+            val alterDDLs = tableSchema.hiveAlterDDL(tableName, newSchema)
+            if (alterDDLs.nonEmpty) {
+                log.info(s"""Found difference in schemas for Hive table 
`$tableName`
+                            |Table schema:\n${tableSchema.treeString}
+                            |Input 
schema:\n${newSchema.treeString}""".stripMargin
+                )
+            }
+
+            alterDDLs
+        }
+    }
+
+
+    /**
+      * Returns true if the fully qualified tableName exists in Hive, else false.
+      *
+      * @param hiveContext Spark HiveContext
+      *
+      * @param tableName   Fully qualified Hive table name
+      *
+      * @return
+      */
+    def hiveTableExists(hiveContext: HiveContext, tableName: String): Boolean = {
+        allCatch.opt(hiveContext.table(tableName)) match {
+            case Some(_) => true
+            case _       => false
+        }
+    }
+
+
+    /**
+      * Returns a new DataFrame with constant Hive partitions added as columns.  If any
+      * column values are convertible to Ints, they will be added as an Int, otherwise String.
+      *
+      * @param df           Input DataFrame
+      *
+      * @param partition    HivePartition
+      *
+      * @return
+      */
+    def dataFrameWithHivePartitions(
+        df: DataFrame,
+        partition: HivePartition
+    ): DataFrame = {
+        // Add partitions to DataFrame.
+        partition.keys.foldLeft(df) {
+            case (currentDf, (key: String)) =>
+                val value = partition.get(key).get
+                log.info(s"Adding partition $key=$value")
+                // If the partition value looks like an Int, convert it,
+                // else just use as a String.  lit() will convert the Scala
+                // value (Int or String here) into a Spark Column type.
+                currentDf.withColumn(key, lit(allCatch.opt(value.toInt).getOrElse(value)))
+        }
+    }
+
+
+    /**
+      * Runs a DESCRIBE FORMATTED query and extracts path to a
+      * Hive table partition.
+      *
+      * @param hiveContext  HiveContext
+      * @param partition    HivePartition
+      * @return
+      */
+    def hivePartitionPath(
+        hiveContext: HiveContext,
+        partition: HivePartition
+    ): String = {
+        var q = s"DESCRIBE FORMATTED `${partition.tableName}`"
+        if (partition.nonEmpty) {
+            q += s" PARTITION (${partition.hiveQL})"
+        }
+
+        // This query will return a human readable block of text about
+        // this table (partition).  We need to parse out the Location information.
+        val locationLine: Option[String] = hiveContext.sql(q)
+            .collect()
+            // Each line is a Row[String], map it to Seq of Strings
+            .map(_.toString())
+            // Find the line with "Location"
+            .find(_.contains("Location"))
+
+        // If we found Location in the string, then extract just the location path.
+        if (locationLine.isDefined) {
+            locationLine.get.filterNot("[]".toSet).trim.split("\\s+").last
+        }
+        // Else throw an Exception.  This shouldn't happen, as the Hive query
+        // will fail earlier if the partition doesn't exist.
+        else {
+            throw new RuntimeException(s"Failed finding path of Hive table partition $partition")
+        }
+    }
+
+}
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkSQLHiveExtensions.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkSQLHiveExtensions.scala
new file mode 100644
index 0000000..1ef1ece
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkSQLHiveExtensions.scala
@@ -0,0 +1,438 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.apache.spark.sql.types._
+
+
+/**
+  * Implicit method extensions to Spark's StructType and StructField.
+  * This is useful for converting and evolving Hive tables to match
+  * Spark DataTypes.
+  *
+  * Usage:
+  *
+  *     import org.wikimedia.analytics.refinery.core.SparkSQLHiveExtensions._
+  *
+  *     val df: DataFrame = // read original source data
+  *     val hiveCreateStatement = df.schema.hiveCreateDDL("mydb.mytable")
+  *     hiveContext.sql(hiveCreateStatement)
+  *     //...
+  *
+  *     val table = hiveContext.table("mydb.table")
+  *     val df: DataFrame =  // read new data, maybe has new fields
+  *     val hiveAlterStatement = table.schema.hiveAlterDDL("mydb.mytable", 
df.schema)
+  *     hiveContext.sql(hiveAlterStatement)
+  */
+object SparkSQLHiveExtensions {
+
+    /**
+      * Implicit methods extensions for Spark StructField.
+      *
+      * @param field
+      */
+    implicit class StructFieldExtensions(field: StructField) {
+
+        /**
+          * Returns a copy of this StructField with a name toLowerCase.
+          * @return
+          */
+        def toLowerCase: StructField = {
+            Option(field.name).map(n => field.copy(name=n.toLowerCase)).getOrElse(field)
+        }
+
+        /**
+          * Returns a nullable or non nullable copy of this StructField.
+          * @param nullable
+          * @return
+          */
+        def makeNullable(nullable: Boolean = true): StructField =
+            field.copy(nullable=nullable)
+
+        /**
+          * Normalizes (toLowerCase and makeNullable) a copy of this StructField.
+          * Ints are converted to Longs, Floats are converted to Doubles.
+          * Longs and Doubles will handle more cases where field values
+          * look like an int or float during one iteration, and a long or double later.
+          *
+          * Hyphens will be converted to underscores.
+          * @param lowerCase if true, the field name will be lowercased.  Default: true
+          * @return
+          */
+        def normalize(lowerCase: Boolean = true): StructField = {
+            val f = {field.dataType match {
+                case IntegerType    => field.copy(dataType=LongType)
+                case FloatType      => field.copy(dataType=DoubleType)
+                case _              => field
+            }}.copy(name=field.name.replace('-', '_')).makeNullable(nullable=true)
+            if (lowerCase) f.toLowerCase else f
+        }
+
+
+        /**
+          * Builds a Hive DDL string representing the Spark field, useful in
+          * CREATE and ALTER DDL statements.
+          *
+          * @return
+          */
+        def hiveColumnDDL: String = {
+            s"`${field.name}` ${field.typeString}" +
+                s"${if (field.nullable) "" else " NOT NULL"}"
+        }
+
+
+        /**
+          * Spark's DataType.simpleString mostly works for Hive field types.  But, since
+          * we need struct field names to be backtick quoted, we need to call
+          * a special method in the case where this field is a StructType.
+          *
+          * @return
+          */
+        def typeString: String = {
+            if (field.isStructType) {
+                field.dataType.asInstanceOf[StructType].quotedSimpleString
+            }
+            else {
+                field.dataType.simpleString
+            }
+        }
+
+
+        /**
+          * Returns true if this field.dataType is a StructType, else False.
+          * @return
+          */
+        def isStructType: Boolean = {
+            field.dataType match {
+                case(StructType(_)) => true
+                case _ => false
+            }
+        }
+
+        def isLongType: Boolean = {
+            field.dataType match {
+                case(LongType) => true
+                case _ => false
+            }
+        }
+
+    }
+
+
+    /**
+      * Implicit method extensions for Spark StructType.
+      *
+      * @param struct
+      */
+    implicit class StructTypeExtensions(struct: StructType) {
+
+        /**
+          * Returns a copy of this struct with all fields 'normalized'.
+          * If lowerCase is true, then the field name will be lower cased.
+          * This function recurses on sub structs, and normalizes them
+          * with lowerCase = false, keeping the cases on sub struct field names.
+          *
+          * All ints will be converted to longs, and all floats will be
+          * converted to doubles.  A field value that may
+          * at one time look like an int, may during a later iteration
+          * look like a long.  We choose to always use the larger data type.
+          *
+          * @param lowerCase Default: true
+          * @return
+          */
+        def normalize(lowerCase: Boolean = true): StructType = {
+            StructType(struct.foldLeft(Seq.empty[StructField])(
+                (fields: Seq[StructField], field: StructField) => {
+                    // toLowerCase and makeNullable this field.
+                    val fieldNormalized = field.normalize(lowerCase=lowerCase)
+
+                    if (field.isStructType) {
+                        fields :+ fieldNormalized.copy(
+                            dataType=fieldNormalized.dataType.asInstanceOf[StructType]
+                                .normalize(lowerCase=false)
+                        )
+                    }
+                    else {
+                        fields :+ fieldNormalized
+                    }
+                }
+            ))
+        }
+
+
+        // NOTE: Fully recursive normalize and denormalize was implemented at
+        // https://gist.github.com/jobar/91c552321efbedba03c8215284726f88#gistcomment-2077149,
+        // but we have decided not to include this functionality.  Sub StructType field names
+        // are not easily denormalizable.  It is legal to have field names at different
+        // struct levels that all normalize to the same name.  Reverting them back to
+        // their original names would require building and keeping a recursive map that matched
+        // the original struct hierarchy exactly.
+        // These methods were created in order to build Spark schemas that can be used
+        // to maintain and update a Hive table, and Hive seems to be indifferent to
+        // cases used in its struct<> column type.
+
+
+        /**
+          * Returns a new StructType with otherStruct merged into this.  Any identical duplicate
+          * fields shared by both will be reduced to one field.  Non StructType fields with the
+          * same name but different types will result in an IllegalStateException.  StructType
+          * fields with the same name will be recursively merged.  All fields will
+          * be made nullable.  Comparison of top level field names is done case insensitively,
+          * i.e. myField is equivalent to myfield.
+          *
+          * @param otherStruct  Spark StructType schema
+          *
+          * @param normalize    If false, the returned schema will contain the original
+          *                     (non lowercased) field names.  Comparison of fields will
+          *                     still be done case insensitively.
+          *
+          * @return
+          */
+        def merge(otherStruct: StructType, normalize: Boolean = true): StructType = {
+            val combined = StructType(struct ++ otherStruct)
+            val combinedNormalized = combined.normalize()
+
+            // Distinct using case insensitive names and types.
+            // Result will be sorted by n1 fields first, with n2 fields at the end.
+            // distinctFields could still have repeat field names with different types.
+            val distinctFields: Seq[StructField] = combinedNormalized.distinct
+
+            val distinctNames: Seq[String] = combinedNormalized.fieldNames.distinct
+
+            // Store a map of fields by name
+            val fieldsByName: Map[String, Seq[StructField]] = distinctFields.groupBy(_.name)
+
+            // Find fields with repeat field names, i.e.
+            // where there's more than one field per name.
+            val repeatFieldsByName: Map[String, Seq[StructField]] =
+                fieldsByName.filter(_._2.size > 1)
+
+            // If any of the repeated fields is a non StructType, then throw an Exception now.
+            // We can't deal with that crap!
+            repeatFieldsByName.foreach {
+                // If there are any fields for this field name that aren't StructTypes
+                case(name, fields) if fields.exists(!_.isStructType) => {
+                    throw new IllegalStateException(
+                        s"""merge failed - Field $name is repeated with multiple non
+                           |StructType types: ${fields.mkString(" , ")}""".stripMargin
+                    )
+                }
+                case _ => ()
+            }
+
+            // Ok!  If we get this far, we don't have any non StructType type changes.
+            // repeatFieldsByName will only contain StructType fields.
+            // Build the mergedStruct from all non repeated fields + merged repeated struct fields.
+            val mergedStruct = StructType(
+                distinctNames.map { name =>
+                    // If this field name only occurred once, then we can just
+                    // grab the single field out of fieldsByName and keep it.
+                    if (!repeatFieldsByName.contains(name)) {
+                        fieldsByName(name).head
+                    }
+                    // Else, the field name is repeated, and we already know that the
+                    // repeated types are all StructTypes.  We can deal with StructType
+                    // type changes by merging the structs.
+                    else {
+                        // Get the Seq of StructFields that are the different StructTypes
+                        // for this field name.
+                        val mergedStruct = repeatFieldsByName(name)
+                            // Map each StructField to its StructType DataType
+                            .map(_.dataType.asInstanceOf[StructType])
+                            // Recursively merge each StructType together
+                            .foldLeft(StructType(Seq.empty))((merged, current) =>
+                                // Don't normalize sub struct schemas.  Spark doesn't
+                                // lowercase Hive struct<> field names, and those
+                                // seem to be nullable by default anyway.
+                                // If we did normalize, then we'd have to recursively
+                                // un-normalize if the original caller passed normalize=false.
+                                merged.merge(current, normalize=false)
+                        )
+
+                        // Convert the StructType back into a StructField with this field name.
+                        StructField(name, mergedStruct, nullable=true)
+                    }
+                }
+            )
+
+            // If we want the normalized (lower cased) field names, return mergedStruct now.
+            if (normalize) {
+                mergedStruct
+            }
+            // Else we want the mergedStruct returned with the original
+            // top level non normalized field names.
+            else {
+                // Build a map from top level lowercased names to non normalized names.
+                val lookup: Map[String, String] = combined.fieldNames
+                    .distinct.map(n => n.toLowerCase -> n).toMap
+                // Map the mergedStruct, renaming any fields in lookup.
+                StructType(mergedStruct.map(f => f.copy(name=lookup.getOrElse(f.name, f.name))))
+            }
+        }
+
+
+        /**
+          * Like simpleString, but backtick quotes field names inside of the struct<>.
+          * e.g. struct<`fieldName`:string,`database`:string>
+          * This works better in Hive, as reserved keywords need to be quoted.
+          * If this struct contains sub structs, those will be recursively quoted too.
+          *
+          * @return
+          */
+        def quotedSimpleString: String = {
+            val fieldTypes = struct.fields.map { field =>
+                if (field.isStructType) {
+                    s"`${field.name}`:${field.dataType.asInstanceOf[StructType].quotedSimpleString}"
+                }
+                else {
+                    s"`${field.name}`:${field.dataType.simpleString}"
+                }
+            }
+            s"struct<${fieldTypes.mkString(",")}>"
+        }
+
+        /**
+          * Returns a String representing Hive column DDL, for use in Hive DDL statements.
+          * This only returns the portion of the DDL statement representing each column.
+          * E.g.
+          * `fieldname1` string,
+          * `fieldname2` bigint
+          * ..
+          *
+          * @param sep column DDL separator, default ",\n"
+          *
+          * @return
+          */
+        def hiveColumnsDDL(sep: String = ",\n"): String =
+            struct.map(_.hiveColumnDDL).mkString(sep)
+
+
+        /**
+          * Builds a Hive CREATE statement DDL string from this StructType schema.
+          * Since Hive is case insensitive, the top level field names will be lowercased.
+          * To ease integration with missing fields in data, all fields are made nullable.
+          *
+          * @return CREATE statement DDL string
+          */
+        def hiveCreateDDL(
+            tableName: String,
+            locationPath: String = "",
+            partitionNames: Seq[String] = Seq.empty
+        ): String = {
+            val schemaNormalized = struct.normalize()
+            val partitionNamesNormalized = partitionNames.map(_.toLowerCase)
+
+            // Validate that all partitions are in the schema.
+            if (partitionNamesNormalized.diff(schemaNormalized.fieldNames).nonEmpty) {
+                throw new IllegalStateException(
+                    s"""At least one partition field is not in the Spark StructType schema.
+                       |partitions: [${partitionNamesNormalized.mkString(",")}]""".stripMargin
+                )
+            }
+
+            val externalClause = if (locationPath.nonEmpty) " EXTERNAL" else ""
+
+            val columnsClause = StructType(schemaNormalized
+                .filterNot(f => partitionNamesNormalized.contains(f.name))
+            ).hiveColumnsDDL()
+
+
+            val partitionClause = {
+                if (partitionNamesNormalized.isEmpty) "-- No partition provided"
+                else {
+                    s"""PARTITIONED BY (
+                       |${StructType(partitionNamesNormalized.map(
+                            p => schemaNormalized(schemaNormalized.fieldIndex(p))
+                        )).hiveColumnsDDL()}
+                       |)""".stripMargin
+                }
+            }
+
+            val locationClause = if (locationPath.nonEmpty) s"\nLOCATION '$locationPath'" else ""
+
+            s"""CREATE$externalClause TABLE `$tableName` (
+               |$columnsClause
+               |)
+               |$partitionClause
+               |STORED AS PARQUET$locationClause""".stripMargin
+        }
+
+
+        /**
+          * Merges otherSchema into this struct StructType, and builds Hive
+          * ALTER DDL statements to add any new fields to or change struct definitions
+          * of an existing Hive table.  Each DDL statement returned should be executed in order
+          * to alter the target Hive table to match the merged schemas.
+          *
+          * Type changes for non-struct fields are not supported and will result in an
+          * IllegalStateException.
+          *
+          * Field names will be lower cased, and all fields are made nullable.
+          *
+          * @param tableName    Hive table name
+          * @param otherSchema  Spark schema
+          *
+          * @return             Iterable of ALTER statement DDL strings
+          */
+        def hiveAlterDDL(
+            tableName: String,
+            otherSchema: StructType
+        ): Iterable[String] = {
+            val schemaNormalized = struct.normalize()
+            val otherSchemaNormalized = otherSchema.normalize()
+
+            // Merge the base schema with otherSchema to ensure there are no non struct type changes.
+            // (merge() will throw an exception if it encounters any)
+            val mergedSchemaNormalized = schemaNormalized.merge(otherSchemaNormalized)
+
+            // diffSchema contains fields that differ in name or type from the original schema.
+            val diffSchema = mergedSchemaNormalized.diff(schemaNormalized)
+
+
+            // If there are no new fields at all, then return empty Seq now.
+            if (diffSchema.isEmpty) {
+                Seq.empty[String]
+            }
+            else {
+                // Group the schema changes into ones for which we can
+                // just ADD COLUMNS in a single statement, and those
+                // for which we need individual CHANGE COLUMN statements.
+                // We must CHANGE COLUMN any StructType field that has changed.
+                val tableModifications = diffSchema.groupBy(f =>
+                    // If this field is in diffSchema and in the original schema,
+                    // we know that it must be a StructType.  merge() wouldn't let
+                    // us have fields with the same name and different types unless
+                    // it is a StructType.
+                    if (schemaNormalized.fieldNames.contains(f.name)) "change"
+                    else "add"
+                )
+                // To be 100% sure we keep ordering, sort the grouped fields by name.
+                .map { case (group, fields) => (group, fields.sortBy(f => f.name)) }
+
+                // Generate the ADD COLUMNS statement to add all new COLUMNS
+                val addStatements: Option[String] = if (tableModifications.contains("add")) {
+                    Option(s"""ALTER TABLE `$tableName`
+                       |ADD COLUMNS (
+                       |${StructType(tableModifications("add")).hiveColumnsDDL()}
+                       |)""".stripMargin
+                    )
+                }
+                else
+                    None
+
+                // Generate each CHANGE COLUMN statement needed to
+                // update the struct<> definition of a struct COLUMN.
+                val changeStatements: Seq[String] = tableModifications
+                    .getOrElse("change", Seq.empty[StructField])
+                    .map { f =>
+                        s"""ALTER TABLE `$tableName`
+                            |CHANGE COLUMN `${f.name}` ${f.hiveColumnDDL}""".stripMargin
+                    }
+
+                // Return a Seq of all statements to run to update the Hive table
+                // to match mergedSchemaNormalized.
+                addStatements ++ changeStatements
+            }
+        }
+    }
+}
+
+
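To make the intended workflow concrete, here is a hedged sketch of driving the extensions above purely at the schema level. No running cluster is needed, but spark-sql and this refinery-core patch must be on the classpath; the table name, location, and field names are illustrative, not part of the patch:

```scala
import org.apache.spark.sql.types._
import org.wikimedia.analytics.refinery.core.SparkSQLHiveExtensions._

object MergeSketch extends App {
    // Two schemas as Spark might infer them from two hours of the same JSON dataset.
    val firstHour = StructType(Seq(
        StructField("f1", IntegerType, nullable = true),
        StructField("f2", StringType, nullable = true)
    ))
    val secondHour = StructType(Seq(
        StructField("f2", StringType, nullable = true),
        StructField("f3", FloatType, nullable = true)
    ))

    // merge() unions the fields, widening int -> bigint and float -> double.
    val merged = firstHour.merge(secondHour)
    // merged: struct<f1:bigint, f2:string, f3:double>, all nullable

    // The merged schema can drive initial table creation...
    println(merged.hiveCreateDDL("mydb.mytable", "/tmp/mytable"))

    // ...and later evolution: one ALTER statement per needed change,
    // here a single ADD COLUMNS for the new `f3` field.
    firstHour.hiveAlterDDL("mydb.mytable", secondHour).foreach(println)
}
```

Per the unit tests below, field order is the first schema's fields followed by any extras from the second, and comparison of top level names is case insensitive.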
diff --git a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestHivePartition.scala b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestHivePartition.scala
new file mode 100644
index 0000000..5e52069
--- /dev/null
+++ b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestHivePartition.scala
@@ -0,0 +1,63 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.collection.immutable.ListMap
+import scala.util.matching.Regex
+
+
+class TestHivePartition extends FlatSpec with Matchers {
+
+    val database = "testdb"
+    val table = "Test-Table"
+    val location = "/path/to/test_table"
+    val partitions: ListMap[String, String] = ListMap(
+        "datacenter" -> "dc1", "year" -> "2017", "month" -> "07"
+    )
+    val partition = HivePartition(
+        database, table, location, partitions
+    )
+
+    it should "have a normalized fully qualified table name" in {
+        partition.tableName should equal(s"$database.${table.replace("-", "_")}")
+    }
+
+    it should "have partition keys" in {
+        partition.keys should equal(partitions.keys.toSeq)
+    }
+
+    it should "have correct hive QL representation" in {
+        partition.hiveQL should equal("""datacenter="dc1",year=2017,month=7""")
+    }
+
+
+    it should "have correct hive relative path representation" in {
+        partition.relativePath should equal("""datacenter=dc1/year=2017/month=7""")
+    }
+
+    it should "have correct hive full path" in {
+        partition.path should equal(s"""$location/datacenter=dc1/year=2017/month=7""")
+    }
+
+    it should "construct with regex and path" in {
+
+        val regex = new Regex(
+            "hourly/(.+)_(.+)/(\\d{4})/(\\d{2})",
+            "datacenter", "table", "year", "month"
+        )
+
+        val baseLocation = "/path/to/external/tables"
+        val p = HivePartition(
+            database, baseLocation, s"/path/to/raw/hourly/dc2_Test-Table2/2015/05", regex
+        )
+
+        p.tableName should equal(s"$database.Test_Table2")
+        p.path should equal(s"$baseLocation/${p.table}/datacenter=dc2/year=2015/month=5")
+
+        val partitionShouldBe = ListMap(
+            "datacenter" -> "dc2", "year" -> "2015", "month" -> "05"
+        )
+        p.partitions should equal(partitionShouldBe)
+    }
+
+}
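The regex-based constructor exercised in the test above is what lets JsonRefine map raw input directories to Hive partitions. A rough usage sketch, assuming refinery-core is on the classpath (the paths and capture-group names mirror the test and are illustrative):

```scala
import scala.util.matching.Regex
import org.wikimedia.analytics.refinery.core.HivePartition

object PartitionSketch extends App {
    // Capture group names become partition keys, in the order they appear.
    val regex = new Regex(
        "hourly/(.+)_(.+)/(\\d{4})/(\\d{2})",
        "datacenter", "table", "year", "month"
    )

    val p = HivePartition(
        "testdb", "/path/to/external/tables",
        "/path/to/raw/hourly/dc2_Test-Table2/2015/05", regex
    )

    // Hyphens in the extracted table name are normalized to underscores.
    println(p.tableName)     // testdb.Test_Table2
    // path is baseLocation/table plus key=value partition directories.
    println(p.path)
}
```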
diff --git a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestSparkSQLHiveExtensions.scala b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestSparkSQLHiveExtensions.scala
new file mode 100644
index 0000000..2450127
--- /dev/null
+++ b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestSparkSQLHiveExtensions.scala
@@ -0,0 +1,381 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.apache.spark.sql.types._
+import org.scalatest.{Matchers, FlatSpec}
+
+import org.wikimedia.analytics.refinery.core.SparkSQLHiveExtensions._
+
+class TestSparkSQLHiveExtensions extends FlatSpec with Matchers {
+
+    val tableName = "test.table"
+    val tableLocation = "/tmp/test/table"
+
+
+    it should "build a Hive DDL string representing a simple column" in {
+        val field = StructField("f1", LongType, nullable = false)
+        val expected = s"`f1` bigint NOT NULL"
+
+        field.hiveColumnDDL should equal(expected)
+    }
+
+    it should "build Hive DDL string representing a complex column" in {
+        val field = StructField("S1", StructType(Seq(
+            StructField("s1", StringType, nullable=true)
+        )))
+        val expected = s"`S1` struct<`s1`:string>"
+
+        field.hiveColumnDDL should equal(expected)
+    }
+
+
+    it should "build a Hive DDL string representing multiple columns" in {
+        val fields = Seq(
+            StructField("f1", StringType, nullable = false),
+            StructField("f2", IntegerType, nullable = true),
+            StructField("S1", StructType(Seq(
+                StructField("s1", StringType, nullable=true)
+            )))
+        )
+        val expected =
+            s"""`f1` string NOT NULL,
+               |`f2` int,
+               |`S1` struct<`s1`:string>""".stripMargin
+
+        StructType(fields).hiveColumnsDDL() should equal(expected)
+    }
+
+    it should "merge and normalize 2 simple schemas" in {
+        val schema1 = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StringType, nullable = true)
+        ))
+
+        val schema2 = StructType(Seq(
+            StructField("f4", LongType, nullable = true),
+            StructField("f2", StringType, nullable = true),
+            StructField("f3", FloatType, nullable = true)
+
+        ))
+
+        // Unioned schema will be first ordered by schema1 fields, with any extra fields
+        // from schema2 appended in the order they are found there.
+        val expected = StructType(Seq(
+            StructField("f1", LongType, nullable = true),
+            StructField("f2", StringType, nullable = true),
+            StructField("f4", LongType, nullable = true),
+            StructField("f3", DoubleType, nullable = true)
+        ))
+
+        schema1.merge(schema2) should equal(expected)
+    }
+
+
+    it should "merge and not normalize 2 simple schemas" in {
+        val schema1 = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StringType, nullable = true)
+        ))
+
+        val schema2 = StructType(Seq(
+            StructField("f4", LongType, nullable = true),
+            StructField("f2", StringType, nullable = true),
+            StructField("f3", IntegerType, nullable = true)
+
+        ))
+
+        // Unioned schema will be first ordered by schema1 fields, with any extra fields
+        // from schema2 appended in the order they are found there.
+        val expected = StructType(Seq(
+            StructField("F1", LongType, nullable = true),
+            StructField("f2", StringType, nullable = true),
+            StructField("f4", LongType, nullable = true),
+            StructField("f3", LongType, nullable = true)
+        ))
+
+        schema1.merge(schema2, normalize=false) should equal(expected)
+    }
+
+    it should "merge and not normalize 2 complex schemas" in {
+        val schema1 = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s2", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true)
+                )))
+            )), nullable = true),
+            StructField("f3", StringType, nullable = true)
+        ))
+
+        val schema2 = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s3", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true),
+                    StructField("c2", StringType, nullable=true)
+                )))
+            )), nullable = true),
+            StructField("F5", StructType(Seq(
+                StructField("b1", IntegerType, nullable=true))
+            )),
+            StructField("f4", LongType, nullable = true)
+        ))
+
+        // Merged schema will be first ordered by schema1 fields, with any extra fields
+        // from schema2 appended in the order they are found there.
+        val expected = StructType(Seq(
+            StructField("F1", LongType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s2", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true),
+                    StructField("c2", StringType, nullable=true)
+                ))),
+                StructField("s3", StringType, nullable = true)
+            )), nullable = true),
+            StructField("f3", StringType, nullable = true),
+            StructField("F5", StructType(Seq(
+                StructField("b1", LongType, nullable=true))
+            )),
+            StructField("f4", LongType, nullable = true)
+        ))
+
+        schema1.merge(schema2, normalize=false) should equal(expected)
+    }
+
+
+    it should "merge and normalize 2 complex schemas" in {
+        val schema1 = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s2", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true)
+                )))
+            )), nullable = true),
+            StructField("f3", StringType, nullable = true)
+        ))
+
+        val schema2 = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s3", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true),
+                    StructField("c2", StringType, nullable=true)
+                )))
+            )), nullable = true),
+            StructField("F5", StructType(Seq(
+                StructField("b1", IntegerType, nullable=true))
+            )),
+            StructField("f4", LongType, nullable = true)
+        ))
+
+        // Merged schema will be first ordered by schema1 fields, with any extra fields
+        // from schema2 appended in the order they are found there.
+        val expected = StructType(Seq(
+            StructField("f1", LongType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s2", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true),
+                    StructField("c2", StringType, nullable=true)
+                ))),
+                StructField("s3", StringType, nullable = true)
+            )), nullable = true),
+            StructField("f3", StringType, nullable = true),
+            StructField("f5", StructType(Seq(
+                StructField("b1", LongType, nullable=true))
+            )),
+            StructField("f4", LongType, nullable = true)
+        ))
+
+        schema1.merge(schema2, normalize=true) should equal(expected)
+    }
+
+
+    it should "build non external no partitions create DDL with single schema" in {
+        val schema = StructType(Seq(
+            StructField("f2", LongType, nullable = true),
+            StructField("f3", StringType, nullable = true),
+            StructField("F1", IntegerType, nullable = true)
+        ))
+
+        val expected =
+            s"""CREATE TABLE `$tableName` (
+               |`f2` bigint,
+               |`f3` string,
+               |`f1` bigint
+               |)
+               |-- No partition provided
+               |STORED AS PARQUET""".stripMargin
+
+        val statement = schema.hiveCreateDDL(tableName)
+
+        statement should equal(expected)
+    }
+
+    it should "build external no partitions create DDL with single schema" in {
+        val schema = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", LongType, nullable = true),
+            StructField("f3", StringType, nullable = true)
+        ))
+
+        val expected =
+            s"""CREATE EXTERNAL TABLE `$tableName` (
+               |`f1` bigint,
+               |`f2` bigint,
+               |`f3` string
+               |)
+               |-- No partition provided
+               |STORED AS PARQUET
+               |LOCATION '$tableLocation'""".stripMargin
+
+        val statement = schema.hiveCreateDDL(tableName, tableLocation)
+
+        statement should equal(expected)
+    }
+
+    it should "build external create DDL with partitions and single schema" in {
+        val schema = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", LongType, nullable = true),
+            StructField("f4", StringType, nullable = true),
+            StructField("f3", StringType, nullable = true)
+        ))
+        val partitions = Seq("f3", "f4")
+
+        val expected =
+            s"""CREATE EXTERNAL TABLE `$tableName` (
+               |`f1` bigint,
+               |`f2` bigint
+               |)
+               |PARTITIONED BY (
+               |`f3` string,
+               |`f4` string
+               |)
+               |STORED AS PARQUET
+               |LOCATION '$tableLocation'""".stripMargin
+
+        val statement = schema.hiveCreateDDL(tableName, tableLocation, partitions)
+
+        statement should equal(expected)
+    }
+
+
+    it should "build alter DDL with single schema" in {
+        val schema = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", LongType, nullable = true),
+            StructField("f3", StringType, nullable = true)
+        ))
+
+        val otherSchema = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f3", StringType, nullable = true),
+            StructField("f4", LongType, nullable = true)
+        ))
+
+        val partitions = Seq("f1", "f2")
+
+        val expected = Seq(
+            s"""ALTER TABLE `$tableName`
+               |ADD COLUMNS (
+               |`f4` bigint
+               |)""".stripMargin
+        )
+
+        val statements = schema.hiveAlterDDL(tableName, otherSchema)
+
+        statements should equal(expected)
+    }
+
+
+    it should "build alter DDL with just modified merged structs" in {
+        val schema = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s2", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true)
+                )))
+            )), nullable = true),
+            StructField("f3", StringType, nullable = true)
+        ))
+
+        val otherSchema = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s3", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true),
+                    StructField("C2", StringType, nullable=true)
+                )))
+            )), nullable = true)
+        ))
+
+        val expected = Seq(
+            s"""ALTER TABLE `$tableName`
+               |CHANGE COLUMN `f2` `f2` struct<`S1`:string,`s2`:string,`A1`:struct<`c1`:string,`C2`:string>,`s3`:string>""".stripMargin
+        )
+
+        val statements = schema.hiveAlterDDL(tableName, otherSchema)
+
+        statements should equal(expected)
+    }
+
+    it should "build alter DDL with new columns and modified merged structs" in {
+        val schema = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s2", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true)
+                )))
+            )), nullable = true),
+            StructField("f3", StringType, nullable = true)
+        ))
+
+        val otherSchema = StructType(Seq(
+            StructField("F1", IntegerType, nullable = true),
+            StructField("f2", StructType(Seq(
+                StructField("S1", StringType, nullable = true),
+                StructField("s3", StringType, nullable = true),
+                StructField("A1", StructType(Seq(
+                    StructField("c1", StringType, nullable=true),
+                    StructField("c2", StringType, nullable=true)
+                )))
+            )), nullable = true),
+            StructField("f4", StructType(Seq(
+                StructField("b1", IntegerType, nullable=true))
+            )),
+            StructField("f5", LongType, nullable = true)
+        ))
+
+        val expected = Seq(
+            s"""ALTER TABLE `$tableName`
+               |ADD COLUMNS (
+               |`f4` struct<`b1`:bigint>,
+               |`f5` bigint
+               |)""".stripMargin,
+            s"""ALTER TABLE `$tableName`
+               |CHANGE COLUMN `f2` `f2` struct<`S1`:string,`s2`:string,`A1`:struct<`c1`:string,`c2`:string>,`s3`:string>""".stripMargin
+        )
+
+        val statements = schema.hiveAlterDDL(tableName, otherSchema)
+
+        statements should equal(expected)
+    }
+
+}
diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index 248efba..2f0cf22 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -162,7 +162,6 @@
             <version>1.6.0_0.4.7</version>
             <scope>test</scope>
         </dependency>
-
     </dependencies>
 
     <build>
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
new file mode 100644
index 0000000..f9abc66
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
@@ -0,0 +1,895 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.log4j.LogManager
+import scopt.OptionParser
+
+import scala.util.{Failure, Success, Try}
+import scala.util.matching.Regex
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
+
+import org.joda.time.Hours
+import org.joda.time.format.DateTimeFormatter
+import com.github.nscala_time.time.Imports._
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.hive.HiveContext
+
+import org.wikimedia.analytics.refinery.core.{HivePartition, SparkJsonToHive, Utilities}
+
+
+// TODO: ERROR Hive: Table otto not found: default.otto table not found ???
+// TODO: support append vs overwrite?
+// TODO: Hive Table Locking?
+
+/**
+  * Looks for hourly input partition directories with JSON data that need refinement,
+  * and refines them into Hive Parquet tables using SparkJsonToHive.
+  */
+object JsonRefine {
+
+    private val log = LogManager.getLogger("JsonRefine")
+    private val iso8601DateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
+    /**
+      * Config class for CLI argument parser using scopt
+      */
+    case class Params(
+        inputBasePath: String                      = "",
+        outputBasePath: String                     = "",
+        databaseName: String                       = "default",
+        sinceDateTime: DateTime                    = DateTime.now - 192.hours, // 8 days ago
+        untilDateTime: DateTime                    = DateTime.now,
+        inputPathPattern: String                   = ".*/(.+)/hourly/(\\d{4})/(\\d{2})/(\\d{2})/(\\d{2}).*",
+        inputPathPatternCaptureGroups: Seq[String] = Seq("table", "year", "month", "day", "hour"),
+        inputPathDateTimeFormat: DateTimeFormatter = DateTimeFormat.forPattern("'hourly'/yyyy/MM/dd/HH"),
+        tableWhitelistRegex: Option[Regex]         = None,
+        tableBlacklistRegex: Option[Regex]         = None,
+        doneFlag: String                           = "_REFINED",
+        parallelism: Option[Int]                   = None,
+        compressionCodec: String                   = "snappy",
+        isSequenceFile: Boolean                    = true,
+        limit: Option[Int]                         = None,
+        dryRun: Boolean                            = false,
+        shouldEmailReport: Boolean                 = false,
+        smtpURI: String                            = "mx1001.wikimedia.org:25",
+        fromEmail: String                          = s"jsonrefine@${java.net.InetAddress.getLocalHost.getCanonicalHostName}",
+        toEmails: Seq[String]                      = Seq("[email protected]")
+    )
+
+
+    // Support implicit Regex conversion from CLI opt.
+    implicit val scoptRegexRead: scopt.Read[Regex] =
+        scopt.Read.reads { _.r }
+
+    // Support implicit Option[Regex] conversion from CLI opt.
+    implicit val scoptOptionRegexRead: scopt.Read[Option[Regex]] =
+        scopt.Read.reads {s => Some(s.r) }
+
+    // Support implicit DateTimeFormatter conversion from CLI opt.
+    implicit val scoptDateTimeFormatterRead: scopt.Read[DateTimeFormatter] =
+        scopt.Read.reads { s => DateTimeFormat.forPattern(s) }
+
+    // Support implicit DateTime conversion from CLI opt.
+    // The opt can either be given in integer hours ago, or
+    // as an ISO-8601 date time.
+    implicit val scoptDateTimeRead: scopt.Read[DateTime] =
+        scopt.Read.reads { s => {
+            if (s.forall(Character.isDigit))
+                DateTime.now - s.toInt.hours
+            else
+                DateTime.parse(s, iso8601DateFormatter)
+        }
+    }
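
The hours-ago vs. ISO-8601 parsing done by `scoptDateTimeRead` above can be illustrated with a standalone sketch. Note this uses `java.time` rather than the job's nscala-time/joda `DateTime`, and the helper name is hypothetical:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object DateTimeOptSketch {
    private val iso8601 = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")

    // An all-digit value is interpreted as "N hours ago" relative to `now`;
    // anything else is parsed as an ISO-8601 date time.
    def parseDateTimeOpt(s: String, now: LocalDateTime): LocalDateTime =
        if (s.forall(Character.isDigit)) now.minusHours(s.toLong)
        else LocalDateTime.parse(s, iso8601)
}
```

So `--since 24` and `--since 2017-05-04T12:00:00` both resolve to a concrete start time.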
+
+
+
+    /**
+      * Define the command line options parser.
+      */
+    val argsParser = new OptionParser[Params](
+        "spark-submit --class org.wikimedia.analytics.refinery.job.JsonRefine refinery-job.jar"
+    ) {
+        head("""
+              |JSON Datasets -> Partitioned Hive Parquet tables.
+              |
+              |Given an input base path, this will search all subdirectories for input
+              |partitions to convert to Parquet backed Hive tables.  This was originally
+              |written to work with JSON data imported via Camus into hourly buckets, but
+              |should be configurable to work with any regular import directory hierarchy.
+              |
+              |Example:
+              |  spark-submit --class org.wikimedia.analytics.refinery.job.JsonRefine refinery-job.jar \
+              |   --input-base-path     /wmf/data/raw/event \
+              |   --output-base-path    /user/otto/external/eventbus5 \
+              |   --database            event \
+              |   --since               24 \
+              |   --input-regex         '.*(eqiad|codfw)_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
+              |   --input-capture       'datacenter,table,year,month,day,hour' \
+              |   --table-blacklist     '.*page_properties_change.*'
+              |
+              |""".stripMargin, "")
+
+        note("""NOTE: You may pass all of the described CLI options to this job in a single
+               |string with --options '<options>' flag.\n""".stripMargin)
+
+        help("help") text "Prints this usage text."
+
+        opt[String]('i', "input-base-path").required().valueName("<path>").action { (x, p) =>
+            p.copy(inputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+        } text
+            """Path to input JSON datasets.  This directory is expected to contain
+              |directories of individual (topic) table datasets.  E.g.
+              |/path/to/raw/data/{myprefix_dataSetOne,myprefix_dataSetTwo}, etc.
+              |Each of these subdirectories will be searched for partitions that
+              |need to be refined.""".stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[String]('o', "output-base-path") optional() valueName "<path>" action { (x, p) =>
+            p.copy(outputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+        } text
+            """Base path of output data and of external Hive tables.  Each table will be created
+              |with a LOCATION in a subdirectory of this path."""
+                .stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[String]('d', "database") optional() valueName "<database>" action { (x, p) =>
+            p.copy(databaseName = if (x.endsWith("/")) x.dropRight(1) else x)
+        } text "Hive database name in which to manage refined Hive tables.\n"
+
+        opt[DateTime]('s', "since") optional() valueName "<since-date-time>" action { (x, p) =>
+            p.copy(sinceDateTime = x)
+        } text
+            """Refine all data found since this date time.  This may either be given as an integer
+              |number of hours ago, or an ISO-8601 formatted date time.  Default: 192 hours ago."""
+                .stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[DateTime]('u', "until") optional() valueName "<until-date-time>" action { (x, p) =>
+            p.copy(untilDateTime = x)
+        } text
+            """Refine all data found until this date time.  This may either be given as an integer
+              |number of hours ago, or an ISO-8601 formatted date time.  Default: now."""
+                .stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[String]('R', "input-regex") optional() valueName "<regex>" action { (x, p) =>
+            p.copy(inputPathPattern = x)
+        } text
+            """input-regex should match the input partition directory hierarchy starting from the
+              |dataset base path, and should capture the table name and the partition values.
+              |Along with input-capture, this allows arbitrary extraction of table names and
+              |partitions from the input path.  You are required to capture at least "table"
+              |using this regex.  The default will match an hourly bucketed Camus import hierarchy,
+              |using the topic name as the table name.""".stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[String]('C', "input-capture") optional() valueName "<capture-list>" action { (x, p) =>
+            p.copy(inputPathPatternCaptureGroups = x.split(","))
+        } text
+            """input-capture should be a comma separated list of named capture groups
+              |corresponding to the groups captured by input-regex.  These need to be
+              |provided in the order that the groups are captured.  This ordering will
+              |also be used for partitioning.""".stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[DateTimeFormatter]('F', "input-datetime-format") optional() valueName "<format>" action { (x, p) =>
+            p.copy(inputPathDateTimeFormat = x)
+        } text
+            """This DateTimeFormat will be used to generate all possible partitions since
+              |the given lookback-hours in each dataset directory.  This format will be used
+              |to format a DateTime to input directory partition paths.  The finest granularity
+              |supported is hourly.  Every hour in the past lookback-hours will be generated,
+              |but if you specify a less granular format (e.g. daily, like "daily"/yyyy/MM/dd),
+              |the code will reduce the generated partition search for that day to 1, instead of 24.
+              |The default is suitable for generating partitions in an hourly bucketed Camus
+              |import hierarchy.
+            """.stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[Option[Regex]]('w', "table-whitelist") optional() valueName "<regex>" action { (x, p) =>
+            p.copy(tableWhitelistRegex = x)
+        } text "Whitelist regex of table names to refine.\n"
+
+        opt[Option[Regex]]('b', "table-blacklist") optional() valueName "<regex>" action { (x, p) =>
+            p.copy(tableBlacklistRegex = x)
+        } text "Blacklist regex of table names to skip.\n"
+
+        opt[String]('D', "done-flag") optional() valueName "<filename>" action { (x, p) =>
+            p.copy(doneFlag = x)
+        } text
+            """When a partition is successfully refined, this file will be created in the
+              |output partition path with the binary timestamp of the input source partition's
+              |modification timestamp.  This allows subsequent runs to detect if the input
+              |data has changed, meaning the partition needs to be re-refined.
+              |Default: _REFINED""".stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[Int]('P', "parallelism") optional() valueName "<parallelism>" action { (x, p) =>
+            p.copy(parallelism = Some(x))
+        } text
+            """Refine into up to this many tables in parallel.  Individual partitions
+              |destined for the same Hive table will be refined serially.
+              |Defaults to the number of local CPUs (i.e. what Scala parallel
+              |collections use).""".stripMargin.replace("\n", "\n\t") + "\n"
+
+
+        opt[String]('c', "compression-codec") optional() valueName "<codec>" action { (x, p) =>
+            p.copy(compressionCodec = x)
+        } text "Value of spark.sql.parquet.compression.codec, default: snappy\n"
+
+        opt[Boolean]('S', "sequence-file") optional() action { (x, p) =>
+            p.copy(isSequenceFile = x)
+        } text
+            """Set to true if the input data is stored in Hadoop Sequence files.
+              |Otherwise text is assumed.  Default: true"""
+                .stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[String]('L', "limit") optional() valueName "<limit>" action { (x, p) =>
+            p.copy(limit = Some(x.toInt))
+        } text
+            """Only refine this many partition directories.  This is useful while
+              |testing to reduce the number of refinements to do at once.  Defaults
+              |to no limit.""".stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[Unit]('n', "dry-run") optional() action { (_, p) =>
+            p.copy(dryRun = true)
+        } text
+            """Set this flag if no action should actually be taken.  Instead, targets
+              |to refine will be printed, but they will not be refined.
+              |Default: false"""
+                .stripMargin.replace("\n", "\n\t") + "\n"
+
+        opt[Unit]('E', "send-email-report") optional() action { (_, p) =>
+            p.copy(shouldEmailReport = true)
+        } text
+            "Set this flag if you want an email report of any failures during refinement."
+
+        opt[String]('T', "smtp-uri") optional() valueName "<smtp-uri>" action { (x, p) =>
+            p.copy(smtpURI = x)
+        } text "SMTP server host:port. Default: mx1001.wikimedia.org:25"
+
+        opt[String]('f', "from-email") optional() valueName "<from-email>" action { (x, p) =>
+            p.copy(fromEmail = x)
+        } text "Email report sender email address."
+
+        opt[String]('t', "to-emails") optional() valueName "<to-emails>" action { (x, p) =>
+            p.copy(toEmails = x.split(","))
+        } text
+            "Email report recipient email addresses (comma separated). Default: [email protected]"
+
+    }
+
+    def main(args: Array[String]): Unit = {
+        val params = args.headOption match {
+            // Case when our job options are given as a single string.  Split them
+            // and pass them to argsParser.
+            case Some("--options") =>
+                argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1))
+            // Else the normal usage: each CLI opt is parsed as a job option.
+            case _ =>
+                argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+        }
+
+        // Exit non-zero if any refinements failed.
+        if (apply(params))
+            sys.exit(0)
+        else
+            sys.exit(1)
+    }
+
+
+    /**
+      * Given params, refine all discovered JsonTargets.
+      *
+      * @param params Params
+      * @return true if all targets needing refinement succeeded, false otherwise.
+      */
+    def apply(params: Params): Boolean = {
+        // Initial setup - Spark, HiveContext, Hadoop FileSystem
+        val conf = new SparkConf()
+        val sc = new SparkContext(conf)
+
+        val fs = FileSystem.get(sc.hadoopConfiguration)
+        val hiveContext = new HiveContext(sc)
+        hiveContext.setConf("spark.sql.parquet.compression.codec", params.compressionCodec)
+
+        // Ensure that inputPathPatternCaptureGroups contains "table", as this is needed
+        // to determine the Hive table name we will refine into.
+        if (!params.inputPathPatternCaptureGroups.contains("table")) {
+            throw new RuntimeException(
+                s"Invalid <input-capture> ${params.inputPathPatternCaptureGroups}. " +
+                s"Must at least contain 'table' as a named capture group."
+            )
+        }
+
+        // Combine the inputPathPattern with the capture groups to build a regex that
+        // will use aliases for the named groups.  This will be used to extract
+        // table and partitions out of the inputPath.
+        val inputPathRegex = new Regex(
+            params.inputPathPattern,
+            params.inputPathPatternCaptureGroups: _*
+        )
+
+
+        log.info(
+            s"Looking for JSON targets to refine in ${params.inputBasePath} between " +
+            s"${params.sinceDateTime} and ${params.untilDateTime}"
+        )
+
+        // Need JsonTargets for every existent input partition since sinceDateTime.
+        val targetsToRefine = jsonTargetsSince(
+            fs,
+            new Path(params.inputBasePath),
+            params.isSequenceFile,
+            new Path(params.outputBasePath),
+            params.databaseName,
+            params.doneFlag,
+            params.inputPathDateTimeFormat,
+            inputPathRegex,
+            params.sinceDateTime
+        )
+        // Filter for tables in whitelist, filter out tables in blacklist,
+        // and filter the remaining for targets that need refinement.
+        .filter(target => shouldRefineJsonTarget(
+            target, params.tableWhitelistRegex, params.tableBlacklistRegex
+        ))
+
+        // At this point, targetsToRefine will be a Seq of JsonTargets in our targeted
+        // time range that need refinement, either because they haven't yet been refined,
+        // or because the input data has changed since the previous refinement.
+
+        // Return now if we didn't find any targets to refine.
+        if (targetsToRefine.isEmpty) {
+            log.info(s"No targets needing refinement were found in ${params.inputBasePath}")
+            return true
+        }
+
+        // Locally parallelize the targets.
+        // If params.limit, then take only the first limit input targets.
+        // This is mainly only useful for testing.
+        val targets = params.limit match {
+            case Some(_) => targetsToRefine.take(params.limit.get).par
+            case None    => targetsToRefine.par
+        }
+
+        // If custom parallelism was specified, create a new ForkJoinPool for this
+        // parallel collection with the provided parallelism level.
+        if (params.parallelism.isDefined) {
+            targets.tasksupport = new ForkJoinTaskSupport(
+                new ForkJoinPool(params.parallelism.get)
+            )
+        }
+
+        val targetsByTable = targets.groupBy(_.tableName)
+
+        log.info(
+            s"Refining ${targets.length} JSON dataset partitions into tables " +
+            s"${targetsByTable.keys.mkString(", ")} with local " +
+            s"parallelism of ${targets.tasksupport.parallelismLevel}..."
+        )
+
+        if (params.dryRun)
+            log.warn("NOTE: --dry-run was specified, nothing will actually be refined!")
+
+        // Loop over the inputs in parallel and refine them to
+        // their Hive table partitions.  jobStatuses should be an
+        // iterable of Trys as Success/Failures.
+        val jobStatusesByTable = targetsByTable.map { case (table, tableTargets) => {
+            // We need tableTargets to run in serial instead of parallel.  When a table does
+            // not yet exist, we want the first target here to issue a CREATE, and the
+            // next ones to use the created table, or ALTER it if necessary.  We don't
+            // want multiple CREATEs for the same table to happen in parallel.
+            if (!params.dryRun)
+                table -> refineJsonTargets(hiveContext, tableTargets.seq)
+            // If --dry-run was given, don't refine, just map to Successes.
+            else
+                table -> tableTargets.seq.map(Success(_))
+        }}
+
+        // Log successes and failures.
+        val successesByTable = jobStatusesByTable.map(t => t._1 -> t._2.filter(_.isSuccess))
+        val failuresByTable = jobStatusesByTable.map(t => t._1 -> t._2.filter(_.isFailure))
+
+        var hasFailures = false
+        if (successesByTable.nonEmpty) {
+            for ((table, successes) <- successesByTable.filter(_._2.nonEmpty)) {
+                val totalRefinedRecordCount = targetsByTable(table).map(_.recordCount).sum
+                log.info(
+                    s"Successfully refined ${successes.length} of ${targetsByTable(table).size} " +
+                    s"raw JSON dataset partitions into table $table (total # refined records: $totalRefinedRecordCount)"
+                )
+            }
+        }
+
+        // Collect a string of failures that we might email as a report later.
+        var failureMessages = ""
+        if (failuresByTable.nonEmpty) {
+
+            for ((table, failures) <- failuresByTable.filter(_._2.nonEmpty)) {
+                // Log each failed refinement.
+                val message =
+                    s"The following ${failures.length} of ${targetsByTable(table).size} " +
+                    s"raw JSON dataset partitions for table $table failed refinement:\n\t" +
+                    failures.mkString("\n\t")
+
+                log.error(message)
+                failureMessages += "\n\n" + message
+
+                hasFailures = true
+            }
+        }
+
+        // If we should send this as a failure email report
+        // (and this is not a dry run), do it!
+        if (hasFailures && params.shouldEmailReport && !params.dryRun) {
+            val smtpHost = params.smtpURI.split(":")(0)
+            val smtpPort = params.smtpURI.split(":")(1)
+
+            log.info(s"Sending failure email report to ${params.toEmails.mkString(",")}")
+            Utilities.sendEmail(
+                smtpHost,
+                smtpPort,
+                params.fromEmail,
+                params.toEmails.toArray,
+                s"JsonRefine failure report for ${params.inputBasePath} -> ${params.outputBasePath}",
+                failureMessages
+            )
+        }
+
+        // Return true if no failures, false otherwise.
+        !hasFailures
+    }
+
+
+    /**
+      * Given a JsonTarget and optional whitelist and blacklist regexes,
+      * this returns true if the JsonTarget should be refined, based on regex matching and
+      * on output existence and doneFlag content.
+      *
+      * @param target JsonTarget
+      * @param tableWhitelistRegex Option[Regex]
+      * @param tableBlacklistRegex Option[Regex]
+      * @return
+      */
+    def shouldRefineJsonTarget(
+        target: JsonTarget,
+        tableWhitelistRegex: Option[Regex],
+        tableBlacklistRegex: Option[Regex]
+    ): Boolean = {
+
+        // Filter for targets that will refine to tables that match the whitelist
+        if (tableWhitelistRegex.isDefined &&
+            !regexMatches(target.partition.table, tableWhitelistRegex.get)
+        ) {
+            log.info(
+                s"$target table ${target.partition.table} does not match table whitelist regex " +
+                s"'${tableWhitelistRegex.get}', skipping."
+            )
+            false
+        }
+        // Filter out targets that will refine to tables that match the blacklist
+        else if (tableBlacklistRegex.isDefined &&
+            regexMatches(target.partition.table, tableBlacklistRegex.get)
+        ) {
+            log.info(
+                s"$target table ${target.partition.table} matches table blacklist regex " +
+                s"'${tableBlacklistRegex.get}', skipping."
+            )
+            false
+        }
+        // Finally filter for those that need to be refined (have new data).
+        else if (!target.shouldRefine) {
+            log.info(
+                s"$target does not have new data since the last refine at " +
+                s"${target.doneFlagMTime().getOrElse("_unknown_")}, skipping."
+            )
+            false
+        }
+        else {
+            true
+        }
+    }
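
The whitelist/blacklist branches above boil down to a single predicate. A minimal standalone sketch, assuming partial (substring) regex matching, which may differ from the project's `regexMatches` helper:

```scala
import scala.util.matching.Regex

// A table passes the filters only if it matches the whitelist (when one
// is given) and does not match the blacklist (when one is given).
def tablePassesFilters(
    table: String,
    whitelist: Option[Regex],
    blacklist: Option[Regex]
): Boolean = {
    whitelist.forall(_.findFirstIn(table).isDefined) &&
    blacklist.forall(_.findFirstIn(table).isEmpty)
}
```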
+
+
+    /**
+      * Given a Seq of JsonTargets, this runs SparkJsonToHive on each one.
+      *
+      * @param hiveContext HiveContext
+      * @param targets     Seq of JsonTargets to refine
+      * @return
+      */
+    def refineJsonTargets(
+        hiveContext: HiveContext,
+        targets: Seq[JsonTarget]
+    ): Seq[Try[JsonTarget]] = {
+        targets.map(target => {
+            log.info(s"Beginning refinement of $target...")
+
+            try {
+                val recordCount = SparkJsonToHive(
+                    hiveContext,
+                    target.inputPath.toString,
+                    target.partition,
+                    target.inputIsSequenceFile,
+                    () => target.writeDoneFlag()
+                )
+
+                log.info(
+                    s"Finished refinement of JSON dataset $target. " +
+                    s"(# refined records: $recordCount)"
+                )
+
+                target.success(recordCount)
+
+            }
+            catch {
+                case e: Exception => {
+                    log.error(s"Failed refinement of JSON dataset $target.", e)
+                    target.failure(e)
+                }
+            }
+        })
+    }
+
+
+    /**
+      * Finds JsonTargets with existent input partition paths between sinceDateTime and untilDateTime.
+      * The table and partitions are extracted from inputPath by combining inputPathDateTimeFormatter
+      * and inputPathRegex.
+      *
+      * inputPathDateTimeFormatter will be used to construct the expected inputPath for each
+      * input partition directory between sinceDateTime and untilDateTime.  E.g. a formatter
+      * with a format of "'hourly'/yyyy/MM/dd/HH" will look for existent inputPaths
+      * for every hour in the provided time period, like
+      *     $baseInputPath/subdir1/hourly/2017/07/26/00,
+      *     $baseInputPath/subdir1/hourly/2017/07/26/01,
+      *     $baseInputPath/subdir2/hourly/2017/07/26/00,
+      *     $baseInputPath/subdir2/hourly/2017/07/26/01,
+      * etc.
+      *
+      * inputPathRegex is expected to capture named groups that include "table" and any other
+      * partition keys.  inputPathRegex's capture groups must contain one named "table".
+      * E.g. new Regex(
+      *     "(eqiad|codfw)_(.+)/hourly/\\d{4}/\\d{2}/\\d{2}/\\d{2}",
+      *     "datacenter", "table", "year", "month", "day", "hour"
+      * )
+      *
+      * and an inputPath of
+      *     $baseInputPath/eqiad_mediawiki_revision-create/2017/07/26/01
+      *
+      * will construct a JsonTarget with table "mediawiki_revision_create" (hyphens are converted
+      * to underscores) and partitions datacenter="eqiad",year=2017,month=07,day=26,hour=01
+      *
+      * @param fs                           Hadoop FileSystem
+      *
+      * @param baseInputPath                Path to base input datasets.  Each subdirectory
+      *                                     is assumed to be a unique dataset with individual
+      *                                     partitions.  Every subdirectory's partition
+      *                                     paths here must be compatible with the provided
+      *                                     values of inputPathDateTimeFormatter and inputPathRegex.
+      *
+      * @param inputIsSequenceFile          Should be true if the input data files are Hadoop
+      *                                     Sequence Files.
+      *
+      * @param baseTableLocationPath        Path to directory where Hive table data will be stored.
+      *                                     $baseTableLocationPath/$table will be the value of the
+      *                                     external Hive table's LOCATION.
+      *
+      * @param databaseName                 Hive database name
+      *
+      * @param doneFlag                     Done flag file.  A successful refinement will
+      *                                     write this file to the output path with
+      *                                     the Long timestamp of the inputPath's current mod time.
+      *
+      * @param inputPathDateTimeFormatter   Formatter used to construct input partition paths
+      *                                     in the given time range.
+      *
+      * @param inputPathRegex               Regex used to extract table name and partition
+      *                                     information.
+      *
+      * @param sinceDateTime                Start date time to look for input partitions.
+      *
+      * @param untilDateTime                End date time to look for input partitions.
+      *                                     Defaults to DateTime.now
+      * @return
+      */
+    def jsonTargetsSince(
+        fs: FileSystem,
+        baseInputPath: Path,
+        inputIsSequenceFile: Boolean,
+        baseTableLocationPath: Path,
+        databaseName: String,
+        doneFlag: String,
+        inputPathDateTimeFormatter: DateTimeFormatter,
+        inputPathRegex: Regex,
+        sinceDateTime: DateTime,
+        untilDateTime: DateTime = DateTime.now
+    ): Seq[JsonTarget] = {
+        val inputDatasetPaths = subdirectoryPaths(fs, baseInputPath)
+
+        // Map all partitions in each input dataset path since sinceDateTime to JsonTargets.
+        inputDatasetPaths.flatMap { inputDatasetPath =>
+            // Get all possible input partition paths for all directories in inputDatasetPath
+            // between sinceDateTime and untilDateTime.
+            // This will include all possible partition paths in that time range, even if
+            // that path does not actually exist.
+            val pastPartitionPaths = partitionPathsSince(
+                inputDatasetPath.toString,
+                inputPathDateTimeFormatter,
+                sinceDateTime,
+                untilDateTime
+            )
+
+            // Convert each possible partition input path into a possible JsonTarget for refinement.
+            pastPartitionPaths.map(partitionPath => {
+                // Any capturedKeys other than table are expected to be partition key=values.
+                val partition = HivePartition(
+                    databaseName,
+                    baseTableLocationPath.toString,
+                    partitionPath.toString,
+                    inputPathRegex
+                )
+
+                JsonTarget(
+                    fs,
+                    partitionPath,
+                    inputIsSequenceFile,
+                    partition,
+                    doneFlag
+                )
+            })
+            // We only care about input partition paths that actually exist,
+            // so filter out those that don't.
+            .filter(_.inputExists())
+         }
+    }
+
+
+    /**
+      * Returns true if s matches r, else false.
+      * @param s    String to match
+      * @param r    Regex
+      * @return
+      */
+    def regexMatches(s: String, r: Regex): Boolean = {
+        s match {
+            case r(_*) => true
+            case _     => false
+        }
+    }
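As an aside, the same full-string matching that regexMatches relies on is how capture groups can pull a table name and partition values out of a path.  A minimal sketch (the regex and path here are hypothetical examples for illustration, not the actual refinery configuration):

```scala
import scala.util.matching.Regex

// Hypothetical example regex: capture the table name and hourly partition values.
val examplePathRegex: Regex = """.*/([^/]+)/year=(\d+)/month=(\d+)/day=(\d+)/hour=(\d+)""".r

// Pattern matching against a Regex requires the whole string to match,
// just like regexMatches above.
def extractPartition(path: String): Option[(String, String, String, String, String)] = {
    path match {
        case examplePathRegex(table, year, month, day, hour) =>
            Some((table, year, month, day, hour))
        case _ => None
    }
}
```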
+
+
+    /**
+      * Returns a Seq of all directory Paths in a directory.
+      * @param fs           Hadoop FileSystem
+      * @param inDirectory  directory Path in which to look for subdirectories
+      * @return
+      */
+    def subdirectoryPaths(fs: FileSystem, inDirectory: Path): Seq[Path] = {
+        fs.listStatus(inDirectory).filter(_.isDirectory).map(_.getPath)
+    }
+
+
+    /**
+      * Given 2 DateTimes, this generates a Seq of DateTimes representing all hours
+      * between d1 (inclusive) and d2 (exclusive).  E.g.
+      *     DateTime.now -> 2017-08-10T21:42:32.820Z
+      *
+      *     hoursInBetween(DateTime.now - 2.hours, DateTime.now) ->
+      *         Seq(2017-08-10T19:00:00.000Z, 2017-08-10T20:00:00.000Z)
+      *
+      * In the above example, the current hour is 21, and this function returns
+      * the previous two hours.
+      *
+      * @param d1   sinceDateTime
+      * @param d2   untilDateTime
+      * @return
+      */
+    def hoursInBetween(d1: DateTime, d2: DateTime): Seq[DateTime] = {
+        val oldestHour = new DateTime(d1, DateTimeZone.UTC).hourOfDay.roundCeilingCopy
+        val youngestHour = new DateTime(d2, DateTimeZone.UTC).hourOfDay.roundFloorCopy
+
+        for (h <- 0 to Hours.hoursBetween(oldestHour, youngestHour).getHours) yield {
+            oldestHour + h.hours - 1.hours
+        }
+    }
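To make the ceiling/floor rounding in hoursInBetween concrete, here is an equivalent standalone sketch using java.time instead of Joda-Time (the names are illustrative, not part of the patch):

```scala
import java.time.{ZonedDateTime, ZoneOffset}
import java.time.temporal.ChronoUnit

// Round d1 up to the next whole hour (like roundCeilingCopy) and d2 down
// (like roundFloorCopy), then emit each hour from d1's hour (inclusive)
// up to, but not including, d2's hour.
def hoursInBetweenSketch(d1: ZonedDateTime, d2: ZonedDateTime): Seq[ZonedDateTime] = {
    val floor1   = d1.truncatedTo(ChronoUnit.HOURS)
    val oldest   = if (floor1 == d1) floor1 else floor1.plusHours(1)
    val youngest = d2.truncatedTo(ChronoUnit.HOURS)
    (0 to ChronoUnit.HOURS.between(oldest, youngest).toInt)
        .map(h => oldest.plusHours(h).minusHours(1))
}
```

This reproduces the scaladoc example: for 19:42 to 21:42, it yields the 19:00 and 20:00 hours.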
+
+
+    /**
+      * Given a DateTimeFormatter and 2 DateTimes, this will generate
+      * a Seq of Paths for every distinct result of fmt.print(hour) prefixed
+      * witih pathPrefix.  If your date formatter generates the same
+      * path for multiple hours, only one of those paths will be included
+      * in the result.  This way, you can still generate a list more granular 
partitions, if
+      * your data happens to be partitioned at a more granular time bucketing 
than hourly.
+      *
+      * @param pathPrefix   Prefix to prepend to every generated partition path
+      * @param fmt          Date formatter to use to extract partition paths 
from hours
+      *                     between d1 and d2
+      * @param d1           sinceDateTime
+      * @param d2           untilDateTime,  Defaults to DateTime.now
+      * @return
+      */
+    def partitionPathsSince(
+        pathPrefix: String,
+        fmt: DateTimeFormatter,
+        d1: DateTime,
+        d2: DateTime = DateTime.now
+    ): Seq[Path] = {
+        hoursInBetween(d1, d2)
+            .map(hour => new Path(pathPrefix + "/" + fmt.print(hour)))
+            .distinct
+    }
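The .distinct here is what lets a coarser formatter work: if fmt only prints down to the day, 24 hourly steps collapse into one path per day.  A self-contained sketch of that behavior with java.time (the pattern and prefix are made up for illustration):

```scala
import java.time.{ZonedDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Generate one candidate path per hour, then deduplicate; with a daily
// pattern, all 24 hours of a day map to the same path.
def partitionPathsSketch(
    prefix: String,
    fmt: DateTimeFormatter,
    d1: ZonedDateTime,
    d2: ZonedDateTime
): Seq[String] = {
    Iterator.iterate(d1)(_.plusHours(1))
        .takeWhile(_.isBefore(d2))
        .map(h => prefix + "/" + fmt.format(h))
        .toSeq
        .distinct
}

val dailyFmt = DateTimeFormatter.ofPattern("'year='yyyy/'month='MM/'day='dd")
val start = ZonedDateTime.of(2017, 5, 4, 0, 0, 0, 0, ZoneOffset.UTC)
val paths = partitionPathsSketch("/tmp/dataset", dailyFmt, start, start.plusHours(48))
// 48 hourly candidates collapse to 2 distinct daily paths
```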
+
+
+    /**
+      * Represents a JSON dataset target for refinement.  This mainly exists to reduce
+      * the number of parameters we have to pass around between functions here.  An
+      * instantiated JsonTarget should contain enough information to use SparkJsonToHive
+      * to refine a single JSON partition into a Hive Parquet table.
+      *
+      * @param fs                   Hadoop FileSystem
+      * @param inputPath            Full input partition path
+      * @param inputIsSequenceFile  If the input is a Hadoop Sequence File
+      * @param partition            HivePartition
+      * @param doneFlag             Name of file that should be written upon success of
+      *                             a SparkJsonToHive run.  This can be created by
+      *                             calling the writeDoneFlag method.
+      */
+    case class JsonTarget(
+        fs: FileSystem,
+        inputPath: Path,
+        inputIsSequenceFile: Boolean,
+        partition: HivePartition,
+        doneFlag: String
+    ) {
+        /**
+          * Easy access to the fully qualified Hive table name.
+          */
+        val tableName: String = partition.tableName
+
+        /**
+          * Easy access to the Hive partition path, AKA the output destination path.
+          */
+        val outputPath = new Path(partition.path)
+
+        /**
+          * Path to doneFlag in the Hive table partition output path.
+          */
+        val doneFlagPath = new Path(s"$outputPath/$doneFlag")
+
+        /**
+          * Number of records successfully refined for this JsonTarget.
+          */
+        var recordCount: Long = -1
+
+        /**
+          * The mtime of the inputPath at the time this JsonTarget is instantiated.
+          * Caching this allows us to use the earliest mtime possible to store in
+          * doneFlag, in case the inputPath changes while this target is being refined.
+          */
+        private val inputMTimeCached: Option[DateTime] = inputMTime()
+
+        /**
+          * True if the inputPath exists
+          * @return
+          */
+        def inputExists(): Boolean = fs.exists(inputPath)
+
+        /**
+          * True if the outputPath exists
+          * @return
+          */
+        def outputExists(): Boolean = fs.exists(outputPath)
+
+        /**
+          * True if the outputPath/doneFlag exists
+          * @return
+          */
+        def doneFlagExists(): Boolean = fs.exists(doneFlagPath)
+
+        /**
+          * Returns the mtime Long timestamp of inputPath.  inputPath's mtime will
+          * change if it or any of its direct files change.  It will not change if
+          * content in a subdirectory changes.
+          * @return
+          */
+        def inputMTime(): Option[DateTime] = {
+            if (inputExists()) {
+                Some(new DateTime(fs.getFileStatus(inputPath).getModificationTime))
+            }
+            else
+                None
+        }
+
+        /**
+          * Reads the Long timestamp out of the outputPath/doneFlag if it exists.
+          * @return
+          */
+        def doneFlagMTime(): Option[DateTime] = {
+            if (doneFlagExists()) {
+                val inStream = fs.open(doneFlagPath)
+                val mtime = new DateTime(inStream.readUTF())
+                inStream.close()
+                Some(mtime)
+            }
+            else
+                None
+        }
+
+        /**
+         * Write out doneFlag file for this output target partition
+         *
+         * This saves the modification timestamp of the inputPath as it was when this
+         * target was instantiated.  This allows later comparison of the contents of
+         * doneFlag with the inputPath modification time.  If they are different, the
+         * user might decide to rerun SparkJsonToHive for this target, perhaps
+         * assuming that there is new data in inputPath.  Note that the inputPath
+         * directory mod time only changes if its direct content changes; it will not
+         * change if something in a subdirectory below it changes.
+         */
+        def writeDoneFlag(): Unit = {
+
+            val mtime = inputMTimeCached.getOrElse(
+                throw new RuntimeException(
+                    s"Cannot write done flag, input mod time was not obtained when $this " +
+                        s"was instantiated, probably because it did not exist.  " +
+                        s"This should not happen."
+                )
+            )
+
+            log.info(
+                s"Writing done flag file at $doneFlagPath with $inputPath last " +
+                s"modification timestamp of $mtime"
+            )
+
+            val outStream = fs.create(doneFlagPath)
+            outStream.writeUTF(mtime.toString)
+            outStream.close()
+        }
+
+        /**
+          * This target needs to be refined if:
+          * - The output doesn't exist OR
+          * - The output doneFlag doesn't exist OR
+          * - The input's mtime does not equal the timestamp in the output doneFlag
+          *   file, meaning that something has changed in the inputPath since the
+          *   last time doneFlag was written.
+          *
+          * @return
+          */
+        def shouldRefine(): Boolean = {
+            !outputExists() || !doneFlagExists() || inputMTimeCached != doneFlagMTime()
+        }
+
+        /**
+          * Returns a Failure with e wrapped in a new, more descriptive Exception.
+          * @param e Original exception that caused this failure
+          * @return
+          */
+        def failure(e: Exception): Try[JsonTarget] = {
+            Failure(JsonTargetException(
+                this, s"Failed refinement of JSON dataset $this. Original exception: $e", e
+            ))
+        }
+
+        /**
+          * Sets this JsonTarget's recordCount and returns Success(this).
+          * @return
+          */
+        def success(recordCount: Long): Try[JsonTarget] = {
+            this.recordCount = recordCount
+            Success(this)
+        }
+
+        override def toString: String = {
+            s"$inputPath -> $partition"
+        }
+    }
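The shouldRefine decision above boils down to a comparison of Options, which can be sketched as a pure standalone function (hypothetical, for illustration only):

```scala
// Refine when output or its done-flag timestamp is missing, or when the input's
// mtime no longer matches the timestamp recorded in the done-flag.
def shouldRefineSketch(
    outputExists: Boolean,
    doneFlagMTime: Option[Long],
    inputMTime: Option[Long]
): Boolean = {
    !outputExists || doneFlagMTime.isEmpty || inputMTime != doneFlagMTime
}
```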
+
+
+    /**
+      * Exception wrapper used to retrieve the JsonTarget instance from a Failure instance.
+      * @param target   JsonTarget
+      * @param message  exception message
+      * @param cause    Original Exception
+      */
+    case class JsonTargetException(
+        target: JsonTarget,
+        message: String = "",
+        cause: Throwable = None.orNull
+    ) extends Exception(message, cause) { }
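The done-flag round trip that writeDoneFlag and doneFlagMTime perform over HDFS (FSDataOutputStream extends java.io.DataOutputStream) can be sketched with in-memory streams:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// writeUTF stores the mtime as a modified-UTF-8 string; readUTF gets it back,
// ready to be parsed into a DateTime again.
val mtimeString = "2017-05-04T12:00:00.000Z"

val buffer = new ByteArrayOutputStream()
val out = new DataOutputStream(buffer)
out.writeUTF(mtimeString)
out.close()

val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
val readBack = in.readUTF()
in.close()
```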
+
+}
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestJsonRefine.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestJsonRefine.scala
new file mode 100644
index 0000000..61bebc8
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestJsonRefine.scala
@@ -0,0 +1,22 @@
+package org.wikimedia.analytics.refinery.job
+
+import com.github.nscala_time.time.Imports.DateTime
+import com.github.nscala_time.time.Imports._
+import org.scalatest.{FlatSpec, Matchers}
+
+class TestJsonRefine extends FlatSpec with Matchers {
+
+    "hoursInBetween" should "give hours in between two DateTimes" in {
+        val d1 = DateTime.now - 3.hours
+        val d2 = DateTime.now
+
+        val hoursShouldBe = Seq(
+            (new DateTime(DateTime.now, DateTimeZone.UTC) - 3.hours).hourOfDay.roundFloorCopy,
+            (new DateTime(DateTime.now, DateTimeZone.UTC) - 2.hours).hourOfDay.roundFloorCopy,
+            (new DateTime(DateTime.now, DateTimeZone.UTC) - 1.hours).hourOfDay.roundFloorCopy
+        )
+
+        val hours = JsonRefine.hoursInBetween(d1, d2)
+        hours should equal (hoursShouldBe)
+    }
+}
\ No newline at end of file

-- 
To view, visit https://gerrit.wikimedia.org/r/346291
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ieb2c3a99501623d71fa58ac7dfb6734cb809096f
Gerrit-PatchSet: 42
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
