jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/386882 )
Change subject: Add core class and job to import EL hive tables to Druid
......................................................................
Add core class and job to import EL hive tables to Druid
Created resources directory to store master-ingestion-spec
Bug: T166414
Change-Id: I30c374c3dcba44bbd0608e72ae0162bcc442cd0f
---
M pom.xml
M refinery-core/pom.xml
A refinery-core/src/main/resources/ingestion_spec_template.json
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala
A refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala
M refinery-job/pom.xml
A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala
A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala
8 files changed, 1,083 insertions(+), 14 deletions(-)
Approvals:
jenkins-bot: Verified
Nuria: Looks good to me, approved
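
The new core class wraps the whole load as one transaction: it writes the input DataFrame to a temporary JSON file, generates an ingestion spec from the DataFrame schema, POSTs the spec to the Druid indexer, and then polls the task status until it finishes. A rough usage sketch, based on the class documentation included below (the HiveContext query, field names and interval are illustrative, not part of this change):

    import org.joda.time.DateTime
    import org.wikimedia.analytics.refinery.core.{DataFrameToDruid, IngestionStatus}

    // Sketch only: hiveContext and the selected fields are hypothetical.
    val dftd = new DataFrameToDruid(
        sc = sc,
        dataSource = "event_navigationtiming",
        inputDf = hiveContext.sql("SELECT ..."),
        dimensions = Seq("wiki", "webHost"),
        metrics = Seq("event_responseStart"),
        intervals = Seq((new DateTime(2017, 9, 29, 0, 0), new DateTime(2017, 9, 30, 0, 0))),
        timestampColumn = "dt",
        timestampFormat = "auto",
        segmentGranularity = "hour",
        queryGranularity = "minute",
        numShards = 2,
        reduceMemory = "8192",
        hadoopQueue = "default",
        druidHost = "druid1001.eqiad.wmnet",
        druidPort = "8090"
    )
    dftd.start().await()  // blocks until the Druid indexing task ends
    val succeeded = dftd.status() == IngestionStatus.Done
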
diff --git a/pom.xml b/pom.xml
index dcaa0fd..68ce8da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,7 +190,21 @@
<scope>test</scope>
</dependency>
- <dependency>
+
+ <!--
+ adding explicit dep for snappy, otherwise
+ spark assumes it is on the java.library.path
+ see: https://github.com/rvs/snappy-java/blob/master/src/main/resources/org/xerial/snappy/SnappyNativeLoader.java#L47
+ -->
+ <!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.2.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.github.nscala-time</groupId>
<artifactId>nscala-time_2.10</artifactId>
<version>2.0.0</version>
diff --git a/refinery-core/pom.xml b/refinery-core/pom.xml
index 8ace48a..5e6d6f7 100644
--- a/refinery-core/pom.xml
+++ b/refinery-core/pom.xml
@@ -76,6 +76,13 @@
</dependency>
<dependency>
+ <groupId>org.scalamock</groupId>
+ <artifactId>scalamock-scalatest-support_2.10</artifactId>
+ <version>3.2.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
@@ -119,7 +126,32 @@
<version>1.4.7</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <!-- javax.servlet in hadoop-common is older than the one in spark -->
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.holdenkarau</groupId>
+ <artifactId>spark-testing-base_2.10</artifactId>
+ <version>1.6.0_0.4.7</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.3</version>
+ </dependency>
</dependencies>
<build>
diff --git a/refinery-core/src/main/resources/ingestion_spec_template.json b/refinery-core/src/main/resources/ingestion_spec_template.json
new file mode 100644
index 0000000..f814559
--- /dev/null
+++ b/refinery-core/src/main/resources/ingestion_spec_template.json
@@ -0,0 +1,50 @@
+{
+ "type" : "index_hadoop",
+ "spec" : {
+ "ioConfig" : {
+ "type" : "hadoop",
+ "inputSpec" : {
+ "type" : "static",
+ "paths" : "{{INPUT_PATH}}"
+ }
+ },
+ "dataSchema" : {
+ "dataSource" : "{{DATA_SOURCE}}",
+ "granularitySpec" : {
+ "type" : "uniform",
+ "segmentGranularity" : "{{SEGMENT_GRANULARITY}}",
+ "queryGranularity" : "{{QUERY_GRANULARITY}}",
+ "intervals" : {{INTERVALS_ARRAY}}
+ },
+ "parser" : {
+ "type" : "string",
+ "parseSpec" : {
+ "format" : "json",
+ "dimensionsSpec" : {
+ "dimensions" : {{DIMENSIONS}}
+ },
+ "timestampSpec" : {
+ "format" : "{{TIMESTAMP_FORMAT}}",
+ "column" : "{{TIMESTAMP_COLUMN}}"
+ }
+ }
+ },
+ "metricsSpec" : {{METRICS}}
+ },
+ "tuningConfig" : {
+ "type" : "hadoop",
+ "overwriteFiles": true,
+ "ignoreInvalidRows" : false,
+ "partitionsSpec" : {
+ "type" : "hashed",
+ "numShards" : {{NUM_SHARDS}}
+ },
+ "jobProperties" : {
+ "mapreduce.reduce.memory.mb" : "{{REDUCE_MEMORY}}",
+ "mapreduce.output.fileoutputformat.compress": "org.apache.hadoop.io.compress.GzipCodec",
+ "mapreduce.job.queuename": "{{HADOOP_QUEUE}}"
+ }
+ }
+ }
+}
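
Each {{...}} placeholder in the template above is filled in by plain string replacement in DataFrameToDruid.createIngestionSpec() below. As a sketch of the shape produced, the {{INTERVALS_ARRAY}} placeholder becomes a JSON array of "start/end" strings; this mirrors formatIntervals() and uses the same one-day interval as the tests:

    import org.joda.time.DateTime

    // Sketch of the interval formatting; the pattern matches IntervalDateTimeFormat.
    val fmt = "yyyy-MM-dd'T'HH:mm'Z'"
    val intervals = Seq((new DateTime(1970, 1, 1, 0, 0), new DateTime(1970, 1, 2, 0, 0)))
    val intervalsArray = "[" + intervals.map { case (start, end) =>
        "\"" + start.toString(fmt) + "/" + end.toString(fmt) + "\""
    }.mkString(", ") + "]"
    // intervalsArray == """["1970-01-01T00:00Z/1970-01-02T00:00Z"]"""
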
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala
new file mode 100644
index 0000000..09339cc
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala
@@ -0,0 +1,354 @@
+package org.wikimedia.analytics.refinery.core
+
+import java.io.InputStream
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.http.client.HttpClient
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.log4j.LogManager
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{IntegerType, LongType, FloatType, DoubleType}
+import org.joda.time.DateTime
+import scala.util.parsing.json.JSON
+import scala.util.Random
+import scala.io.Source;
+
+/**
+ * Ingestion status enumeration object
+ */
+object IngestionStatus extends Enumeration {
+ val Initial, Loading, Done, Error = Value
+}
+
+/**
+ * DataFrameToDruid companion object
+ */
+object DataFrameToDruid {
+ // Constant: URL path to launch Druid ingestion.
+ val LaunchTaskPath = "/druid/indexer/v1/task"
+
+ // Constant: URL path to check task status.
+ val CheckTaskPath = "/druid/indexer/v1/task/{{DRUID_TASK_ID}}/status"
+
+ // Constant: DateTime format for Druid interval specs.
+ val IntervalDateTimeFormat = "yyyy-MM-dd'T'HH:mm'Z'"
+
+ // Constant: Name of the additional metric field that counts events.
+ val EventCountMetricName = "eventCount"
+
+ // Constant: Template to build Druid ingestion specs.
+ val stream: InputStream = getClass().getResourceAsStream("/ingestion_spec_template.json");
+
+ val IngestionSpecTemplate: String = Source.fromInputStream(stream).getLines.mkString;
+}
+/**
+ * DataFrame to Druid transaction class
+ *
+ * This class serves as a transaction helper to load data into Druid.
+ * The source data should be passed in as a DataFrame instance.
+ * Then an ingestion spec is created from the DataFrame schema.
+ * And finally a request is sent to Druid to trigger data ingestion.
+ * The process can be followed in 3 ways: callback, waiting and polling.
+ *
+ * Constructor example:
+ * val dftd = new DataFrameToDruid(
+ * dataSource = "some_dataset",
+ * inputDf = hiveContext.sql("select..."),
+ * dimensions = Seq("project", "language", ...),
+ * metrics = Seq("edits", "views", ...),
+ * intervals = Seq((new DateTime(2017, 1, 1), new DateTime(2017, 2, 1))),
+ * numShards = 4,
+ * hadoopQueue = "production"
+ * )
+ *
+ * Callback example:
+ * dftd.start((status: IngestionStatus.Value) => {
+ * // DataFrameToDruid will call this function asynchronously when the
+ * // process has ended, and pass it the resulting status.
+ * })
+ *
+ * Waiting example:
+ * dftd.start().await()
+ * // The code will only proceed when the process is done.
+ *
+ * Polling example:
+ * dftd.start()
+ * while (dftd.status() == IngestionStatus.Loading) Thread.sleep(10000)
+ * // The process is finished at this point.
+ *
+ * Parameters:
+ * sc SparkContext.
+ * dataSource Name of the target Druid data set (snake_case).
+ * inputDf DataFrame containing the data to be loaded. The data must already
+ * be sliced to only contain the desired time intervals. It must also
+ * be flat (no nested fields) and contain only the fields that are to be
+ * ingested into Druid.
+ * dimensions Sequence of field names that Druid should index as dimensions.
+ * metrics Sequence of field names that Druid should ingest as metrics.
+ * Those fields have to be of numerical type.
+ * intervals Sequence of pairs (startDateTime, endDateTime) delimiting the
+ * intervals where the input DataFrame contains data.
+ * timestampColumn Name of the field containing the timestamp. This field is
+ * mandatory for the Druid ingestion spec.
+ * timestampFormat A string indicating the format of the timestamp field
+ * (iso|millis|posix|auto|or any Joda time format).
+ * segmentGranularity A string indicating the granularity of Druid's segments
+ * for the data to be loaded (quarter|month|week|day|hour).
+ * queryGranularity A string indicating the granularity of Druid's queries
+ * for the data to be loaded (week|day|hour|minute|second).
+ * numShards Number of shards for Druid ingestion [optional].
+ * reduceMemory Memory to be used for Druid ingestion (string).
+ * hadoopQueue Name of Hadoop queue to launch the ingestion.
+ * druidHost String with Druid host.
+ * druidPort String with Druid port.
+ * checkInterval Integer with the number of milliseconds to wait between checks.
+ * tempFilePathOver Optional string that overrides path to temporary file.
+ * httpClientOver Optional HttpClient instance (only for testing purposes).
+ */
+class DataFrameToDruid(
+ sc: SparkContext,
+ dataSource: String,
+ inputDf: DataFrame,
+ dimensions: Seq[String],
+ metrics: Seq[String],
+ intervals: Seq[(DateTime, DateTime)],
+ timestampColumn: String,
+ timestampFormat: String,
+ segmentGranularity: String,
+ queryGranularity: String,
+ numShards: Int,
+ reduceMemory: String,
+ hadoopQueue: String,
+ druidHost: String,
+ druidPort: String,
+ checkInterval: Int = 10000,
+ tempFilePathOver: String = null.asInstanceOf[String],
+ httpClientOver: HttpClient = null.asInstanceOf[HttpClient]
+) {
+ private val log = LogManager.getLogger("DataFrameToDruid")
+
+ // Create a temporary file path for Druid data.
+ private val tempFilePath: String = if (tempFilePathOver != null) tempFilePathOver else {
+ val randomId = Random.alphanumeric.take(5).mkString("")
+ val timestamp = DateTime.now.toString("yyyyMMddHHmmss")
+ s"/tmp/DataFrameToDruid/${dataSource}/${timestamp}/${randomId}"
+ }
+
+ // Add the event count to the DataFrame.
+ private val inputDfWithCount = inputDf.withColumn(DataFrameToDruid.EventCountMetricName, lit(1L))
+ private val metricsWithCount = metrics :+ DataFrameToDruid.EventCountMetricName
+
+ // Initialize Druid ingestion spec.
+ log.info(s"Creating ingestion spec for ${dataSource}.")
+ private var ingestionSpec: String = createIngestionSpec()
+ log.info(ingestionSpec)
+
+ // Create a runnable that will execute the ingestion when launched.
+ private val statusUpdater: Thread = getStatusUpdater()
+
+ // Instance variables: need to be modified after constructor.
+ private var ingestionStatus: IngestionStatus.Value = IngestionStatus.Initial
+ private var userCallback: Option[(IngestionStatus.Value) => Unit] = None
+ private var druidTaskId: String = ""
+
+ // Initialize httpClient, instruct it to follow redirects.
+ private val httpClient: HttpClient = if (httpClientOver != null) httpClientOver else {
+ val client = new DefaultHttpClient()
+ client.setRedirectStrategy(new LaxRedirectStrategy())
+ client
+ }
+
+ /**
+ * Starts the process of loading the DataFrame to Druid.
+ *
+ * Params:
+ * callback Function to be executed once the process is finished [optional].
+ * It should accept a parameter of type IngestionStatus.Value,
+ * which will be passed the final status of the process.
+ * Returns:
+ * This DataFrameToDruid instance
+ * (to allow things like `dftd.start().await()`).
+ */
+ def start(
+ callback: Option[(IngestionStatus.Value) => Unit] = None
+ ): DataFrameToDruid = {
+ if (ingestionStatus == IngestionStatus.Initial) {
+ userCallback = callback
+
+ log.info(s"Writing temporary file for ${dataSource}.")
+ inputDfWithCount.write.json(tempFilePath)
+
+ log.info(s"Launching indexation task for ${dataSource}.")
+ druidTaskId = sendIngestionRequest()
+ log.info(s"Indexation task for ${dataSource} launched successfully. " + s"Task ID: ${druidTaskId}")
+ ingestionStatus = IngestionStatus.Loading
+ statusUpdater.start()
+ } else {
+ log.warn("Can not call start more than once. Ignoring.")
+ }
+ this
+ }
+
+ /**
+ * Blocks execution until the loading process has finished.
+ *
+ * This method has the ugly name 'await', because scala classes
+ * automatically define a method wait, which is not overridable.
+ *
+ * Returns:
+ * This DataFrameToDruid instance
+ * (to allow things like `dftd.start().await().status()`).
+ */
+ def await(): DataFrameToDruid = {
+ if (ingestionStatus == IngestionStatus.Initial) {
+ log.warn("Can not call await before calling start. Ignoring.")
+ } else {
+ statusUpdater.join()
+ }
+ this
+ }
+
+ /**
+ * Returns the status of the loading process.
+ *
+ * Returns:
+ * IngestionStatus.Value (Initial|Loading|Done|Error).
+ */
+ def status(): IngestionStatus.Value = {
+ ingestionStatus
+ }
+
+ // Creates the ingestion spec string by filling in the ingestion spec template
+ // with the passed parameters. Returns the resulting string.
+ private def createIngestionSpec(): String = {
+ DataFrameToDruid.IngestionSpecTemplate
+ .replace("{{INPUT_PATH}}", tempFilePath)
+ .replace("{{DATA_SOURCE}}", dataSource)
+ .replace("{{SEGMENT_GRANULARITY}}", segmentGranularity)
+ .replace("{{QUERY_GRANULARITY}}", queryGranularity)
+ .replace("{{INTERVALS_ARRAY}}", formatIntervals())
+ .replace("{{DIMENSIONS}}", formatDimensions())
+ .replace("{{TIMESTAMP_FORMAT}}", timestampFormat)
+ .replace("{{TIMESTAMP_COLUMN}}", timestampColumn)
+ .replace("{{METRICS}}", formatMetrics())
+ .replace("{{NUM_SHARDS}}", numShards.toString())
+ .replace("{{REDUCE_MEMORY}}", reduceMemory)
+ .replace("{{HADOOP_QUEUE}}", hadoopQueue)
+ }
+
+ // Formats a sequence of pairs of DateTime objects into Druid intervals.
+ private def formatIntervals(): String = {
+ val formattedIntervals = intervals.map((interval) => {
+ val startStr = interval._1.toString(DataFrameToDruid.IntervalDateTimeFormat)
+ val endStr = interval._2.toString(DataFrameToDruid.IntervalDateTimeFormat)
+ "\"" + startStr + "/" + endStr + "\""
+ })
+ "[" + formattedIntervals.mkString(", ") + "]"
+ }
+
+ // Formats a sequence of field names into Druid dimensions.
+ private def formatDimensions(): String = {
+ val formattedDimensions = dimensions.map((d) => "\"" + d + "\"")
+ "[" + formattedDimensions.mkString(", ") + "]"
+ }
+
+ // Formats a sequence of field names into Druid metrics.
+ // Only longSum and doubleSum metrics are supported, so metric fields with types
+ // other than Integer, Long, Float and Double will raise an error.
+ private def formatMetrics(): String = {
+ val formattedMetrics = metricsWithCount.map((field) => {
+ val fieldType = inputDfWithCount.schema.apply(field).dataType match {
+ case IntegerType | LongType => "longSum"
+ case FloatType | DoubleType => "doubleSum"
+ }
+ s"""{\"name\": \"${field}\", \"fieldName\": \"${field}\", \"type\": \"${fieldType}\"}"""
+ })
+ "[" + formattedMetrics.mkString(", ") + "]"
+ }
+
+ // Returns a thread that keeps polling Druid to check the status of the
+ // indexation task and updates the ingestionStatus var accordingly.
+ // When the task is finished, executes finalizations.
+ private def getStatusUpdater(): Thread = {
+ new Thread(
+ new Runnable {
+ def run() {
+ if (druidTaskId == "ERROR") {
+ ingestionStatus = IngestionStatus.Error
+ } else {
+ while (ingestionStatus == IngestionStatus.Loading) {
+ Thread.sleep(checkInterval)
+ log.info(s"Checking status of task ${druidTaskId} for ${dataSource}.")
+ ingestionStatus = getDruidTaskStatus() match {
+ case "RUNNING" => IngestionStatus.Loading
+ case "SUCCESS" => IngestionStatus.Done
+ case "FAILED" | "ERROR" => IngestionStatus.Error
+ }
+ }
+ }
+ conclude()
+ }
+ }
+ )
+ }
+
+ // Sends an http post request to Druid to trigger ingestion.
+ // Returns the Druid task id.
+ private def sendIngestionRequest(): String = {
+ val url = s"http://${druidHost}:${druidPort}${DataFrameToDruid.LaunchTaskPath}"
+ val post = new HttpPost(url)
+ post.addHeader("Content-type", "application/json")
+ post.setEntity(new StringEntity(ingestionSpec))
+ val response = httpClient.execute(post)
+ val statusCode = response.getStatusLine().getStatusCode()
+ if (statusCode == 200) {
+ val contentStream = response.getEntity().getContent()
+ val responseStr = IOUtils.toString(contentStream)
+ val responseObj = JSON.parseFull(responseStr).get.asInstanceOf[Map[String, Any]]
+ responseObj("task").asInstanceOf[String]
+ } else "ERROR"
+ }
+
+ // Sends an http get request to Druid to check the ingestion task.
+ // Returns the resulting task status.
+ private def getDruidTaskStatus(): String = {
+ val path = DataFrameToDruid.CheckTaskPath.replace("{{DRUID_TASK_ID}}", druidTaskId)
+ val url = s"http://${druidHost}:${druidPort}${path}"
+ val get = new HttpGet(url)
+ val response = httpClient.execute(get)
+ val statusCode = response.getStatusLine().getStatusCode()
+ if (statusCode == 200) {
+ val contentStream = response.getEntity().getContent()
+ val responseStr = IOUtils.toString(contentStream)
+ val responseObj = JSON.parseFull(responseStr).get.asInstanceOf[Map[String, Any]]
+ val statusObj = responseObj("status").asInstanceOf[Map[String, Any]]
+ statusObj("status").asInstanceOf[String]
+ } else "ERROR"
+ }
+
+ // Deletes the temporary file and calls user callback.
+ // Note that scala classes have dibs on the method name 'finalize', see: await().
+ private def conclude(): Unit = {
+ ingestionStatus match {
+ case IngestionStatus.Done => log.info(
+ s"Druid ingestion task ${druidTaskId} for ${dataSource} succeeded.")
+ case IngestionStatus.Error => log.error(
+ s"Druid ingestion task ${druidTaskId} for ${dataSource} failed.")
+ }
+ val path = new Path(tempFilePath)
+ val fs = FileSystem.get(sc.hadoopConfiguration)
+ if (fs.exists(path)) fs.delete(path, true)
+ if (userCallback.isDefined) userCallback.get(ingestionStatus)
+ }
+}
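
For reference, formatMetrics() above always appends the synthetic eventCount metric and maps Integer/Long fields to longSum and Float/Double fields to doubleSum aggregators. With a single Double metric column named event_seconds (as in the tests that follow), the {{METRICS}} placeholder would be filled with JSON of roughly this shape:

    [{"name": "event_seconds", "fieldName": "event_seconds", "type": "doubleSum"},
     {"name": "eventCount", "fieldName": "eventCount", "type": "longSum"}]
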
diff --git a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala
new file mode 100644
index 0000000..77cd491
--- /dev/null
+++ b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala
@@ -0,0 +1,242 @@
+package org.wikimedia.analytics.refinery.core
+
+import com.holdenkarau.spark.testing.SharedSparkContext
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.http.client.HttpClient
+import org.apache.http.client.methods.{HttpGet, HttpPost, HttpUriRequest}
+import org.apache.http.entity.StringEntity
+import org.apache.http.message.BasicHttpResponse
+import org.apache.http.ProtocolVersion
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{StructType, StringType, IntegerType, LongType, DoubleType}
+import org.joda.time.DateTime
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{FlatSpec, Matchers, BeforeAndAfterEach}
+import scala.util.parsing.json.JSON
+
+
+class TestDataFrameToDruid extends FlatSpec
+ with Matchers with BeforeAndAfterEach with SharedSparkContext with MockFactory {
+
+ val testTempFile = "file:///tmp/TestDataFrameToDruid"
+ var httpClientMock: HttpClient = null.asInstanceOf[HttpClient]
+ var testDf: DataFrame = null.asInstanceOf[DataFrame]
+
+ // Prevents JSON.parseFull from parsing integers as doubles.
+ JSON.globalNumberParser = (input: String) => {
+ if (input.contains(".")) input.toDouble else input.toLong
+ }
+
+ // Asserts that the given json path for the given json string matches the expected value.
+ // This function circumvents the type cast limitations of JSON.parseFull.
+ def assertJson[T](jsonString: String, jsonPath: String, expectedValue: T): Unit = {
+ val jsonObject = JSON.parseFull(jsonString).get.asInstanceOf[Map[String, Any]]
+ // Allows paths with escaped dots for when they are part of the field name.
+ val pathElements = jsonPath.replace("\\.", "_").split("\\.").map(_.replace("_", "."))
+ val pathValue = pathElements.foldLeft(jsonObject.asInstanceOf[Any])((j, p) => {
+ j.asInstanceOf[Map[String, Any]](p).asInstanceOf[Any]
+ })
+ assert(pathValue.asInstanceOf[T] == expectedValue)
+ }
+
+ // Creates a DataFrameToDruid instance with testing defaults.
+ def createDftd(): DataFrameToDruid = {
+ new DataFrameToDruid(
+ sc = sc,
+ dataSource = "test",
+ inputDf = testDf,
+ dimensions = Seq("event_category", "event_action", "wiki"),
+ metrics = Seq("event_seconds"),
+ intervals = Seq((new DateTime(1970, 1, 1, 0, 0), new DateTime(1970, 1, 2, 0, 0))),
+ timestampColumn = "timestamp",
+ timestampFormat = "posix",
+ segmentGranularity = "hour",
+ queryGranularity = "minute",
+ numShards = 2,
+ reduceMemory = "8192",
+ hadoopQueue = "default",
+ druidHost = "test.druid.host",
+ druidPort = "8090",
+ checkInterval = 100,
+ tempFilePathOver = testTempFile,
+ httpClientOver = httpClientMock
+ )
+ }
+
+ // Creates a BasicHttpResponse given a status code and the response data.
+ def createHttpResponse(statusCode: Int, data: String): BasicHttpResponse = {
+ val response = new BasicHttpResponse(
+ new ProtocolVersion("HTTP", 1, 1),
+ statusCode,
+ "TestDataFrameToDruid"
+ )
+ response.setEntity(new StringEntity(data))
+ response
+ }
+
+ override def beforeEach(): Unit = {
+ // Create schema and data to be used in tests.
+ val sqlContext = new SQLContext(sc)
+ val testSchema: StructType = (new StructType)
+ .add("event_category", StringType)
+ .add("event_action", StringType)
+ .add("event_seconds", DoubleType)
+ .add("timestamp", LongType)
+ .add("wiki", StringType)
+ val testRDD = sc.parallelize(Seq(
+ Row("cat1", "read", 10.0, 1L, "enwiki"),
+ Row("cat2", "edit", 20.0, 2L, "enwiki"),
+ Row("cat1", "read", 30.0, 3L, "enwiki"),
+ Row("cat3", "read", 40.0, 4L, "enwiki")
+ ))
+ testDf = sqlContext.createDataFrame(testRDD, testSchema)
+
+ // Mock HttpClient to be injected into DataFrameToDruid.
+ // Its behavior will be defined in each test.
+ httpClientMock = mock[HttpClient]
+ }
+
+ override def afterEach(): Unit = {
+ // Delete temp file in case DataFrameToDruid can not accomplish it
+ // because of execution errors or assert failures.
+ val path = new Path(testTempFile)
+ val fs = FileSystem.get(sc.hadoopConfiguration)
+ if (fs.exists(path)) fs.delete(path, true)
+ }
+
+ it should "request for ingestion with the correct url" in {
+ inSequence {
+ // Should receive an ingestion request; checks method and url, returns ingestion task id.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest =>
+ val uri = request.getURI()
+ assert(uri.getHost() == "test.druid.host")
+ assert(uri.getPort() == 8090)
+ assert(uri.getPath() == "/druid/indexer/v1/task")
+ createHttpResponse(200, """{"task": "test-task-1"}""")
+ }
+ // Should receive a status request; returns succeeded.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+ createHttpResponse(200, """{"status": {"status": "SUCCEEDED"}}""")
+ )
+ }
+ createDftd().start().await()
+ }
+
+ it should "request for ingestion with the correct spec" in {
+ inSequence {
+ // Should receive an ingestion request; checks spec, returns ingestion task id.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest =>
+ val contentStream = request.asInstanceOf[HttpPost].getEntity().getContent()
+ val spec = IOUtils.toString(contentStream)
+
+ assertJson(spec, "spec.ioConfig.inputSpec.paths", testTempFile)
+ assertJson(spec, "spec.dataSchema.dataSource", "test")
+ assertJson(spec, "spec.dataSchema.granularitySpec.segmentGranularity", "hour")
+ assertJson(spec, "spec.dataSchema.granularitySpec.queryGranularity", "minute")
+ assertJson(spec, "spec.dataSchema.granularitySpec.intervals",
+ Seq("1970-01-01T00:00Z/1970-01-02T00:00Z"))
+ assertJson(spec, "spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensions",
+ Seq("event_category", "event_action", "wiki"))
+ assertJson(spec, "spec.dataSchema.parser.parseSpec.timestampSpec.format", "posix")
+ assertJson(spec, "spec.dataSchema.parser.parseSpec.timestampSpec.column", "timestamp")
+ assertJson(spec, "spec.dataSchema.metricsSpec", Seq(
+ Map("name" -> "event_seconds", "fieldName" -> "event_seconds", "type" -> "doubleSum"),
+ Map("name" -> "eventCount", "fieldName" -> "eventCount", "type" -> "longSum")
+ ))
+ assertJson(spec, "spec.tuningConfig.partitionsSpec.numShards", 2L)
+ assertJson(spec, "spec.tuningConfig.jobProperties.mapreduce\\.reduce\\.memory\\.mb", "8192")
+ assertJson(spec, "spec.tuningConfig.jobProperties.mapreduce\\.job\\.queuename", "default")
+
+ createHttpResponse(200, """{"task": "test-task-1"}""")
+ }
+ // Should receive a status request; returns succeeded.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+ createHttpResponse(200, """{"status": {"status": "SUCCEEDED"}}""")
+ )
+ }
+ createDftd().start().await()
+ }
+
+ it should "request for ingestion with the correct data" in {
+ inSequence {
+ // Should receive an ingestion request; checks data, returns ingestion task id.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest =>
+ val sqlContext = new SQLContext(sc)
+ val inputDf = sqlContext.read.json(testTempFile)
+ val expectedDf = testDf.withColumn("eventCount", lit(1L))
+ assert(inputDf.intersect(expectedDf).take(1).isEmpty)
+ createHttpResponse(200, """{"task": "test-task-1"}""")
+ }
+ // Should receive a status request; returns succeeded.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+ createHttpResponse(200, """{"status": {"status": "SUCCEEDED"}}""")
+ )
+ }
+ createDftd().start().await()
+ }
+
+ it should "request for status check with correct method and url" in {
+ inSequence {
+ // Should receive an ingestion request; returns ingestion task id.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+ createHttpResponse(200, """{"task": "test-task-1"}""")
+ )
+ // Should receive a status request; checks method and url, returns succeeded.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest =>
+ val uri = request.getURI()
+ assert(uri.getHost() == "test.druid.host")
+ assert(uri.getPort() == 8090)
+ assert(uri.getPath() == "/druid/indexer/v1/task/test-task-1/status")
+ assert(request.getMethod() == "GET")
+ createHttpResponse(200, """{"status": {"status": "SUCCEEDED"}}""")
+ }
+ }
+ createDftd().start().await()
+ }
+
+ it should "delete the temporary file" in {
+ inSequence {
+ // Should receive an ingestion request; returns ingestion task id.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+ createHttpResponse(200, """{"task": "test-task-1"}""")
+ )
+ // Should receive a status request; returns succeeded.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+ createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""")
+ )
+ }
+ createDftd().start().await()
+
+ // Check that the temp file has been deleted.
+ val path = new Path(testTempFile)
+ val fs = FileSystem.get(sc.hadoopConfiguration)
+ assert(!fs.exists(path))
+ }
+
+ it should "call the user callback once finished" in {
+ inSequence {
+ // Should receive an ingestion request; returns ingestion task id.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+ createHttpResponse(200, """{"task": "test-task-1"}""")
+ )
+ // Should receive a status request; returns succeeded.
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+ createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""")
+ )
+ }
+
+ // Check that callback is actually executed.
+ var callbackExecuted = false
+ createDftd().start(Some((status: IngestionStatus.Value) => {
+ assert(status == IngestionStatus.Done)
+ callbackExecuted = true
+ })).await()
+ assert(callbackExecuted)
+ }
+}
diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index 21d4f6e..fd9a438 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -19,19 +19,6 @@
<artifactId>refinery-core</artifactId>
</dependency>
- <!--
- adding explicit dep for snappy, otherwise
- spark assumes is on the java.library.path
- see: https://github.com/rvs/snappy-java/blob/master/src/main/resources/org/xerial/snappy/SnappyNativeLoader.java#L47
- -->
- <!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
- <dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- <version>1.1.2.5</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala
new file mode 100644
index 0000000..e990927
--- /dev/null
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala
@@ -0,0 +1,286 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.log4j.LogManager
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{IntegerType, LongType, FloatType, DoubleType, StructField, StructType}
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.wikimedia.analytics.refinery.core.{DataFrameToDruid, IngestionStatus}
+import scopt.OptionParser
+
+
+object EventLoggingToDruid {
+
+ val log = LogManager.getLogger("EventLoggingToDruid")
+ val DateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH")
+
+ case class Params(
+ table: String = "",
+ startDate: DateTime = new DateTime(0),
+ endDate: DateTime = new DateTime(0),
+ database: String = "event",
+ metrics: (StructField) => Boolean = (f) => false,
+ blacklist: Seq[String] = Seq.empty,
+ segmentGranularity: String = "hour",
+ queryGranularity: String = "minute",
+ numShards: Int = 2,
+ reduceMemory: String = "8192",
+ hadoopQueue: String = "default",
+ druidHost: String = "druid1001.eqiad.wmnet",
+ druidPort: String = "8090",
+ dryRun: Boolean = false
+ )
+
+ // Field filters to help selecting fields as metrics.
+ object FieldFilter {
+ def isNumber(field: StructField): Boolean = {
+ field.dataType match {
+ case IntegerType | LongType | FloatType | DoubleType => true
+ case _ => false
+ }
+ }
+ def hasMetricAffix(field: StructField): Boolean = {
+ val withAffix = "(?i).*metric.*".r
+ field.name match {
+ case withAffix() => true
+ case _ => false
+ }
+ }
+ }
+
+ // Support implicit DateTime conversion from CLI opt.
+ // The opt can either be given in integer hours ago, or
+ // as an ISO-8601 date time.
+ implicit val scoptDateTimeRead: scopt.Read[DateTime] =
+ scopt.Read.reads { s => {
+ if (s.forall(Character.isDigit))
+ DateTime.now.minusHours(s.toInt)
+ else
+ DateTime.parse(s, DateFormatter)
+ }
+ }
+
+ val argsParser = new OptionParser[Params](
+ "spark-submit --class org.wikimedia.analytics.refinery.job.EventLoggingToDruid refinery-job.jar"
+ ) {
+ head("""
+ |Hive tables -> Druid data sets
+ |
+ |Example:
+ | spark-submit --class org.wikimedia.analytics.refinery.job.EventLoggingToDruid refinery-job.jar \
+ | --database event \
+ | --table NavigationTiming \
+ | --start-date 2017-09-29T03 \
+ | --end-date 2017-12-15T21 \
+ | --metrics numbers \
+ | --blacklist pageId,namespaceId,revId \
+ | --dry-run
+ |
+ |""".stripMargin, "")
+
+ note("""NOTE: You may pass all of the described CLI options to this job in a single
+ | string with --options '<options>' flag.\n""".stripMargin)
+
+ help("help") text "Prints this usage text and exit."
+
+ opt[String]('T', "table").required().valueName("<table>").action { (x, p) =>
+ p.copy(table = x)
+ }.text("Hive input table.")
+
+ opt[DateTime]('S', "start-date").required().valueName("<YYYY-MM-DDTHH>").action { (x, p) =>
+ p.copy(startDate = new DateTime(x))
+ }.text("Start date of the interval to load (inclusive).")
+
+ opt[DateTime]('E', "end-date").required().valueName("<YYYY-MM-DDTHH>").action { (x, p) =>
+ p.copy(endDate = new DateTime(x))
+ }.text("End date of the interval to load (exclusive).")
+
+ opt[String]('D', "database").optional().valueName("<database>").action { (x, p) =>
+ p.copy(database = x)
+ }.text("Hive input database. Default: event.")
+
+ opt[String]('m', "metrics").optional().valueName("<filter>").action { (x, p) =>
+ p.copy(metrics = x match {
+ case "number" => FieldFilter.isNumber
+ case "affix" => FieldFilter.hasMetricAffix
+ })
+ }.text("Filter that will select metric columns (number|affix): " +
+ "'number' selects all columns that have a numeric type. " +
+ "'affix' selects all columns with the word 'metric' in them. " +
+ "eventCount will always be a metric in the loaded data set.")
+
+ opt[Seq[String]]('b', "blacklist").optional().valueName("<column1>,<column2>...").action { (x, p) =>
+ p.copy(blacklist = x)
+ }.text("List of columns that are not to be loaded. For struct columns, " +
+ "passing the column name will blacklist all data, whereas " +
+ "column_subField will only blacklist this sub-field.")
+
+ opt[String]('g', "segment-granularity").optional().valueName("<granularity>").action { (x, p) =>
+ p.copy(segmentGranularity = x)
+ }.text("Granularity for Druid segments (quarter|month|week|day|hour). Default: hour.")
+
+ opt[String]('q', "query-granularity").optional().valueName("<granularity>").action { (x, p) =>
+ p.copy(queryGranularity = x)
+ }.text("Granularity for Druid queries (week|day|hour|minute|second). Default: minute.")
+
+ opt[Int]('x', "num-shards").optional().valueName("<N>").action { (x, p) =>
+ p.copy(numShards = x)
+ }.text("Number of shards for Druid ingestion. Default: 2.")
+
+ opt[Int]('x', "reduce-memory").optional().valueName("<N>").action { (x, p) =>
+ p.copy(reduceMemory = x.toString())
+ }.text("Memory to be used by Hadoop for reduce operations. Default: 8192.")
+
+ opt[String]('h', "hadoop-queue").optional().valueName("<N>").action { (x, p) =>
+ p.copy(hadoopQueue = x)
+ }.text("Hadoop queue where to execute the loading. Default: default.")
+
+ opt[String]('d', "druid-host").optional().valueName("<host>").action { (x, p) =>
+ p.copy(druidHost = x)
+ }.text("Druid host to load the data to. Default: druid1001.eqiad.wmnet.")
+
+ opt[Int]('p', "druid-port").optional().valueName("<port>").action { (x, p) =>
+ p.copy(druidPort = x.toString())
+ }.text("Druid port to load the data to. Default: 8090.")
+
+ opt[Unit]('n', "dry-run").optional().action { (x, p) =>
+ p.copy(dryRun = true)
+ }.text("Do not execute any loading, only check and print parameters.")
+ }
+
+ val blacklistedHiveFields = Set("year", "month", "day", "hour")
+ val blacklistedCapsuleFields = Set("schema", "seqId", "uuid", "userAgent", "clientValidated", "isTruncated", "clientIp")
+ val legitCapsuleFields = Set("wiki", "webHost", "revision", "topic", "recvFrom")
+
+ // Entry point
+ def main(args: Array[String]): Unit = {
+ val params = args.headOption match {
+ case Some("--options") =>
+ // If job options are given as a single string.
+ // Split them before passing them to argsParser.
+ argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1))
+ case _ =>
+ argsParser.parse(args, Params()).getOrElse(sys.exit(1))
+ }
+
+ if (apply(params)) sys.exit(0)
+ else sys.exit(1)
+ }
+
+ // This will be called after command line parameters have been parsed and checked.
+ def apply(params: Params): Boolean = {
+
+ log.info(s"Starting process for ${params.database}_${params.table}.")
+ log.info(s"Querying Hive for intervals: " + Seq((params.startDate, params.endDate)).toString())
+
+ // Initialize sqlContext.
+ val sc = new SparkContext(new SparkConf())
+ val hiveContext = new HiveContext(sc)
+ val sqlContext = hiveContext.asInstanceOf[SQLContext]
+
+ // Get data already filtered by time range.
+ val comparisonFormat = "yyyyMMddHH"
+ val comparisonStartDate = params.startDate.toString(comparisonFormat)
+ val comparisonEndDate = params.endDate.toString(comparisonFormat)
+ val concatTimestamp = "CONCAT(year, LPAD(month, 2, '0'), LPAD(day, 2, '0'), LPAD(hour, 2, '0'))"
+ val df = sqlContext.sql(s"""
+ SELECT *
+ FROM ${params.database}.${params.table}
+ WHERE ${concatTimestamp} >= ${comparisonStartDate}
+ AND ${concatTimestamp} < ${comparisonEndDate}
+ """)
+
+ log.info("Preparing dimensions and metrics.")
+
+ // Flatten nested fields.
+ val flatColumns = getFlatColumns(df.schema)
+ val flatDf = df.select(flatColumns:_*)
+
+ // Remove blacklisted fields.
+ val cleanColumns = getCleanColumns(flatDf.schema, params.blacklist)
+ val finalDf = flatDf.select(cleanColumns:_*)
+
+ // Get dimensions and metrics.
+ val (dimensionFields, metricFields) = getDimensionsAndMetrics(finalDf.schema, params.metrics)
+
+ log.info("Dimensions: " + dimensionFields.mkString(", "))
+ log.info("Metrics: " + metricFields.mkString(", "))
+
+ if (params.dryRun) {
+ log.info("Dry run finished: no data was loaded.")
+ true
+ } else {
+ // Execute loading process.
+ log.info("Launching DataFrameToDruid process.")
+ val dftd = new DataFrameToDruid(
+ sc = sc,
+ dataSource = s"${params.database}_${params.table}",
+ inputDf = finalDf,
+ dimensions = dimensionFields,
+ metrics = metricFields,
+ intervals = Seq((params.startDate, params.endDate)),
+ timestampColumn = "dt",
+ timestampFormat = "auto",
+ segmentGranularity = params.segmentGranularity,
+ queryGranularity = params.queryGranularity,
+ numShards = params.numShards,
+ reduceMemory = params.reduceMemory,
+ hadoopQueue = params.hadoopQueue,
+ druidHost = params.druidHost,
+ druidPort = params.druidPort
+ ).start().await()
+ log.info("Done.")
+
+ // Return whether the process was successful.
+ (dftd.status() == IngestionStatus.Done)
+ }
+ }
+
+ def getFlatColumns(schema: StructType, prefix: String = null): Seq[Column] = {
+ // HACK: This map corrects casing for capsule fields, given that Hive kills camelCase.
+ val capsuleFields = legitCapsuleFields.union(blacklistedCapsuleFields)
+ val capsuleCaseMap = capsuleFields.map(f => (f.toLowerCase(), f)).toMap
+
+ schema.fields.flatMap(field => {
+ val columnName = if (prefix == null) field.name else (prefix + "." + field.name)
+ val columnAlias = columnName.split("\\.").map(n => capsuleCaseMap.getOrElse(n, n)).mkString("_")
+
+ field.dataType match {
+ case struct: StructType => getFlatColumns(struct, columnName)
+ case _ => Seq(col(columnName).as(columnAlias))
+ }
+ })
+ }
+
+ def getCleanColumns(schema: StructType, blacklist: Seq[String]): Seq[Column] = {
+ val blacklistNames = blacklist.toSet
+ .union(blacklistedCapsuleFields)
+ .union(blacklistedHiveFields)
+ val fieldNames = schema.fields.map(f => f.name)
+ val withPrefix = "([^_]*)_.*".r
+ fieldNames.filter(f => (
+ !blacklistNames.contains(f) &&
+ (f match {
+ case withPrefix(prefix) => !blacklistNames.contains(prefix)
+ case _ => true
+ })
+ )).map(col(_))
+ }
+
+ def getDimensionsAndMetrics(
+ schema: StructType,
+ metrics: (StructField) => Boolean
+ ): (Seq[String], Seq[String]) = {
+ val allFields = schema.fields.filter(f => f.name != "dt")
+ val metricFields = allFields
+ .filter((f) => !legitCapsuleFields.contains(f.name))
+ .filter(metrics)
+ val dimensionFields = allFields.filter(!metricFields.contains(_))
+ (dimensionFields.map(_.name), metricFields.map(_.name))
+ }
+}
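
Putting the helpers together: getFlatColumns flattens nested event fields into underscore-separated column names (restoring the capsule camelCase that Hive lower-cases), getCleanColumns drops blacklisted and Hive partition columns, and getDimensionsAndMetrics splits the remaining fields. A sketch of the chain as the tests below exercise it (df and the blacklist value are illustrative):

    // Sketch: df is a DataFrame read from an EventLogging Hive table.
    val flatDf = df.select(EventLoggingToDruid.getFlatColumns(df.schema):_*)
    val cleanColumns = EventLoggingToDruid.getCleanColumns(flatDf.schema, Seq("event_pageId"))
    val cleanDf = flatDf.select(cleanColumns:_*)
    val (dimensions, metrics) = EventLoggingToDruid.getDimensionsAndMetrics(
        cleanDf.schema, EventLoggingToDruid.FieldFilter.isNumber)
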
diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala
new file mode 100644
index 0000000..d9bed98
--- /dev/null
+++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala
@@ -0,0 +1,104 @@
+package org.wikimedia.analytics.refinery.job
+
+import com.holdenkarau.spark.testing.SharedSparkContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{StructType, StringType, IntegerType, LongType}
+import org.scalatest.{FlatSpec, Matchers, BeforeAndAfterEach}
+
+
+class TestEventLoggingToDruid extends FlatSpec
+ with Matchers with BeforeAndAfterEach with SharedSparkContext {
+
+ var testDf: DataFrame = null.asInstanceOf[DataFrame]
+
+ override def beforeEach(): Unit = {
+ val testSchema: StructType = (new StructType)
+ .add("event", (new StructType)
+ .add("pageId", StringType)
+ .add("action", StringType)
+ .add("seconds", IntegerType))
+ .add("dt", StringType)
+ .add("webhost", StringType)
+ .add("useragent", StringType)
+ .add("year", IntegerType)
+
+ val testRDD = sc.parallelize(Seq(
+ Row(Row("page1", "edit", 10), "2017-01-01T00:00:00", "en.wikimedia.org", "UA1", 2017),
+ Row(Row("page1", "read", 20), "2017-01-01T00:00:01", "en.wikimedia.org", "UA2", 2017),
+ Row(Row("page2", "edit", 30), "2017-01-01T00:00:02", "es.wikimedia.org", "UA3", 2017),
+ Row(Row("page3", "read", 40), "2017-01-01T00:00:03", "es.wikimedia.org", "UA4", 2017),
+ Row(Row("page3", "read", 50), "2017-01-01T00:00:04", "es.wikimedia.org", "UA5", 2017)
+ ))
+
+ val sqlContext = new SQLContext(sc)
+ testDf = sqlContext.createDataFrame(testRDD, testSchema)
+ }
+
+
+ it should "flatten the event part of the schema" in {
+ val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema)
+ val flatFields = testDf.select(flatColumns:_*).schema.map(f => f.name)
+
+ assert(flatFields.contains("event_pageId"))
+ assert(flatFields.contains("event_action"))
+ assert(flatFields.contains("event_seconds"))
+ }
+
+ it should "correct case for the capsule fields" in {
+ val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema)
+ val flatFields = testDf.select(flatColumns:_*).schema.map(f => f.name)
+
+ assert(flatFields.contains("webHost"))
+ assert(flatFields.contains("userAgent"))
+ }
+
+ it should "blacklist whole columns properly" in {
+ val blacklist = Seq("event", "webHost")
+ val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema)
+ val flatDf = testDf.select(flatColumns:_*)
+ val cleanColumns = EventLoggingToDruid.getCleanColumns(flatDf.schema, blacklist)
+ val cleanFields = flatDf.select(cleanColumns:_*).schema.map(f => f.name)
+
+ assert(cleanFields.length == 1)
+ assert(cleanFields.contains("dt"))
+ }
+
+ it should "blacklist struct column subfields properly" in {
+ val blacklist = Seq("event_pageId", "event_seconds")
+ val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema)
+ val flatDf = testDf.select(flatColumns:_*)
+ val cleanColumns = EventLoggingToDruid.getCleanColumns(flatDf.schema, blacklist)
+ val cleanFields = flatDf.select(cleanColumns:_*).schema.map(f => f.name)
+
+ assert(cleanFields.length == 3)
+ assert(cleanFields.contains("event_action"))
+ assert(cleanFields.contains("dt"))
+ assert(cleanFields.contains("webHost"))
+ }
+
+ it should "select dimensions and metrics properly" in {
+ val blacklist = Seq("event_pageId")
+ val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema)
+ val flatDf = testDf.select(flatColumns:_*)
+ val cleanColumns = EventLoggingToDruid.getCleanColumns(flatDf.schema, blacklist)
+ val cleanDf = flatDf.select(cleanColumns:_*)
+
+ val (dimensionFields, metricFields) = EventLoggingToDruid.getDimensionsAndMetrics(
+ cleanDf.schema,
+ (f) => f.dataType match {
+ case IntegerType => true
+ case _ => false
+ }
+ )
+
+ assert(dimensionFields.length == 2)
+ assert(dimensionFields.contains("event_action"))
+ assert(dimensionFields.contains("webHost"))
+
+ assert(metricFields.length == 1)
+ assert(metricFields.contains("event_seconds"))
+ }
+}
--
To view, visit https://gerrit.wikimedia.org/r/386882
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I30c374c3dcba44bbd0608e72ae0162bcc442cd0f
Gerrit-PatchSet: 25
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
Gerrit-Reviewer: Joal <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits