Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/346291 )
Change subject: JsonRefine: refine arbitrary JSON datasets into Parquet-backed Hive tables
......................................................................
JsonRefine: refine arbitrary JSON datasets into Parquet-backed Hive tables
Given a number of config parameters, this looks for JSON datasets matching
partition patterns and date time formats, and determines which of the existing
input JSON partition directories need to be refined. Partitions that don't yet
exist in the configured output tables, and partitions whose input data has been
modified since the previous successful refinement, are slated for refinement.

This uses SparkJsonToHive to merge existing Hive table schemas with those
inferred by Spark from the JSON data.

If schemas cannot be merged, that particular refinement will fail, but this will
not cause the entire JsonRefine job to fail. Reports about what has succeeded
and what has failed are output and optionally emailed.
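For reference, each refinement ultimately boils down to one SparkJsonToHive call per
input partition, along the lines of the following sketch (paths, database, table and
partition values are purely illustrative; the real ones come from JsonRefine's CLI config):

    import scala.collection.immutable.ListMap
    import org.wikimedia.analytics.refinery.core.{HivePartition, SparkJsonToHive}

    // Hypothetical example values.
    val refinedRecordCount: Long = SparkJsonToHive(
        hiveContext,
        "/wmf/data/raw/event/MyCoolEvent/hourly/2017/07/01/00",
        HivePartition(
            "event", "MyCoolEvent", "/user/otto/external/event/mycoolevent",
            ListMap("year" -> "2017", "month" -> "07", "day" -> "01", "hour" -> "00")
        ),
        isSequenceFile = true,
        doneCallback = () => ()
    )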
Note:
Edit schema is a bust. The CREATE TABLE statement generated by this logic works fine
directly in Hive, but via Spark it fails:
Failure(java.lang.Exception: Failed refinement of EventLogging Edit
(year=2017,month=05,day=04,hour=12) -> otto.Edit
(/user/otto/external/eventlogging2/Edit). Original exception:
org.apache.spark.sql.catalyst.util.DataTypeException: Unsupported dataType:
struct<action:string,action.abort.mechanism:string,action.abort.timing:bigint,action.abort.type:string,action.init.mechanism:string,action.init.timing:bigint,action.init.type:string,action.ready.timing:bigint,action.saveAttempt.timing:bigint,action.saveFailure.message:string,action.saveFailure.timing:bigint,action.saveFailure.type:string,action.saveIntent.timing:bigint,action.saveSuccess.timing:bigint,editingSessionId:string,editor:string,integration:string,mediawiki.version:string,page.id:bigint,page.ns:bigint,page.revid:bigint,page.title:string,platform:string,user.class:string,user.editCount:bigint,user.id:bigint,version:bigint>.
If you have a struct and a field name of it has any special characters, please
use backticks (`) to quote that field name, e.g. `x+y`. Please note that
backtick itself is not supported in a field name.)
This error comes from Spark reading the schema out of Hive directly, when calling
hiveContext.table(tableName), where the table has struct fields with dots in their
names. Hive itself doesn't seem to mind.
I don't think we can fix this, and as such, the EventLogging Analytics Edit
table will not be refineable by Spark.
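For illustration, a minimal hypothetical reproduction of the problem (table and field
names are made up and are not part of this change):

    // A statement like this, run directly in the Hive CLI, is accepted by Hive:
    //   CREATE TABLE otto.dots_test (`event` struct<`page.id`:bigint>) STORED AS PARQUET;
    // ...but Spark then fails when it reads the schema back out of the metastore:
    val df = hiveContext.table("otto.dots_test")
    // throws org.apache.spark.sql.catalyst.util.DataTypeException: Unsupported dataType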
mediawiki_page_properties_change also does not seem to work, due to its variable
object property types.
Otherwise, this is working great for both EventLogging Analytics tables, and
for EventBus style data.
Bug: T161924
Change-Id: Ieb2c3a99501623d71fa58ac7dfb6734cb809096f
---
M refinery-core/pom.xml
M refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/HivePartition.scala
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkSQLHiveExtensions.scala
A refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestHivePartition.scala
A refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestSparkSQLHiveExtensions.scala
M refinery-job/pom.xml
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestJsonRefine.scala
10 files changed, 2,448 insertions(+), 1 deletion(-)
Approvals:
Ottomata: Verified
Joal: Looks good to me, approved
diff --git a/refinery-core/pom.xml b/refinery-core/pom.xml
index ce6e27b..247582c 100644
--- a/refinery-core/pom.xml
+++ b/refinery-core/pom.xml
@@ -76,10 +76,50 @@
</dependency>
<dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.10</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.nscala-time</groupId>
+ <artifactId>nscala-time_2.10</artifactId>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>12.0</version>
+ </dependency>
+ <dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.7</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ <version>1.4.7</version>
+ </dependency>
+
+
</dependencies>
<build>
diff --git a/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java b/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java
index 9f99ea4..6dbdd2a 100644
--- a/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java
+++ b/refinery-core/src/main/java/org/wikimedia/analytics/refinery/core/Utilities.java
@@ -16,8 +16,15 @@
package org.wikimedia.analytics.refinery.core;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeMessage;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.regex.Pattern;
/**
@@ -109,4 +116,36 @@
}
+
+ public static void sendEmail(
+ String smtpHost,
+ String smtpPort,
+ String fromEmail,
+ String[] toEmails,
+ String subject,
+ String body
+ ) {
+ Properties props = new Properties();
+ props.put("mail.smtp.host", smtpHost);
+ props.put("mail.smtp.auth", "false");
+ props.put("mail.smtp.port", smtpPort);
+
+ Session session = Session.getDefaultInstance(props);
+
+ try {
+ MimeMessage message = new MimeMessage(session);
+ message.setFrom(new InternetAddress(fromEmail));
+ for (String email : toEmails) {
+ message.addRecipient(Message.RecipientType.TO, new InternetAddress(email));
+ }
+ message.setSubject(subject);
+ message.setText(body);
+
+ Transport.send(message);
+ }
+ catch (MessagingException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
}
\ No newline at end of file
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/HivePartition.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/HivePartition.scala
new file mode 100644
index 0000000..01a83bb
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/HivePartition.scala
@@ -0,0 +1,157 @@
+package org.wikimedia.analytics.refinery.core
+
+import scala.collection.immutable.ListMap
+import scala.util.matching.Regex
+
+
+/**
+ * Represents a full Hive table partition.
+ * Note: Keeping the order of partition parameters is important to be able to insert
+ * data in the right sequence. Hence the ListMap type of the 'partitions' parameter.
+ *
+ * @param database Hive database
+ * @param t Hive table (this will be normalized as .table)
+ * @param location Hive table LOCATION path
+ * @param partitions ListMap of partition keys -> partition values.
+ */
+case class HivePartition(
+ database: String,
+ private val t: String,
+ location: String,
+ partitions: ListMap[String, String] = ListMap()
+) {
+ val table: String = HivePartition.normalize(t)
+ val tableName: String = s"$database.$table"
+
+ /**
+ * Seq of partition keys
+ */
+ val keys: Seq[String] = partitions.keys.toSeq
+
+ /**
+ * A string suitable for use in Hive QL partition operations,
+ * e.g. year=2017,month=07,day=12,hour=0
+ */
+ val hiveQL: String = {
+ partitions.map { case (k: String, v: String) => {
+ v match {
+ // If the value looks like a number, strip leading 0s
+ case n if n.forall(_.isDigit) => (k, n.replaceFirst("^0+(?!$)", ""))
+ // Else the value should be a string, then quote it.
+ case s => (k, s""""$s"""")
+ }
+ }}
+ .map(p => s"${p._1}=${p._2}").mkString(",")
+ }
+
+ /**
+ * A relative (LOCATION-less) Hive partition path string.
+ * This is how Hive creates partition directories.
+ */
+ val relativePath: String = {
+ partitions.map { case (k: String, v: String) => {
+ v match {
+ // If the value looks like a number, strip leading 0s
+ case n if n.forall(_.isDigit) => (k, n.replaceFirst("^0+(?!$)", ""))
+ // Else the value should be a string, no need to quote in path.
+ case s => (k, s)
+ }
+ }}
+ .map(p => s"${p._1}=${p._2}").mkString("/")
+ }
+
+ /**
+ * Absolute path to this Hive table partition.
+ */
+ val path: String = location + "/" + relativePath
+
+ /**
+ * Get a partition value by key
+ * @param key partition key
+ * @return
+ */
+ def get(key: String): Option[String] = {
+ partitions.get(key)
+ }
+
+ /**
+ * True if there are actual partition values defined.
+ * @return
+ */
+ def nonEmpty: Boolean = {
+ partitions.nonEmpty
+ }
+
+ override def toString: String = {
+ s"$table ($hiveQL)"
+ }
+}
+
+
+/**
+ * Companion object with helper constructor via apply.
+ * This allows construction of a HivePartition via a String,
+ * rather than a ListMap directly.
+ */
+object HivePartition {
+ /**
+ *
+ * @param s
+ * @return
+ */
+ def normalize(s: String): String = {
+ s.replace("-", "_")
+ }
+
+ /**
+ * This helper constructor will use an extractor regex to get the table and partitions
+ * out of a path to extract from, and return a new HivePartition.
+ *
+ * @param database Hive database
+ * @param baseLocation Base location to where this table will be stored. Final table
+ * location will be baseLocation/table, where table is the extracted
+ * and normalized table name.
+ * @param pathToExtract path string from which extractorRegex will get table and partitions
+ * @param extractorRegex regex that captures table and partitions from pathToExtract
+ * @return
+ */
+ def apply(
+ database: String,
+ baseLocation: String,
+ pathToExtract: String,
+ extractorRegex: Regex
+ ): HivePartition = {
+ // Get a ListMap of all named groups captured from inputPathRegex
+ val capturedKeys = captureListMap(pathToExtract, extractorRegex)
+ // "table" MUST be an extracted key.
+ val table = normalize(capturedKeys("table"))
+ // The hive table location is baseLocation/table
+ val location = baseLocation + "/" + table
+ // The partitions are any other key=value pair groups captured by the regex.
+ val partitions = capturedKeys - "table"
+
+ new HivePartition(database, table, location, partitions)
+ }
+
+ /**
+ * This will extract the matched regex groupNames and their captured values
+ * into a ListMap. ListMap is used so that order is preserved.
+ *
+ * @param s String to match
+ * @param regex Regex with named capture groups
+ * @throws RuntimeException if regex does not match s
+ * @return
+ */
+ private def captureListMap(s: String, regex: Regex): ListMap[String, String] = {
+ val m = regex.findFirstMatchIn(s).getOrElse(
+ throw new RuntimeException(
+ s"Regex $regex did not match $s when attempting to extract capture group keys"
+ )
+ )
+
+ m.groupNames.foldLeft(ListMap[String, String]()) {
+ case (currMap, (name: String)) => currMap ++ ListMap(name -> m.group(name))
+ }
+ }
+
+}
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
new file mode 100644
index 0000000..8369104
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
@@ -0,0 +1,413 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.apache.log4j.LogManager
+
+import scala.util.control.Exception.{allCatch, ignoring}
+
+import org.apache.hadoop.fs.Path
+
+
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.functions.lit
+
+// Import implicit StructType and StructField conversions.
+// This allows us to use these types with an extended API
+// that includes schema merging and Hive DDL statement generation.
+import SparkSQLHiveExtensions._
+
+
+/**
+ * Converts arbitrary JSON to Hive Parquet by 'evolving' the Hive table to
+ * add new fields as they are encountered in the JSON data.
+ *
+ * Usage:
+ *
+ * SparkJsonToHive(
+ * hiveContext,
+ * "/path/to/input/data/MyCoolEvent/2017/07/01/00",
+ * HivePartition(
+ * "mydatabase", "mytable", "/path/to/external/mytable",
+ *          ListMap("year" -> "2017", "month" -> "07", "day" -> "30", "hour" -> "00")
+ * )
+ * )
+ *
+ * If mydatabase.mytable does not exist in Hive, it will be created based on the JSON
+ * records and the Spark schema inferred by merging all fields from all records found in
+ * the inputPath.
+ *
+ * Later, after more JSON data has been imported, you can run:
+ *
+ * SparkJsonToHive(
+ * hiveContext,
+ * "/path/to/input/data/MyCoolEvent/2017/07/01/01",
+ * HivePartition(
+ * "mydatabase", "mytable", "/path/to/external/mytable",
+ *          ListMap("year" -> "2017", "month" -> "07", "day" -> "30", "hour" -> "01")
+ * )
+ * )
+ *
+ * If any new fields are encountered in this new input data, the now existent
+ * mydatabase.mytable table in Hive will be altered to include these fields.
+ *
+ * If any type changes are encountered between the existing Hive table and the
+ * new input data, an IllegalStateException will be thrown.
+ *
+ */
+object SparkJsonToHive {
+ private val log = LogManager.getLogger("SparkJsonToHive")
+
+
+ /**
+ * Reads inputPath as JSON data, creates or alters tableName in Hive to match the inferred
+ * schema of the input JSON data, and then inserts the data into the table.
+ *
+ * @param hiveContext Spark HiveContext
+ *
+ * @param inputPath Path to JSON data
+ *
+ *
+ * @param partition HivePartition. This helper class contains
+ * database and table name, as well as external location
+ * and partition keys and values.
+ *
+ * @param isSequenceFile If true, inputPath is expected to contain JSON in
+ * Hadoop Sequence Files, else JSON in text files.
+ *
+ * @param doneCallback Function to call after a successful run
+ *
+ * @return The number of records refined
+ */
+ def apply(
+ hiveContext: HiveContext,
+ inputPath: String,
+ partition: HivePartition,
+ isSequenceFile: Boolean,
+ doneCallback: () => Unit
+ ): Long = {
+ // Set this so we can partition by fields in the DataFrame.
+ hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+
+ // Read the JSON data into a DataFrame and
+ // add constant value partition columns.
+ val inputDf = dataFrameWithHivePartitions(
+ readJsonDataFrame(hiveContext.asInstanceOf[SQLContext], inputPath, isSequenceFile),
+ partition
+ )
+
+ // Grab the partition name keys to use for Hive partitioning.
+ val partitionNames = partition.keys
+
+ try {
+ // This will create the Hive table based on inputDf.schema if it doesn't yet exist,
+ // or it will alter an existent table adding fields so that it matches inputDf.schema.
+ // When comparing and running DDL statements, all fields will be lowercased and
+ // made nullable.
+ prepareHiveTable(
+ hiveContext,
+ inputDf.schema,
+ partition.tableName,
+ partition.location,
+ partitionNames
+ )
+ } catch {
+ case e: IllegalStateException => {
+ log.fatal(
+ s"""Failed preparing Hive table ${partition.tableName} with
+ |input schema:\n${inputDf.schema.treeString}""".stripMargin, e)
+ throw e
+ }
+ }
+
+ // We need to re-read the JSON data with table schema merged with a schema
+ // for all fields in the JSON data. This will allow us to be sure that
+ // A. fields we read in from JSON are in the same order as the Hive table,
+ // and B. that fields not present in the JSON are set to null.
+ //
+ // NOTE: It should be possible to use inputDf with fields reordered to match
+ // the Hive table schema field ordering exactly, rather than having to
+ // re-read the JSON data a second time with the merged schema. I haven't
+ // been able to successfully do this, mainly because I didn't figure out how to use
+ // the Spark DataFrame / StructType APIs to fully reorder and add fields. We'd have
+ // to recursively reorder and add missing fields to inputDf.
+ // IF we can do this, then it should be possible to abstract away the 'JSON-ness' of
+ // this code, and have it work with any DataFrame, since we could pre-load
+ // the DataFrame before passing it to this object. We wouldn't need the inputPath
+ // or original data format (JSON) in order to infer the schema.
+
+
+ val table = hiveContext.table(partition.tableName)
+
+ // normalize=false means that top level fields will not be lowercased, but
+ // they will always be made nullable. We need this because we are about to use
+ // this schema to re-read the json data, and here, case matters. Any fields that
+ // are in inputDf must have the same case in the schema we use to read the JSON.
+ val nonNormalizedSchema = table.schema.merge(inputDf.schema, normalize=false)
+
+ val mergedSchemaDf = dataFrameWithHivePartitions(
+ // re-read the JSON data with the merged non normalized schema. Any fields in this
+ // schema that are not in the data (i.e. they are in the table, but not in this JSON),
+ // will automatically be set to null by Spark.
+ readJsonDataFrame(
+ hiveContext.asInstanceOf[SQLContext],
+ inputPath,
+ isSequenceFile,
+ Option(nonNormalizedSchema)
+ ),
+ partition
+ )
+ // mergedSchemaDf will still have non-normalized (e.g. with capital letter) field names,
+ // but, that's ok. We're only inserting into Hive now, and Spark HiveContext will do the
+ // lower casing for us.
+
+ log.info(
+ s"""Inserting into `${partition.tableName}` DataFrame with schema:\n
+ |${mergedSchemaDf.schema.treeString} for partition $partition""".stripMargin
+ )
+ // Insert data into Hive table.
+ // TODO parameterize "overwrite" to allow "append"?
+ mergedSchemaDf.write.mode("overwrite")
+ .partitionBy(partitionNames:_*)
+ .insertInto(partition.tableName)
+
+
+ // Return the String path of the external partition we just inserted into.
+ val partitionPath = hivePartitionPath(hiveContext, partition)
+ log.info(
+ s"Finished inserting into `${partition.tableName}` DataFrame, " +
+ s"wrote to external location $partitionPath."
+ )
+
+ // call doneCallback
+ doneCallback()
+
+ mergedSchemaDf.count()
+ }
+
+ /**
+ * Reads a JSON data set out of path. If isSequenceFile is true, the data should
+ * be in Hadoop Sequence file format, instead of text.
+ *
+ * @param sqlContext Spark SQLContext
+ *
+ * @param path Path to JSON data
+ *
+ * @param isSequenceFile If true (default), path is expected to contain Hadoop
+ * Sequence Files with JSON record strings as values, else
+ * just JSON text files.
+ *
+ * @param schema Optional schema to use for reading data. If not given,
+ * Spark's JSON reading logic will infer the schema by passing over
+ * all data and examining fields in each record.
+ *
+ * @return
+ */
+ def readJsonDataFrame(
+ sqlContext: SQLContext,
+ path: String,
+ isSequenceFile: Boolean = true,
+ schema: Option[StructType] = None
+ ): DataFrame = {
+ // If we have a schema, then use it to read JSON data,
+ // else just infer schemas while reading.
+ val dfReader = if (schema.isDefined) {
+ sqlContext.read.schema(schema.get)
+ }
+ else {
+ sqlContext.read
+ }
+
+ if (isSequenceFile) {
+ // Load DataFrame from JSON data in path
+ dfReader.json(
+ // Expect data to be SequenceFiles with JSON strings as values.
+ sqlContext.sparkContext.sequenceFile[Long, String](path).map(t => t._2)
+ )
+ }
+ else {
+ dfReader.json(path)
+ }
+ }
+
+
+ /**
+ * If tableName does not exist in Hive, this will create it with the schema newSchema.
+ * Else, it will attempt to alter the table to add any fields in newSchema that are
+ * not already in the Hive table. If any fields change types, this will throw an
+ * exception.
+ *
+ * @param hiveContext Spark HiveContext
+ *
+ * @param newSchema Spark schema representing the schema of the
+ * table to be created or altered.
+ *
+ * @param tableName Fully qualified (dotted) Hive table name.
+ *
+ * @param locationPath Path to external table data.
+ *
+ * @param partitionNames List of partition names. These must be present as
+ * fields in the newSchema and/or Hive table.
+ *
+ * @return
+ */
+ def prepareHiveTable(
+ hiveContext: HiveContext,
+ newSchema: StructType,
+ tableName: String,
+ locationPath: String = "",
+ partitionNames: Seq[String] = Seq.empty
+ ): Boolean = {
+ val ddlStatements = getDDLStatements(
+ hiveContext, newSchema, tableName, locationPath, partitionNames
+ )
+
+ // CREATE or ALTER the Hive table if we have a change to make.
+ if (ddlStatements.nonEmpty) {
+ ddlStatements.foreach { (s) =>
+ log.info(s"Running Hive DDL statement:\n$s")
+ ignoring(classOf[AlreadyExistsException]) {
+ hiveContext.sql(s)
+ }
+ }
+ // Refresh Spark's metadata about this Hive table.
+ hiveContext.refreshTable(tableName)
+ true
+ }
+ else
+ false
+ }
+
+
+ /**
+ * If tableName does not exist in Hive, this will return a single element Seq
+ * with a Hive CREATE TABLE statement to create a table that represents newSchema.
+ * Else, if tableName does exist, it will return a Seq of Hive ALTER TABLE statements
+ * required to update the table schema to match any new fields found in newSchema.
+ *
+ * @param hiveContext Spark HiveContext
+ *
+ * @param newSchema Spark schema representing the schema of the
+ * table to be created or altered.
+ *
+ * @param tableName Fully qualified (dotted) Hive table name.
+ *
+ * @param locationPath Path to external table data.
+ *
+ * @param partitionNames List of partition names. These must be present as
+ * fields in the newSchema and/or Hive table.
+ *
+ * @return
+ */
+ def getDDLStatements(
+ hiveContext: HiveContext,
+ newSchema: StructType,
+ tableName: String,
+ locationPath: String = "",
+ partitionNames: Seq[String] = Seq.empty
+ ): Iterable[String] = {
+ // If the Hive table doesn't exist, get CREATE DDL to create it like newSchema.
+ if (!hiveTableExists(hiveContext, tableName)) {
+ Seq(newSchema.hiveCreateDDL(tableName, locationPath, partitionNames))
+ }
+ // Else get ALTER DDL statements to alter the existing table to add fields from newSchema.
+ else {
+ val tableSchema = hiveContext.table(tableName).schema
+ val alterDDLs = tableSchema.hiveAlterDDL(tableName, newSchema)
+ if (alterDDLs.nonEmpty) {
+ log.info(s"""Found difference in schemas for Hive table `$tableName`
+ |Table schema:\n${tableSchema.treeString}
+ |Input schema:\n${newSchema.treeString}""".stripMargin
+ )
+ }
+
+ alterDDLs
+ }
+ }
+
+
+ /**
+ * Returns true if the fully qualified tableName exists in Hive, else false.
+ *
+ * @param hiveContext Spark HiveContext
+ *
+ * @param tableName Fully qualified Hive table name
+ *
+ * @return
+ */
+ def hiveTableExists(hiveContext: HiveContext, tableName: String): Boolean = {
+ allCatch.opt(hiveContext.table(tableName)) match {
+ case Some(_) => true
+ case _ => false
+ }
+ }
+
+
+ /**
+ * Returns a new DataFrame with constant Hive partitions added as columns. If any
+ * column values are convertible to Ints, they will be added as an Int, otherwise String.
+ *
+ * @param df Input DataFrame
+ *
+ * @param partition HivePartition
+ *
+ * @return
+ */
+ def dataFrameWithHivePartitions(
+ df: DataFrame,
+ partition: HivePartition
+ ): DataFrame = {
+ // Add partitions to DataFrame.
+ partition.keys.foldLeft(df) {
+ case (currentDf, (key: String)) =>
+ val value = partition.get(key).get
+ log.info(s"Adding partition $key=$value")
+ // If the partition value looks like an Int, convert it,
+ // else just use as a String. lit() will convert the Scala
+ // value (Int or String here) into a Spark Column type.
+ currentDf.withColumn(key, lit(allCatch.opt(value.toInt).getOrElse(value)))
+ }
+ }
+
+
+ /**
+ * Runs a DESCRIBE FORMATTED query and extracts path to a
+ * Hive table partition.
+ *
+ * @param hiveContext HiveContext
+ * @param partition HivePartition
+ * @return
+ */
+ def hivePartitionPath(
+ hiveContext: HiveContext,
+ partition: HivePartition
+ ): String = {
+ var q = s"DESCRIBE FORMATTED `${partition.tableName}`"
+ if (partition.nonEmpty) {
+ q += s" PARTITION (${partition.hiveQL})"
+ }
+
+ // This query will return a human readable block of text about
+ // this table (partition). We need to parse out the Location information.
+ val locationLine: Option[String] = hiveContext.sql(q)
+ .collect()
+ // Each line is a Row[String], map it to Seq of Strings
+ .map(_.toString())
+ // Find the line with "Location"
+ .find(_.contains("Location"))
+
+ // If we found Location in the string, then extract just the location path.
+ if (locationLine.isDefined) {
+ locationLine.get.filterNot("[]".toSet).trim.split("\\s+").last
+ }
+ // Else throw an Exception. This shouldn't happen, as the Hive query
+ // will fail earlier if the partition doesn't exist.
+ else {
+ throw new RuntimeException(s"Failed finding path of Hive table partition $partition")
+ }
+ }
+
+}
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkSQLHiveExtensions.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkSQLHiveExtensions.scala
new file mode 100644
index 0000000..1ef1ece
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkSQLHiveExtensions.scala
@@ -0,0 +1,438 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.apache.spark.sql.types._
+
+
+/**
+ * Implicit method extensions to Spark's StructType and StructField.
+ * This is useful for converting and evolving Hive tables to match
+ * Spark DataTypes.
+ *
+ * Usage:
+ *
+ * import org.wikimedia.analytics.refinery.core.SparkSQLHiveExtensions._
+ *
+ * val df: DataFrame = // read original source data
+ * val hiveCreateStatement = df.schema.hiveCreateDDL("mydb.mytable")
+ * hiveContext.sql(hiveCreateStatement)
+ * //...
+ *
+ * val table = hiveContext.table("mydb.mytable")
+ * val df: DataFrame = // read new data, maybe has new fields
+ * val hiveAlterStatement = table.schema.hiveAlterDDL("mydb.mytable", df.schema)
+ * hiveContext.sql(hiveAlterStatement)
+ */
+object SparkSQLHiveExtensions {
+
+ /**
+ * Implicit methods extensions for Spark StructField.
+ *
+ * @param field
+ */
+ implicit class StructFieldExtensions(field: StructField) {
+
+ /**
+ * Returns a copy of this StructField with a name toLowerCase.
+ * @return
+ */
+ def toLowerCase: StructField = {
+ Option(field.name).map(n => field.copy(name=n.toLowerCase)).getOrElse(field)
+ }
+
+ /**
+ * Returns a nullable or non nullable copy of this StructField.
+ * @param nullable
+ * @return
+ */
+ def makeNullable(nullable: Boolean = true): StructField =
+ field.copy(nullable=nullable)
+
+ /**
+ * Normalizes (toLowerCase and makeNullable) a copy of this StructField.
+ * Ints are converted to Longs, Floats are converted to Doubles.
+ * Longs and Doubles will handle more cases where field values
+ * look like an int or float during one iteration, and a long or double later.
+ *
+ * Hyphens will be converted to underscores.
+ * @param lowerCase if true, the field name will be lower cased. Default: true
+ * @return
+ */
+ def normalize(lowerCase: Boolean = true): StructField = {
+ val f = {field.dataType match {
+ case IntegerType => field.copy(dataType=LongType)
+ case FloatType => field.copy(dataType=DoubleType)
+ case _ => field
+ }}.copy(name=field.name.replace('-', '_')).makeNullable(nullable=true)
+ if (lowerCase) f.toLowerCase else f
+ }
+
+
+ /**
+ * Builds a Hive DDL string representing the Spark field, useful in
+ * CREATE and ALTER DDL statements.
+ *
+ * @return
+ */
+ def hiveColumnDDL: String = {
+ s"`${field.name}` ${field.typeString}" +
+ s"${if (field.nullable) "" else " NOT NULL"}"
+ }
+
+
+ /**
+ * Spark's DataType.simpleString mostly works for Hive field types. But, since
+ * we need struct field names to be backtick quoted, we need to call
+ * a special method in the case where this field is a StructType.
+ *
+ * @return
+ */
+ def typeString: String = {
+ if (field.isStructType) {
+ field.dataType.asInstanceOf[StructType].quotedSimpleString
+ }
+ else {
+ field.dataType.simpleString
+ }
+ }
+
+
+ /**
+ * Returns true if this field.dataType is a StructType, else False.
+ * @return
+ */
+ def isStructType: Boolean = {
+ field.dataType match {
+ case(StructType(_)) => true
+ case _ => false
+ }
+ }
+
+ def isLongType: Boolean = {
+ field.dataType match {
+ case(LongType) => true
+ case _ => false
+ }
+ }
+
+ }
+
+
+ /**
+ * Implicit method extensions for Spark StructType.
+ *
+ * @param struct
+ */
+ implicit class StructTypeExtensions(struct: StructType) {
+
+ /**
+ * Returns a copy of this struct with all fields 'normalized'.
+ * If lowerCase is true, then the field name will be lower cased.
+ * This function recurses on sub structs, and normalizes them
+ * with lowerCase = false, keeping the cases on sub struct field names.
+ *
+ * All ints will be converted to longs, and all floats will be
+ * converted to doubles. A field value that may
+ * at one time look like an int, may during a later iteration
+ * look like a long. We choose to always use the larger data type.
+ *
+ * @param lowerCase Default: true
+ * @return
+ */
+ def normalize(lowerCase: Boolean = true): StructType = {
+ StructType(struct.foldLeft(Seq.empty[StructField])(
+ (fields: Seq[StructField], field: StructField) => {
+ // toLowerCase and makeNullable this field.
+ val fieldNormalized = field.normalize(lowerCase=lowerCase)
+
+ if (field.isStructType) {
+ fields :+ fieldNormalized.copy(
+ dataType=fieldNormalized.dataType.asInstanceOf[StructType]
+ .normalize(lowerCase=false)
+ )
+ }
+ else {
+ fields :+ fieldNormalized
+ }
+ }
+ ))
+ }
+
+
+ // NOTE: Fully recursive normalize and denormalize was implemented at
+ // https://gist.github.com/jobar/91c552321efbedba03c8215284726f88#gistcomment-2077149,
+ // but we have decided not to include this functionality. Sub StructType field names
+ // are not easily denormalizeable. It is legal to have field names at different
+ // struct levels that all normalize to the same name. Reverting them back to
+ // their original names would require building and keeping a recursive map that matched
+ // the original struct hierarchy exactly.
+ // These methods were created in order to build Spark schemas that can be used
+ // to maintain and update a Hive table, and Hive seems to be indifferent to
+ // cases used in its struct<> column type.
+
+
+ /**
+ * Returns a new StructType with otherStruct merged into this. Any identical duplicate
+ * fields shared by both will be reduced to one field. Non StructType fields with the
+ * same name but different types will result in an IllegalStateException. StructType
+ * fields with the same name will be recursively merged. All fields will
+ * be made nullable. Comparison of top level field names is done case insensitively,
+ * i.e. myField is equivalent to myfield.
+ *
+ * @param otherStruct Spark StructType schema
+ *
+ * @param normalize If false, the returned schema will contain the original
+ * (non lowercased) field names. Comparison of fields will
+ * still be done case insensitively.
+ *
+ * @return
+ */
+ def merge(otherStruct: StructType, normalize: Boolean = true): StructType = {
+ val combined = StructType(struct ++ otherStruct)
+ val combinedNormalized = combined.normalize()
+
+ // Distinct using case insensitive and types.
+ // Result will be sorted by n1 fields first, with n2 fields at the end.
+ // distinctFields could still have repeat field names with different types.
+ val distinctFields: Seq[StructField] = combinedNormalized.distinct
+
+ val distinctNames: Seq[String] = combinedNormalized.fieldNames.distinct
+
+ // Store a map of fields by name
+ val fieldsByName: Map[String, Seq[StructField]] = distinctFields.groupBy(_.name)
+
+ // Find fields with repeat field names, e.g.
+ // Where there's more than one field per name.
+ val repeatFieldsByName: Map[String, Seq[StructField]] =
+ fieldsByName.filter(_._2.size > 1)
+
+ // If any of the repeated fields is a non StructType, then throw Exception now.
+ // We can't deal with that crap!
+ repeatFieldsByName.foreach {
+ // If there are any fields for this field name that aren't StructTypes
+ case(name, fields) if fields.exists(!_.isStructType) => {
+ throw new IllegalStateException(
+ s"""merge failed - Field $name is repeated with multiple non
+ |StructType types: ${fields.mkString(" , ")}""".stripMargin
+ )
+ }
+ case _ => ()
+ }
+
+ // Ok! If we get this far, we don't have any non StructType type changes.
+ // repeatFieldsByName will only contain StructType fields.
+ // Build the mergedStruct from all non repeated fields + merged repeated struct fields.
+ val mergedStruct = StructType(
+ distinctNames.map { name =>
+ // If this field name only occurred once, then we can just
+ // grab the single field out of fieldsByName and keep it.
+ if (!repeatFieldsByName.contains(name)) {
+ fieldsByName(name).head
+ }
+ // Else, the field name is repeated, and we already know that the
+ // repeated types are all StructTypes. We can deal with StructType
+ // type changes by merging the structs.
+ else {
+ // Get the Seq of StructFields that are the different StructTypes
+ // for this field name.
+ val mergedStruct = repeatFieldsByName(name)
+ // Map each StructField to its StructType DataType
+ .map(_.dataType.asInstanceOf[StructType])
+ // Recursively merge each StructType together
+ .foldLeft(StructType(Seq.empty))((merged, current) =>
+ // Don't normalize sub struct schemas. Spark doesn't
+ // lowercase Hive struct<> field names, and those
+ // seem to be nullable by default anyway.
+ // If we did normalize, then we'd have to recursively
+ // un-normalize if the original caller passed normalize=false.
+ merged.merge(current, normalize=false)
+ )
+
+ // Convert the StructType back into a StructField with this field name.
+ StructField(name, mergedStruct, nullable=true)
+ }
+ }
+ )
+
+ // If we want the normalized (lower cased) field names, return mergedStruct now.
+ if (normalize) {
+ mergedStruct
+ }
+ // Else we want the mergedStruct returned with the original
+ // top level non normalized field names.
+ else {
+ // Build a map from top level lowercased names to non normal names.
+ val lookup: Map[String, String] = combined.fieldNames
+ .distinct.map(n => n.toLowerCase -> n).toMap
+ // Map the mergedStruct, renaming any fields in lookup.
+ StructType(mergedStruct.map(f => f.copy(name=lookup.getOrElse(f.name, f.name))))
+ }
+ }
+
+
+ /**
+ * Like simpleString, but backtick quotes field names inside of the struct<>.
+ * e.g. struct<`fieldName`:string,`database`:string>
+ * This works better in Hive, as reserved keywords need to be quoted.
+ * If this struct contains sub structs, those will be recursively quoted too.
+ *
+ * @return
+ */
+ def quotedSimpleString: String = {
+ val fieldTypes = struct.fields.map { field =>
+ if (field.isStructType) {
+ s"`${field.name}`:${field.dataType.asInstanceOf[StructType].quotedSimpleString}"
+ }
+ else {
+ s"`${field.name}`:${field.dataType.simpleString}"
+ }
+ }
+ s"struct<${fieldTypes.mkString(",")}>"
+ }
+
+ /**
+ * Returns String representing Hive column DDL, for use in Hive DDL statements.
+ * This only returns the portion of the DDL statement representing each column.
+ * E.g.
+ * `fieldname1` string,
+ * `fieldname2` bigint
+ * ..
+ *
+ * @param sep column DDL separator, default ",\n"
+ *
+ * @return
+ */
+ def hiveColumnsDDL(sep: String = ",\n"): String =
+ struct.map(_.hiveColumnDDL).mkString(sep)
+
+
+ /**
+ * Builds a Hive CREATE statement DDL string from this StructType schema.
+ * Since Hive is case insensitive, the top level field names will be lowercased.
+ * To ease integration with missing fields in data, all fields are made nullable.
+ *
+ * @return CREATE statement DDL string
+ */
+ def hiveCreateDDL(
+ tableName: String,
+ locationPath: String = "",
+ partitionNames: Seq[String] = Seq.empty
+ ): String = {
+ val schemaNormalized = struct.normalize()
+ val partitionNamesNormalized = partitionNames.map(_.toLowerCase)
+
+ // Validate that all partitions are in the schema.
+ if (partitionNamesNormalized.diff(schemaNormalized.fieldNames).nonEmpty) {
+ throw new IllegalStateException(
+ s"""At least one partition field is not in the Spark StructType schema.
+ |partitions: [${partitionNamesNormalized.mkString(",")}]""".stripMargin
+ )
+ }
+
+ val externalClause = if (locationPath.nonEmpty) " EXTERNAL" else ""
+
+ val columnsClause = StructType(schemaNormalized
+ .filterNot(f => partitionNamesNormalized.contains(f.name))
+ ).hiveColumnsDDL()
+
+
+ val partitionClause = {
+ if (partitionNamesNormalized.isEmpty) "-- No partition provided"
+ else {
+ s"""PARTITIONED BY (
+ |${StructType(partitionNamesNormalized.map(
+ p => schemaNormalized(schemaNormalized.fieldIndex(p))
+ )).hiveColumnsDDL()}
+ |)""".stripMargin
+ }
+ }
+
+ val locationClause = if (locationPath.nonEmpty) s"\nLOCATION '$locationPath'" else ""
+
+ s"""CREATE$externalClause TABLE `$tableName` (
+ |$columnsClause
+ |)
+ |$partitionClause
+ |STORED AS PARQUET$locationClause""".stripMargin
+ }
+
+
+ /**
+ * Merges otherSchema into this struct StructType, and builds Hive
+ * ALTER DDL statements to add any new fields to or change struct definitions
+ * of an existing Hive table. Each DDL statement returned should be executed in order
+ * to alter the target Hive table to match the merged schemas.
+ *
+ * Type changes for non-struct fields are not supported and will result in an
+ * IllegalStateException.
+ *
+ * Field names will be lower cased, and all fields are made nullable.
+ *
+ * @param tableName Hive table name
+ * @param otherSchema Spark schema
+ *
+ * @return Iterable of ALTER statement DDL strings
+ */
+ def hiveAlterDDL(
+ tableName: String,
+ otherSchema: StructType
+ ): Iterable[String] = {
+ val schemaNormalized = struct.normalize()
+ val otherSchemaNormalized = otherSchema.normalize()
+
+ // Merge the base schema with otherSchema to ensure there are no non struct type changes.
+ // (merge() will throw an exception if it encounters any)
+ val mergedSchemaNormalized = schemaNormalized.merge(otherSchemaNormalized)
+
+ // diffSchema contains fields that differ in name or type from the original schema.
+ val diffSchema = mergedSchemaNormalized.diff(schemaNormalized)
+
+
+ // If there are no new fields at all, then return empty Seq now.
+ if (diffSchema.isEmpty) {
+ Seq.empty[String]
+ }
+ else {
+ // Group the schema changes into ones for which we can
+ // just ADD COLUMNS in a single statement, and those
+ // for which we need individual CHANGE COLUMN statements.
+ // We must CHANGE COLUMN any StructType field that has changed.
+ val tableModifications = diffSchema.groupBy(f =>
+ // If this field is in diffSchema and in the original schema,
+ // we know that it must be a StructType. merge() wouldn't let
+ // us have fields with the same name and different types unless
+ // it is a StructType.
+ if (schemaNormalized.fieldNames.contains(f.name)) "change"
+ else "add"
+ )
+ // To be 100% sure we keep ordering, sort the grouped fields by name.
+ .map { case (group, fields) => (group, fields.sortBy(f => f.name)) }
+
+ // Generate the ADD COLUMNS statement to add all new COLUMNS
+ val addStatements: Option[String] = if (tableModifications.contains("add")) {
+ Option(s"""ALTER TABLE `$tableName`
+ |ADD COLUMNS (
+ |${StructType(tableModifications("add")).hiveColumnsDDL()}
+ |)""".stripMargin
+ )
+ }
+ else
+ None
+
+ // Generate each CHANGE COLUMNS statement needed to
+ // update the struct<> definition of a struct COLUMN.
+ val changeStatements: Seq[String] = tableModifications
+ .getOrElse("change", Seq.empty[StructField])
+ .map { f =>
+ s"""ALTER TABLE `$tableName`
+ |CHANGE COLUMN `${f.name}` ${f.hiveColumnDDL}""".stripMargin
+ }
+
+ // Return a Seq of all statements to run to update the Hive table
+ // to match mergedSchemaNormalized.
+ addStatements ++ changeStatements
+ }
+ }
+ }
+}
+
+
diff --git a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestHivePartition.scala b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestHivePartition.scala
new file mode 100644
index 0000000..5e52069
--- /dev/null
+++ b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestHivePartition.scala
@@ -0,0 +1,63 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.collection.immutable.ListMap
+import scala.util.matching.Regex
+
+
+class TestHivePartition extends FlatSpec with Matchers {
+
+ val database = "testdb"
+ val table = "Test-Table"
+ val location = "/path/to/test_table"
+ val partitions: ListMap[String, String] = ListMap(
+ "datacenter" -> "dc1", "year" -> "2017", "month" -> "07"
+ )
+ val partition = HivePartition(
+ database, table, location, partitions
+ )
+
+ it should "normalized fully qualified table name" in {
+ partition.tableName should equal(s"$database.${table.replace("-", "_")}")
+ }
+
+ it should "have partition keys" in {
+ partition.keys should equal(partition.keys.toSeq)
+ }
+
+ it should "have correct hive QL representation" in {
+ partition.hiveQL should equal("""datacenter="dc1",year=2017,month=7""")
+ }
+
+
+ it should "have correct hive relative path representation" in {
+ partition.relativePath should equal("""datacenter=dc1/year=2017/month=7""")
+ }
+
+ it should "have correct hive full path" in {
+ partition.path should equal(s"""$location/datacenter=dc1/year=2017/month=7""")
+ }
+
+ it should "construct with regex and path" in {
+
+ val regex = new Regex(
+ "hourly/(.+)_(.+)/(\\d{4})/(\\d{2})",
+ "datacenter", "table", "year", "month"
+ )
+
+ val baseLocation = "/path/to/external/tables"
+ val p = HivePartition(
+ database, baseLocation, s"/path/to/raw/hourly/dc2_Test-Table2/2015/05", regex
+ )
+
+ p.tableName should equal(s"$database.Test_Table2")
+ p.path should equal(s"$baseLocation/${p.table}/datacenter=dc2/year=2015/month=5")
+
+ val partitionShouldBe = ListMap(
+ "datacenter" -> "dc2", "year" -> "2015", "month" -> "05"
+ )
+ p.partitions should equal(partitionShouldBe)
+ }
+
+}
diff --git a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestSparkSQLHiveExtensions.scala b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestSparkSQLHiveExtensions.scala
new file mode 100644
index 0000000..2450127
--- /dev/null
+++ b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestSparkSQLHiveExtensions.scala
@@ -0,0 +1,381 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.apache.spark.sql.types._
+import org.scalatest.{Matchers, FlatSpec}
+
+import org.wikimedia.analytics.refinery.core.SparkSQLHiveExtensions._
+
+class TestSparkSQLHiveExtensions extends FlatSpec with Matchers {
+
+ val tableName = "test.table"
+ val tableLocation = "/tmp/test/table"
+
+
+ it should "build a Hive DDL string representing a simple column" in {
+ val field = StructField("f1", LongType, nullable = false)
+ val expected = s"`f1` bigint NOT NULL"
+
+ field.hiveColumnDDL should equal(expected)
+ }
+
+ it should "build Hive DDL string representing a complex column" in {
+ val field = StructField("S1", StructType(Seq(
+ StructField("s1", StringType, nullable=true)
+ )))
+ val expected = s"`S1` struct<`s1`:string>"
+
+ field.hiveColumnDDL should equal(expected)
+ }
+
+
+ it should "build a Hive DDL string representing multiple columns" in {
+ val fields = Seq(
+ StructField("f1", StringType, nullable = false),
+ StructField("f2", IntegerType, nullable = true),
+ StructField("S1", StructType(Seq(
+ StructField("s1", StringType, nullable=true)
+ )))
+ )
+ val expected =
+ s"""`f1` string NOT NULL,
+ |`f2` int,
+ |`S1` struct<`s1`:string>""".stripMargin
+
+ StructType(fields).hiveColumnsDDL() should equal(expected)
+ }
+
+ it should "merge and normalize 2 simple schemas" in {
+ val schema1 = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StringType, nullable = true)
+ ))
+
+ val schema2 = StructType(Seq(
+ StructField("f4", LongType, nullable = true),
+ StructField("f2", StringType, nullable = true),
+ StructField("f3", FloatType, nullable = true)
+
+ ))
+
+ // Unioned schema will be first ordered by schema1 fields, with any extra fields
+ // from schema2 appended in the order they are found there.
+ val expected = StructType(Seq(
+ StructField("f1", LongType, nullable = true),
+ StructField("f2", StringType, nullable = true),
+ StructField("f4", LongType, nullable = true),
+ StructField("f3", DoubleType, nullable = true)
+ ))
+
+ schema1.merge(schema2) should equal(expected)
+ }
+
+
+ it should "merge and not normalize 2 simple schemas" in {
+ val schema1 = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StringType, nullable = true)
+ ))
+
+ val schema2 = StructType(Seq(
+ StructField("f4", LongType, nullable = true),
+ StructField("f2", StringType, nullable = true),
+ StructField("f3", IntegerType, nullable = true)
+
+ ))
+
+ // Unioned schema will be first ordered by schema1 fields, with any extra fields
+ // from schema2 appended in the order they are found there.
+ val expected = StructType(Seq(
+ StructField("F1", LongType, nullable = true),
+ StructField("f2", StringType, nullable = true),
+ StructField("f4", LongType, nullable = true),
+ StructField("f3", LongType, nullable = true)
+ ))
+
+ schema1.merge(schema2, normalize=false) should equal(expected)
+ }
+
+ it should "merge and not normalize 2 complex schemas" in {
+ val schema1 = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s2", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true)
+ )))
+ )), nullable = true),
+ StructField("f3", StringType, nullable = true)
+ ))
+
+ val schema2 = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s3", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true),
+ StructField("c2", StringType, nullable=true)
+ )))
+ )), nullable = true),
+ StructField("F5", StructType(Seq(
+ StructField("b1", IntegerType, nullable=true))
+ )),
+ StructField("f4", LongType, nullable = true)
+ ))
+
+ // Merged schema will be first ordered by schema1 fields, with any extra fields
+ // from schema2 appended in the order they are found there.
+ val expected = StructType(Seq(
+ StructField("F1", LongType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s2", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true),
+ StructField("c2", StringType, nullable=true)
+ ))),
+ StructField("s3", StringType, nullable = true)
+ )), nullable = true),
+ StructField("f3", StringType, nullable = true),
+ StructField("F5", StructType(Seq(
+ StructField("b1", LongType, nullable=true))
+ )),
+ StructField("f4", LongType, nullable = true)
+ ))
+
+ schema1.merge(schema2, normalize=false) should equal(expected)
+ }
+
+
+ it should "merge and normalize 2 complex schemas" in {
+ val schema1 = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s2", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true)
+ )))
+ )), nullable = true),
+ StructField("f3", StringType, nullable = true)
+ ))
+
+ val schema2 = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s3", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true),
+ StructField("c2", StringType, nullable=true)
+ )))
+ )), nullable = true),
+ StructField("F5", StructType(Seq(
+ StructField("b1", IntegerType, nullable=true))
+ )),
+ StructField("f4", LongType, nullable = true)
+ ))
+
+ // Merged schema will be first ordered by schema1 fields, with any extra fields
+ // from schema2 appended in the order they are found there.
+ val expected = StructType(Seq(
+ StructField("f1", LongType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s2", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true),
+ StructField("c2", StringType, nullable=true)
+ ))),
+ StructField("s3", StringType, nullable = true)
+ )), nullable = true),
+ StructField("f3", StringType, nullable = true),
+ StructField("f5", StructType(Seq(
+ StructField("b1", LongType, nullable=true))
+ )),
+ StructField("f4", LongType, nullable = true)
+ ))
+
+ schema1.merge(schema2, normalize=true) should equal(expected)
+ }
+
+
+ it should "build non external no partitions create DDL with single schema" in {
+ val schema = StructType(Seq(
+ StructField("f2", LongType, nullable = true),
+ StructField("f3", StringType, nullable = true),
+ StructField("F1", IntegerType, nullable = true)
+ ))
+
+ val expected =
+ s"""CREATE TABLE `$tableName` (
+ |`f2` bigint,
+ |`f3` string,
+ |`f1` bigint
+ |)
+ |-- No partition provided
+ |STORED AS PARQUET""".stripMargin
+
+ val statement = schema.hiveCreateDDL(tableName)
+
+ statement should equal(expected)
+ }
+
+ it should "build external no partitions create DDL with single schema" in {
+ val schema = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", LongType, nullable = true),
+ StructField("f3", StringType, nullable = true)
+ ))
+
+ val expected =
+ s"""CREATE EXTERNAL TABLE `$tableName` (
+ |`f1` bigint,
+ |`f2` bigint,
+ |`f3` string
+ |)
+ |-- No partition provided
+ |STORED AS PARQUET
+ |LOCATION '$tableLocation'""".stripMargin
+
+ val statement = schema.hiveCreateDDL(tableName, tableLocation)
+
+ statement should equal(expected)
+ }
+
+ it should "build external create DDL with partitions and single schema" in {
+ val schema = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", LongType, nullable = true),
+ StructField("f4", StringType, nullable = true),
+ StructField("f3", StringType, nullable = true)
+ ))
+ val partitions = Seq("f3", "f4")
+
+ val expected =
+ s"""CREATE EXTERNAL TABLE `$tableName` (
+ |`f1` bigint,
+ |`f2` bigint
+ |)
+ |PARTITIONED BY (
+ |`f3` string,
+ |`f4` string
+ |)
+ |STORED AS PARQUET
+ |LOCATION '$tableLocation'""".stripMargin
+
+ val statement = schema.hiveCreateDDL(tableName, tableLocation, partitions)
+
+ statement should equal(expected)
+ }
+
+
+ it should "build alter DDL with single schema" in {
+ val schema = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", LongType, nullable = true),
+ StructField("f3", StringType, nullable = true)
+ ))
+
+ val otherSchema = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f3", StringType, nullable = true),
+ StructField("f4", LongType, nullable = true)
+ ))
+
+ val partitions = Seq("f1", "f2")
+
+ val expected = Seq(
+ s"""ALTER TABLE `$tableName`
+ |ADD COLUMNS (
+ |`f4` bigint
+ |)""".stripMargin
+ )
+
+ val statements = schema.hiveAlterDDL(tableName, otherSchema)
+
+ statements should equal(expected)
+ }
+
+
+ it should "build alter DDL with just modified merged structs" in {
+ val schema = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s2", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true)
+ )))
+ )), nullable = true),
+ StructField("f3", StringType, nullable = true)
+ ))
+
+ val otherSchema = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s3", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true),
+ StructField("C2", StringType, nullable=true)
+ )))
+ )), nullable = true)
+ ))
+
+ val expected = Seq(
+ s"""ALTER TABLE `$tableName`
+ |CHANGE COLUMN `f2` `f2` struct<`S1`:string,`s2`:string,`A1`:struct<`c1`:string,`C2`:string>,`s3`:string>""".stripMargin
+ )
+
+ val statements = schema.hiveAlterDDL(tableName, otherSchema)
+
+ statements should equal(expected)
+ }
+
+ it should "build alter DDL with new columns and modified merged structs" in {
+ val schema = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s2", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true)
+ )))
+ )), nullable = true),
+ StructField("f3", StringType, nullable = true)
+ ))
+
+ val otherSchema = StructType(Seq(
+ StructField("F1", IntegerType, nullable = true),
+ StructField("f2", StructType(Seq(
+ StructField("S1", StringType, nullable = true),
+ StructField("s3", StringType, nullable = true),
+ StructField("A1", StructType(Seq(
+ StructField("c1", StringType, nullable=true),
+ StructField("c2", StringType, nullable=true)
+ )))
+ )), nullable = true),
+ StructField("f4", StructType(Seq(
+ StructField("b1", IntegerType, nullable=true))
+ )),
+ StructField("f5", LongType, nullable = true)
+ ))
+
+ val expected = Seq(
+ s"""ALTER TABLE `$tableName`
+ |ADD COLUMNS (
+ |`f4` struct<`b1`:bigint>,
+ |`f5` bigint
+ |)""".stripMargin,
+ s"""ALTER TABLE `$tableName`
+ |CHANGE COLUMN `f2` `f2` struct<`S1`:string,`s2`:string,`A1`:struct<`c1`:string,`c2`:string>,`s3`:string>""".stripMargin
+ )
+
+ val statements = schema.hiveAlterDDL(tableName, otherSchema)
+
+ statements should equal(expected)
+ }
+
+}
diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index 248efba..2f0cf22 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -162,7 +162,6 @@
<version>1.6.0_0.4.7</version>
<scope>test</scope>
</dependency>
-
</dependencies>
<build>
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
new file mode 100644
index 0000000..f9abc66
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
@@ -0,0 +1,895 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.log4j.LogManager
+import scopt.OptionParser
+
+import scala.util.{Failure, Success, Try}
+import scala.util.matching.Regex
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
+
+import org.joda.time.Hours
+import org.joda.time.format.DateTimeFormatter
+import com.github.nscala_time.time.Imports._
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.hive.HiveContext
+
+import org.wikimedia.analytics.refinery.core.{HivePartition, SparkJsonToHive, Utilities}
+
+
+// TODO: ERROR Hive: Table otto not found: default.otto table not found ???
+// TODO: support append vs overwrite?
+// TODO: Hive Table Locking?
+
+/**
+ * Looks for hourly input partition directories with JSON data that need refinement,
+ * and refines them into Hive Parquet tables using SparkJsonToHive.
+ */
+object JsonRefine {
+
+ private val log = LogManager.getLogger("JsonRefine")
+ private val iso8601DateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
+ /**
+ * Config class for CLI argument parser using scopt
+ */
+ case class Params(
+ inputBasePath: String = "",
+ outputBasePath: String = "",
+ databaseName: String = "default",
+ sinceDateTime: DateTime = DateTime.now - 192.hours, // 8 days ago
+ untilDateTime: DateTime = DateTime.now,
+ inputPathPattern: String = ".*/(.+)/hourly/(\d{4})/(\d{2})/(\d{2})/(\d{2}).*",
+ inputPathPatternCaptureGroups: Seq[String] = Seq("table", "year", "month", "day", "hour"),
+ inputPathDateTimeFormat: DateTimeFormatter = DateTimeFormat.forPattern("'hourly'/yyyy/MM/dd/HH"),
+ tableWhitelistRegex: Option[Regex] = None,
+ tableBlacklistRegex: Option[Regex] = None,
+ doneFlag: String = "_REFINED",
+ parallelism: Option[Int] = None,
+ compressionCodec: String = "snappy",
+ isSequenceFile: Boolean = true,
+ limit: Option[Int] = None,
+ dryRun: Boolean = false,
+ shouldEmailReport: Boolean = false,
+ smtpURI: String = "mx1001.wikimedia.org:25",
+ fromEmail: String = s"jsonrefine@${java.net.InetAddress.getLocalHost.getCanonicalHostName}",
+ toEmails: Seq[String] = Seq("[email protected]")
+ )
+
+
+ // Support implicit Regex conversion from CLI opt.
+ implicit val scoptRegexRead: scopt.Read[Regex] =
+ scopt.Read.reads { _.r }
+
+ // Support implicit Option[Regex] conversion from CLI opt.
+ implicit val scoptOptionRegexRead: scopt.Read[Option[Regex]] =
+ scopt.Read.reads {s => Some(s.r) }
+
+ // Support implicit DateTimeFormatter conversion from CLI opt.
+ implicit val scoptDateTimeFormatterRead: scopt.Read[DateTimeFormatter] =
+ scopt.Read.reads { s => DateTimeFormat.forPattern(s) }
+
+ // Support implicit DateTime conversion from CLI opt.
+ // The opt can either be given in integer hours ago, or
+ // as an ISO-8601 date time.
+ implicit val scoptDateTimeRead: scopt.Read[DateTime] =
+ scopt.Read.reads { s => {
+ if (s.forall(Character.isDigit))
+ DateTime.now - s.toInt.hours
+ else
+ DateTime.parse(s, iso8601DateFormatter)
+ }
+ }
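+
+ // Illustrative note (not part of the original change), matching the logic above:
+ // a value like "24" would parse to DateTime.now - 24.hours, while
+ // "2017-07-26T00:00:00" would be parsed with iso8601DateFormatter.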
+
+
+
+ /**
+ * Define the command line options parser.
+ */
+ val argsParser = new OptionParser[Params](
+ "spark-submit --class org.wikimedia.analytics.refinery.job.JsonRefine
refinery-job.jar"
+ ) {
+ head("""
+ |JSON Datasets -> Partitioned Hive Parquet tables.
+ |
+ |Given an input base path, this will search all subdirectories for input
+ |partitions to convert to Parquet backed Hive tables. This was originally
+ |written to work with JSON data imported via Camus into hourly buckets, but
+ |should be configurable to work with any regular import directory hierarchy.
+ |
+ |Example:
+ | spark-submit --class org.wikimedia.analytics.refinery.job.JsonRefine refinery-job.jar \
+ | --input-base-path /wmf/data/raw/event \
+ | --output-base-path /user/otto/external/eventbus5 \
+ | --database event \
+ | --since 24 \
+ | --input-regex '.*(eqiad|codfw)_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
+ | --input-capture 'datacenter,table,year,month,day,hour' \
+ | --table-blacklist '.*page_properties_change.*'
+ |
+ |""".stripMargin, "")
+
+ note("""NOTE: You may pass all of the described CLI options to this
job in a single
+ |string with --options '<options>' flag.\n""".stripMargin)
+
+ help("help") text "Prints this usage text."
+
+ opt[String]('i', "input-base-path").required().valueName("<path>").action { (x, p) =>
+ p.copy(inputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+ } text
+ """Path to input JSON datasets. This directory is expected to contain
+ |directories of individual (topic) table datasets. E.g.
+ |/path/to/raw/data/{myprefix_dataSetOne,myprefix_dataSetTwo}, etc.
+ |Each of these subdirectories will be searched for partitions that
+ |need to be refined.""".stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[String]('o', "output-base-path") optional() valueName "<path>" action { (x, p) =>
+ p.copy(outputBasePath = if (x.endsWith("/")) x.dropRight(1) else x)
+ } text
+ """Base path of output data and of external Hive tables. Each table will be created
+ |with a LOCATION in a subdirectory of this path."""
+ .stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[String]('d', "database") optional() valueName "<database>" action { (x, p) =>
+ p.copy(databaseName = if (x.endsWith("/")) x.dropRight(1) else x)
+ } text "Hive database name in which to manage refined Hive tables.\n"
+
+ opt[DateTime]('s', "since") optional() valueName "<since-date-time>" action { (x, p) =>
+ p.copy(sinceDateTime = x)
+ } text
+ """Refine all data found since this date time. This may either be given as an integer
+ |number of hours ago, or an ISO-8601 formatted date time. Default: 192 hours ago."""
+ .stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[DateTime]('u', "until") optional() valueName "<until-date-time>" action { (x, p) =>
+ p.copy(untilDateTime = x)
+ } text
+ """Refine all data found until this date time. This may either be given as an integer
+ |number of hours ago, or an ISO-8601 formatted date time. Default: now."""
+ .stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[String]('R', "input-regex") optional() valueName "<regex>" action { (x, p) =>
+ p.copy(inputPathPattern = x)
+ } text
+ """input-regex should match the input partition directory hierarchy starting from the
+ |dataset base path, and should capture the table name and the partition values.
+ |Along with input-capture, this allows arbitrary extraction of table names and
+ |partitions from the input path. You are required to capture at least "table"
+ |using this regex. The default will match an hourly bucketed Camus import hierarchy,
+ |using the topic name as the table name.""".stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[String]('C', "input-capture") optional() valueName "<capture-list>" action { (x, p) =>
+ p.copy(inputPathPatternCaptureGroups = x.split(","))
+ } text
+ """input-capture should be a comma separated list of named capture groups
+ |corresponding to the groups captured by input-regex. These need to be
+ |provided in the order that the groups are captured. This ordering will
+ |also be used for partitioning.""".stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[DateTimeFormatter]('F', "input-datetime-format") optional() valueName "<format>" action { (x, p) =>
+ p.copy(inputPathDateTimeFormat = x)
+ } text
+ """This DateTimeFormat will be used to generate all possible partitions since
+ |the given lookback-hours in each dataset directory. This format will be used
+ |to format a DateTime to input directory partition paths. The finest granularity
+ |supported is hourly. Every hour in the past lookback-hours will be generated,
+ |but if you specify a less granular format (e.g. daily, like "daily"/yyyy/MM/dd),
+ |the code will reduce the generated partition search for that day to 1, instead of 24.
+ |The default is suitable for generating partitions in an hourly bucketed Camus
+ |import hierarchy.
+ """.stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[(Option[Regex])]('w', "table-whitelist") optional() valueName "<regex>" action { (x, p) =>
+ p.copy(tableWhitelistRegex = x)
+ } text "Whitelist regex of table names to refine.\n".stripMargin
+
+ opt[Option[Regex]]('b', "table-blacklist") optional() valueName "<regex>" action { (x, p) =>
+ p.copy(tableBlacklistRegex = x)
+ } text "Blacklist regex of table names to skip.\n".stripMargin
+
+ opt[String]('D', "done-flag") optional() valueName "<filename>" action { (x, p) =>
+ p.copy(doneFlag = x)
+ } text
+ """When a partition is successfully refined, this file will be created in the
+ |output partition path with the binary timestamp of the input source partition's
+ |modification timestamp. This allows subsequent runs to detect if the input
+ |data has changed, meaning the partition needs to be re-refined.
+ |Default: _REFINED""".stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[Int]('P', "parallelism") optional() valueName "<parallelism>" action { (x, p) =>
+ p.copy(parallelism = Some(x))
+ } text
+ """Refine into up to this many tables in parallel. Individual partitions
+ |destined for the same Hive table will be refined serially.
+ |Defaults to the number of local CPUs (i.e. what Scala parallel
+ |collections uses).""".stripMargin.replace("\n", "\n\t") + "\n"
+
+
+ opt[String]('c', "compression-codec") optional() valueName "<codec>" action { (x, p) =>
+ p.copy(compressionCodec = x)
+ } text "Value of spark.sql.parquet.compression.codec, default: snappy\n"
+
+ opt[Boolean]('S', "sequence-file") optional() action { (x, p) =>
+ p.copy(isSequenceFile = x)
+ } text
+ """Set to true if the input data is stored in Hadoop Sequence
files.
+ |Otherwise text is assumed. Default: true"""
+ .stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[String]('L', "limit") optional() valueName "<limit>" action { (x, p) =>
+ p.copy(limit = Some(x.toInt))
+ } text
+ """Only refine this many partition directories. This is useful while
+ |testing to reduce the number of refinements to do at once. Defaults
+ |to no limit.""".stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[Unit]('n', "dry-run") optional() action { (_, p) =>
+ p.copy(dryRun = true)
+ } text
+ """Set to true if no action should actually be taken. Instead,
targets
+ |to refine will be printed, but they will not be refined.
+ |Default: false"""
+ .stripMargin.replace("\n", "\n\t") + "\n"
+
+ opt[Unit]('E', "send-email-report") optional() action { (_, p) =>
+ p.copy(shouldEmailReport = true)
+ } text
+ "Set this flag if you want an email report of any failures during
refinement."
+
+ opt[String]('T', "smtp-uri") optional() valueName "<smtp-uri>" action { (x, p) =>
+ p.copy(smtpURI = x)
+ } text "SMTP server host:port. Default: mx1001.wikimedia.org:25"
+
+ opt[String]('f', "from-email") optional() valueName "<from-email>" action { (x, p) =>
+ p.copy(fromEmail = x)
+ } text "Email report from sender email address."
+
+ opt[String]('t', "to-emails") optional() valueName "<to-emails>" action { (x, p) =>
+ p.copy(toEmails = x.split(","))
+ } text
+ "Email report recipient email addresses (comma separated). Default: [email protected]"
+
+ }
+
+ def main(args: Array[String]): Unit = {
+ val params = args.headOption match {
+ // Case when our job options are given as a single string. Split them
+ // and pass them to argsParser.
+ case Some("--options") =>
+ argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1))
+ // Else the normal usage: each CLI opt can be parsed as a job option.
+ case _ =>
+ argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+ }
+
+ // Exit non-zero if any refinements failed.
+ if (apply(params))
+ sys.exit(0)
+ else
+ sys.exit(1)
+ }
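+
+ // Illustrative invocation sketch (not part of the original change); both forms
+ // are handled by main() above, reusing the example paths from the CLI help:
+ //
+ //   spark-submit --class org.wikimedia.analytics.refinery.job.JsonRefine refinery-job.jar \
+ //     --input-base-path /wmf/data/raw/event --database event --since 24
+ //
+ //   spark-submit --class org.wikimedia.analytics.refinery.job.JsonRefine refinery-job.jar \
+ //     --options '--input-base-path /wmf/data/raw/event --database event --since 24'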
+
+
+ /**
+ * Given params, refine all discovered JsonTargets.
+ *
+ * @param params Params
+ * @return true if all targets needing refinement succeeded, false otherwise.
+ */
+ def apply(params: Params): Boolean = {
+ // Initial setup - Spark, HiveContext, Hadoop FileSystem
+ val conf = new SparkConf()
+ val sc = new SparkContext(conf)
+
+ val fs = FileSystem.get(sc.hadoopConfiguration)
+ val hiveContext = new HiveContext(sc)
+ hiveContext.setConf("spark.sql.parquet.compression.codec", params.compressionCodec)
+
+ // Ensure that inputPathPatternCaptureGroups contains "table", as this is needed
+ // to determine the Hive table name we will refine into.
+ if (!params.inputPathPatternCaptureGroups.contains("table")) {
+ throw new RuntimeException(
+ s"Invalid <input-capture> ${params.inputPathPatternCaptureGroups}. " +
+ s"Must at least contain 'table' as a named capture group."
+ )
+ }
+
+ // Combine the inputPathPattern with the capture groups to build a regex that
+ // will use aliases for the named groups. This will be used to extract
+ // table and partitions out of the inputPath.
+ val inputPathRegex = new Regex(
+ params.inputPathPattern,
+ params.inputPathPatternCaptureGroups: _*
+ )
+
+
+ log.info(
+ s"Looking for JSON targets to refine in ${params.inputBasePath}
between " +
+ s"${params.sinceDateTime} and ${params.untilDateTime}"
+ )
+
+ // Need JsonTargets for every existent input partition since sinceDateTime.
+ val targetsToRefine = jsonTargetsSince(
+ fs,
+ new Path(params.inputBasePath),
+ params.isSequenceFile,
+ new Path(params.outputBasePath),
+ params.databaseName,
+ params.doneFlag,
+ params.inputPathDateTimeFormat,
+ inputPathRegex,
+ params.sinceDateTime
+ )
+ // Filter for tables in whitelist, filter out tables in blacklist,
+ // and filter the remaining for targets that need refinement.
+ .filter(target => shouldRefineJsonTarget(
+ target, params.tableWhitelistRegex, params.tableBlacklistRegex
+ ))
+
+ // At this point, targetsToRefine will be a Seq of JsonTargets in our targeted
+ // time range that need refinement, either because they haven't yet been refined,
+ // or the input data has changed since the previous refinement.
+
+ // Return now if we didn't find any targets to refine.
+ if (targetsToRefine.isEmpty) {
+ log.info(s"No targets needing refinement were found in
${params.inputBasePath}")
+ return true
+ }
+
+ // Locally parallelize the targets.
+ // If params.limit, then take only the first limit input targets.
+ // This is mainly only useful for testing.
+ val targets = params.limit match {
+ case Some(_) => targetsToRefine.take(params.limit.get).par
+ case None => targetsToRefine.par
+ }
+
+ // If custom parallelism was specified, create a new ForkJoinPool for this
+ // parallel collection with the provided parallelism level.
+ if (params.parallelism.isDefined) {
+ targets.tasksupport = new ForkJoinTaskSupport(
+ new ForkJoinPool(params.parallelism.get)
+ )
+ }
+
+ val targetsByTable = targets.groupBy(_.tableName)
+
+ log.info(
+ s"Refining ${targets.length} JSON dataset partitions in into
tables " +
+ s"${targetsByTable.keys.mkString(", ")} with local " +
+ s"parallelism of ${targets.tasksupport.parallelismLevel}..."
+ )
+
+ if (params.dryRun)
+ log.warn("NOTE: --dry-run was specified, nothing will actually be
refined!")
+
+ // Loop over the inputs in parallel and refine them to
+ // their Hive table partitions. jobStatuses should be an
+ // iterable of Trys as Success/Failures.
+ val jobStatusesByTable = targetsByTable.map { case (table, tableTargets) => {
+ // We need tableTargets to run in serial instead of parallel. When a table does
+ // not yet exist, we want the first target here to issue a CREATE, while the
+ // next one to use the created table, or ALTER it if necessary. We don't
+ // want multiple CREATEs for the same table to happen in parallel.
+ if (!params.dryRun)
+ table -> refineJsonTargets(hiveContext, tableTargets.seq)
+ // If --dry-run was given, don't refine, just map to Successes.
+ else
+ table -> tableTargets.seq.map(Success(_))
+ }}
+
+ // Log successes and failures.
+ val successesByTable = jobStatusesByTable.map(t => t._1 -> t._2.filter(_.isSuccess))
+ val failuresByTable = jobStatusesByTable.map(t => t._1 -> t._2.filter(_.isFailure))
+
+ var hasFailures = false
+ if (successesByTable.nonEmpty) {
+ for ((table, successes) <- successesByTable.filter(_._2.nonEmpty)) {
+ val totalRefinedRecordCount = targetsByTable(table).map(_.recordCount).sum
+ log.info(
+ s"Successfully refined ${successes.length} of ${targetsByTable(table).size} " +
+ s"raw JSON dataset partitions into table $table (total # refined records: $totalRefinedRecordCount)"
+ )
+ }
+ }
+
+ // Collect a string of failures that we might email as a report later.
+ var failureMessages = ""
+ if (failuresByTable.nonEmpty) {
+
+ for ((table, failures) <- failuresByTable.filter(_._2.nonEmpty)) {
+ // Log each failed refinement.
+ val message =
+ s"The following ${failures.length} of
${targetsByTable(table).size} " +
+ s"raw JSON dataset partitions for table $table failed
refinement:\n\t" +
+ failures.mkString("\n\t")
+
+ log.error(message)
+ failureMessages += "\n\n" + message
+
+ hasFailures = true
+ }
+ }
+
+ // If we should send this as a failure email report
+ // (and this is not a dry run), do it!
+ if (hasFailures && params.shouldEmailReport && !params.dryRun) {
+ val smtpHost = params.smtpURI.split(":")(0)
+ val smtpPort = params.smtpURI.split(":")(1)
+
+ log.info(s"Sending failure email report to
${params.toEmails.mkString(",")}")
+ Utilities.sendEmail(
+ smtpHost,
+ smtpPort,
+ params.fromEmail,
+ params.toEmails.toArray,
+ s"JsonRefine failure report for ${params.inputBasePath} ->
${params.outputBasePath}",
+ failureMessages
+ )
+ }
+
+ // Return true if no failures, false otherwise.
+ !hasFailures
+ }
+
+
+ /**
+ * Given a JsonTarget and optional whitelist and blacklist regexes,
+ * this returns true if the JsonTarget should be refined, based on regex matching and
+ * on output existence and doneFlag content.
+ *
+ * @param target JsonTarget
+ * @param tableWhitelistRegex Option[Regex]
+ * @param tableBlacklistRegex Option[Regex]
+ * @return
+ */
+ def shouldRefineJsonTarget(
+ target: JsonTarget,
+ tableWhitelistRegex: Option[Regex],
+ tableBlacklistRegex: Option[Regex]
+ ): Boolean = {
+
+ // Filter for targets that will refine to tables that match the whitelist
+ if (tableWhitelistRegex.isDefined &&
+ !regexMatches(target.partition.table, tableWhitelistRegex.get)
+ ) {
+ log.info(
+ s"$target table ${target.partition.table} does not match table
whitelist regex " +
+ s"${tableWhitelistRegex.get}', skipping."
+ )
+ false
+ }
+ // Filter out targets that will refine to tables that match the blacklist
+ else if (tableBlacklistRegex.isDefined &&
+ regexMatches(target.partition.table, tableBlacklistRegex.get)
+ ) {
+ log.info(
+ s"$target table ${target.partition.table} matches table
blacklist regex " +
+ s"'${tableBlacklistRegex.get}', skipping."
+ )
+ false
+ }
+ // Finally filter for those that need to be refined (have new data).
+ else if (!target.shouldRefine) {
+ log.info(
+ s"$target does not have new data since the last refine at " +
+ s"${target.doneFlagMTime().getOrElse("_unknown_")}, skipping."
+ )
+ false
+ }
+ else {
+ true
+ }
+ }
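+
+ // Illustrative example (not part of the original change): with a hypothetical
+ // --table-whitelist '^edit$', only targets whose partition.table is exactly "edit"
+ // pass this filter; blacklisted or unchanged targets are logged and skipped.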
+
+
+ /**
+ * Given a Seq of JsonTargets, this runs SparkJsonToHive on each one.
+ *
+ * @param hiveContext HiveContext
+ * @param targets Seq of JsonTargets to refine
+ * @return
+ */
+ def refineJsonTargets(
+ hiveContext: HiveContext,
+ targets: Seq[JsonTarget]
+ ): Seq[Try[JsonTarget]] = {
+ targets.map(target => {
+ log.info(s"Beginning refinement of $target...")
+
+ try {
+ val recordCount = SparkJsonToHive(
+ hiveContext,
+ target.inputPath.toString,
+ target.partition,
+ target.inputIsSequenceFile,
+ () => target.writeDoneFlag()
+ )
+
+ log.info(
+ s"Finished refinement of JSON dataset $target. " +
+ s"(# refined records: $recordCount)"
+ )
+
+ target.success(recordCount)
+
+ }
+ catch {
+ case e: Exception => {
+ log.error(s"Failed refinement of JSON dataset $target.", e)
+ target.failure(e)
+ }
+ }
+ })
+ }
+
+
+ /**
+ * Finds JsonTargets with existent input partition paths between sinceDateTime and untilDateTime.
+ * The table and partitions are extracted from inputPath by combining inputPathDateTimeFormatter
+ * and inputPathRegex.
+ *
+ * inputPathDateTimeFormatter will be used to construct the expected inputPath for each
+ * input partition directory between sinceDateTime and untilDateTime. E.g. a formatter
+ * with a format of "'hourly'/yyyy/MM/dd/HH" will look for existent inputPaths
+ * for every hour in the provided time period, like
+ * $baseInputPath/subdir1/hourly/2017/07/26/00,
+ * $baseInputPath/subdir1/hourly/2017/07/26/01,
+ * $baseInputPath/subdir2/hourly/2017/07/26/00,
+ * $baseInputPath/subdir2/hourly/2017/07/26/01,
+ * etc.
+ *
+ * inputPathRegex is expected to capture named groups that include "table" and any other
+ * partition keys. inputPathRegex's capture groups must contain one named "table".
+ * E.g. new Regex(
+ * "(eqiad|codfw)_(.+)/hourly/\\d{4}/\\d{2}/\\d{2}/\\d{2}",
+ * "datacenter", "table", "year", "month", "day", "hour"
+ * )
+ *
+ * and an inputPath of
+ * $baseInputPath/eqiad_mediawiki_revision-create/2017/07/26/01
+ *
+ * will construct a JsonTarget with table "mediawiki_revision_create" (hyphens are converted
+ * to underscores) and partitions datacenter="eqiad",year=2017,month=07,day=26,hour=01
+ *
+ *
+ * @param fs                          Hadoop FileSystem
+ *
+ * @param baseInputPath               Path to base input datasets. Each subdirectory
+ *                                    is assumed to be a unique dataset with individual
+ *                                    partitions. Every subdirectory's partition
+ *                                    paths here must be compatible with the provided
+ *                                    values of inputPathDateTimeFormatter and inputPathRegex.
+ *
+ * @param inputIsSequenceFile         Should be True if the input data files are Hadoop
+ *                                    Sequence Files.
+ *
+ * @param baseTableLocationPath       Path to directory where Hive table data will be stored.
+ *                                    $baseTableLocationPath/$table will be the value of the
+ *                                    external Hive table's LOCATION.
+ *
+ * @param databaseName                Hive database name
+ *
+ * @param doneFlag                    Done flag file. A successful refinement will
+ *                                    write this file to the output path with
+ *                                    the Long timestamp of the inputPath's current mod time.
+ *
+ * @param inputPathDateTimeFormatter  Formatter used to construct input partition paths
+ *                                    in the given time range.
+ *
+ * @param inputPathRegex              Regex used to extract table name and partition
+ *                                    information.
+ *
+ * @param sinceDateTime               Start date time to look for input partitions.
+ *
+ * @param untilDateTime               End date time to look for input partitions.
+ *                                    Defaults to DateTime.now
+ * @return
+ */
+ def jsonTargetsSince(
+ fs: FileSystem,
+ baseInputPath: Path,
+ inputIsSequenceFile: Boolean,
+ baseTableLocationPath: Path,
+ databaseName: String,
+ doneFlag: String,
+ inputPathDateTimeFormatter: DateTimeFormatter,
+ inputPathRegex: Regex,
+ sinceDateTime: DateTime,
+ untilDateTime: DateTime = DateTime.now
+ ): Seq[JsonTarget] = {
+ val inputDatasetPaths = subdirectoryPaths(fs, baseInputPath)
+
+ // Map all partitions in each inputPath since sinceDateTime to JsonTargets.
+ inputDatasetPaths.flatMap { inputDatasetPath =>
+ // Get all possible input partition paths for all directories in inputDatasetPath
+ // between sinceDateTime and untilDateTime.
+ // This will include all possible partition paths in that time range, even if that path
+ // does not actually exist.
+ val pastPartitionPaths = partitionPathsSince(
+ inputDatasetPath.toString,
+ inputPathDateTimeFormatter,
+ sinceDateTime,
+ untilDateTime
+ )
+
+ // Convert each possible partition input path into a possible JsonTarget for refinement.
+ pastPartitionPaths.map(partitionPath => {
+ // Any capturedKeys other than table are expected to be partition key=values.
+ val partition = HivePartition(
+ databaseName,
+ baseTableLocationPath.toString,
+ partitionPath.toString,
+ inputPathRegex
+ )
+
+ JsonTarget(
+ fs,
+ partitionPath,
+ inputIsSequenceFile,
+ partition,
+ doneFlag
+ )
+ })
+ // We only care about input partition paths that actually exist,
+ // so filter out those that don't.
+ .filter(_.inputExists())
+ }
+ }
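+
+ // Hypothetical usage sketch (illustrative only, not part of the original change),
+ // using the job's default pattern and format:
+ //
+ //   jsonTargetsSince(
+ //     fs,
+ //     new Path("/wmf/data/raw/event"),
+ //     inputIsSequenceFile = true,
+ //     new Path("/user/otto/external/eventbus5"),
+ //     "event",
+ //     "_REFINED",
+ //     DateTimeFormat.forPattern("'hourly'/yyyy/MM/dd/HH"),
+ //     new Regex(".*/(.+)/hourly/(\\d{4})/(\\d{2})/(\\d{2})/(\\d{2}).*",
+ //       "table", "year", "month", "day", "hour"),
+ //     DateTime.now - 24.hours
+ //   )
+ //
+ // would yield one JsonTarget per existent hourly partition directory found
+ // under /wmf/data/raw/event in the last 24 hours.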
+
+
+ /**
+ * Returns true if s matches r, else false.
+ * @param s String to match
+ * @param r Regex
+ * @return
+ */
+ def regexMatches(s: String, r: Regex): Boolean = {
+ s match {
+ case r(_*) => true
+ case _ => false
+ }
+ }
+
+
+ /**
+ * Returns a Seq of all directory Paths in a directory.
+ * @param fs Hadoop FileSystem
+ * @param inDirectory directory Path in which to look for subdirectories
+ * @return
+ */
+ def subdirectoryPaths(fs: FileSystem, inDirectory: Path): Seq[Path] = {
+ fs.listStatus(inDirectory).filter(_.isDirectory).map(_.getPath)
+ }
+
+
+ /**
+ * Given 2 DateTimes, this generates a Seq of DateTimes representing all hours
+ * between d1 (inclusive) and d2 (exclusive). E.g.
+ * DateTime.now -> 2017-08-10T21:42:32.820Z
+ *
+ * hoursInBetween(DateTime.now - 2.hours, DateTime.now) ->
+ * Seq(2017-08-10T19:00:00.000Z, 2017-08-10T20:00:00.000Z)
+ *
+ * In the above example, the current hour is 21, and this function returns
+ * the previous two hours.
+ *
+ * @param d1 sinceDateTime
+ * @param d2 untilDateTime
+ * @return
+ */
+ def hoursInBetween(d1: DateTime, d2: DateTime): Seq[DateTime] = {
+ val oldestHour = new DateTime(d1, DateTimeZone.UTC).hourOfDay.roundCeilingCopy
+ val youngestHour = new DateTime(d2, DateTimeZone.UTC).hourOfDay.roundFloorCopy
+
+ for (h <- 0 to Hours.hoursBetween(oldestHour, youngestHour).getHours) yield {
+ oldestHour + h.hours - 1.hours
+ }
+ }
+
+
+ /**
+ * Given a DateTimeFormatter and 2 DateTimes, this will generate
+ * a Seq of Paths for every distinct result of fmt.print(hour) prefixed
+ * with pathPrefix. If your date formatter generates the same
+ * path for multiple hours, only one of those paths will be included
+ * in the result. This way, you can still generate a list of more granular partitions, if
+ * your data happens to be partitioned at a more granular time bucketing than hourly.
+ *
+ * @param pathPrefix Prefix to prepend to every generated partition path
+ * @param fmt Date formatter to use to extract partition paths from hours
+ * between d1 and d2
+ * @param d1 sinceDateTime
+ * @param d2 untilDateTime, Defaults to DateTime.now
+ * @return
+ */
+ def partitionPathsSince(
+ pathPrefix: String,
+ fmt: DateTimeFormatter,
+ d1: DateTime,
+ d2: DateTime = DateTime.now
+ ): Seq[Path] = {
+ hoursInBetween(d1, d2)
+ .map(hour => new Path(pathPrefix + "/" + fmt.print(hour)))
+ .distinct
+ }
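+
+ // Example (illustrative only): with fmt = DateTimeFormat.forPattern("'daily'/yyyy/MM/dd"),
+ // all 24 hours of a given day print to the same path, so .distinct leaves a single
+ // Path like <pathPrefix>/daily/2017/07/26 for that day.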
+
+
+ /**
+ * Represents a JSON dataset target for refinement. This mainly exists to reduce the number
+ * of parameters we have to pass around between functions here. An instantiated JsonTarget
+ * should contain enough information to use SparkJsonToHive to refine a single Json partition
+ * into a Hive Parquet table.
+ *
+ * @param fs Hadoop FileSystem
+ * @param inputPath Full input partition path
+ * @param inputIsSequenceFile If the input is a Hadoop Sequence File
+ * @param partition HivePartition
+ * @param doneFlag Name of file that should be written upon success of
+ * SparkJsonToHive run. This can be created by calling
+ * the writeDoneFlag method.
+ */
+ case class JsonTarget(
+ fs: FileSystem,
+ inputPath: Path,
+ inputIsSequenceFile: Boolean,
+ partition: HivePartition,
+ doneFlag: String
+ ) {
+ /**
+ * Easy access to the fully qualified Hive table name.
+ */
+ val tableName: String = partition.tableName
+
+ /**
+ * Easy access to the hive partition path, AKA the output destination path
+ */
+ val outputPath = new Path(partition.path)
+
+ /**
+ * Path to doneFlag in hive table partition output path
+ */
+ val doneFlagPath = new Path(s"$outputPath/$doneFlag")
+
+ /**
+ * Number of records successfully refined for this JsonTarget.
+ */
+ var recordCount: Long = -1
+
+ /**
+ * The mtime of the inputPath at the time this JsonTarget is instantiated.
+ * Caching this allows us to use the earliest mtime possible to store in doneFlag,
+ * in case the inputPath changes while this target is being refined.
+ */
+ private val inputMTimeCached: Option[DateTime] = inputMTime()
+
+ /**
+ * True if the inputPath exists
+ * @return
+ */
+ def inputExists(): Boolean = fs.exists(inputPath)
+
+ /**
+ * True if the outputPath exists
+ * @return
+ */
+ def outputExists(): Boolean = fs.exists(outputPath)
+
+ /**
+ * True if the outputPath/doneFlag exists
+ * @return
+ */
+ def doneFlagExists(): Boolean = fs.exists(doneFlagPath)
+
+ /**
+ * Returns the mtime Long timestamp of inputPath. inputPath's
+ * mtime will change if it or any of its direct files change.
+ * It will not change if content in a subdirectory changes.
+ * @return
+ */
+ def inputMTime(): Option[DateTime] = {
+ if (inputExists()) {
+ Some(new DateTime(fs.getFileStatus(inputPath).getModificationTime))
+ }
+ else
+ None
+ }
+
+ /**
+ * Reads the Long timestamp out of the outputPath/doneFlag if it exists.
+ * @return
+ */
+ def doneFlagMTime(): Option[DateTime] = {
+ if (doneFlagExists()) {
+ val inStream = fs.open(doneFlagPath)
+ val mtime = new DateTime(inStream.readUTF())
+ inStream.close()
+ Some(mtime)
+ }
+ else
+ None
+ }
+
+ /**
+ * Write out doneFlag file for this output target partition
+ *
+ * This saves the modification timestamp of the inputPath as it was when this target was
+ * instantiated. This will allow later comparison of the contents of doneFlag with the
+ * inputPath modification time. If they are different, the user might decide to rerun
+ * SparkJsonToHive for this target, perhaps assuming that there is new
+ * data in inputPath. Note that inputPath directory mod time only changes if
+ * its direct content changes; it will not change if something in a subdirectory
+ * below it changes.
+ */
+ def writeDoneFlag(): Unit = {
+
+ val mtime = inputMTimeCached.getOrElse(
+ throw new RuntimeException(
+ s"Cannot write done flag, input mod time was not obtained
when $this was " +
+ s"instantiated, probably because it did not exist.
This should not happen"
+ )
+ )
+
+ log.info(
+ s"Writing done flag file at $doneFlagPath with $inputPath last
modification " +
+ s"timestamp of $mtime"
+ )
+
+ val outStream = fs.create(doneFlagPath)
+ outStream.writeUTF(mtime.toString)
+ outStream.close()
+ }
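+
+ // Illustrative note (not part of the original change): a later run reads this
+ // string back via doneFlagMTime(); if it no longer equals the inputPath's
+ // current mtime, shouldRefine() returns true and the partition is re-refined.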
+
+ /**
+ * This target needs to be refined if:
+ * - The output doesn't exist OR
+ * - The output doneFlag doesn't exist OR
+ * - The input's mtime does not equal the timestamp in the output doneFlag file,
+ * meaning that something has changed in the inputPath since the last time doneFlag
+ * was written.
+ *
+ * @return
+ */
+ def shouldRefine(): Boolean = {
+ !outputExists() || !doneFlagExists() || inputMTimeCached != doneFlagMTime()
+ }
+
+ /**
+ * Returns a Failure with e wrapped in a new more descriptive Exception
+ * @param e Original exception that caused this failure
+ * @return
+ */
+ def failure(e: Exception): Try[JsonTarget] = {
+ Failure(JsonTargetException(
+ this, s"Failed refinement of JSON dataset $this. Original
exception: $e", e
+ ))
+ }
+
+ /**
+ * Returns Success(this) of this JsonTarget
+ * @return
+ */
+ def success(recordCount: Long): Try[JsonTarget] = {
+ this.recordCount = recordCount
+ Success(this)
+ }
+
+ override def toString: String = {
+ s"$inputPath -> $partition"
+ }
+ }
+
+
+ /**
+ * Exception wrapper used to retrieve the JsonTarget instance from a Failure instance.
+ * @param target JsonTarget
+ * @param message exception message
+ * @param cause Original Exception
+ */
+ case class JsonTargetException(
+ target: JsonTarget,
+ message: String = "",
+ cause: Throwable = None.orNull
+ ) extends Exception(message, cause) { }
+
+}
diff --git
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestJsonRefine.scala
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestJsonRefine.scala
new file mode 100644
index 0000000..61bebc8
--- /dev/null
+++
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestJsonRefine.scala
@@ -0,0 +1,22 @@
+package org.wikimedia.analytics.refinery.job
+
+import com.github.nscala_time.time.Imports.DateTime
+import com.github.nscala_time.time.Imports._
+import org.scalatest.{FlatSpec, Matchers}
+
+class TestJsonRefine extends FlatSpec with Matchers {
+
+ it should "hoursInBetween gives hours in between two DateTimes" in {
+ val d1 = DateTime.now - 3.hours
+ val d2 = DateTime.now
+
+ val hoursShouldBe = Seq(
+ (new DateTime(DateTime.now, DateTimeZone.UTC) -
3.hours).hourOfDay.roundFloorCopy,
+ (new DateTime(DateTime.now, DateTimeZone.UTC) -
2.hours).hourOfDay.roundFloorCopy,
+ (new DateTime(DateTime.now, DateTimeZone.UTC) -
1.hours).hourOfDay.roundFloorCopy
+ )
+
+ val hours = JsonRefine.hoursInBetween(d1, d2)
+ hours should equal (hoursShouldBe)
+ }
+}
\ No newline at end of file
--
To view, visit https://gerrit.wikimedia.org/r/346291
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Ieb2c3a99501623d71fa58ac7dfb6734cb809096f
Gerrit-PatchSet: 42
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits