jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/386882 )
Change subject: Add core class and job to import EL hive tables to Druid ...................................................................... Add core class and job to import EL hive tables to Druid Created resources directory to store master-ingestion-spec Bug: T166414 Change-Id: I30c374c3dcba44bbd0608e72ae0162bcc442cd0f --- M pom.xml M refinery-core/pom.xml A refinery-core/src/main/resources/ingestion_spec_template.json A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala A refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala M refinery-job/pom.xml A refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala A refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala 8 files changed, 1,083 insertions(+), 14 deletions(-) Approvals: jenkins-bot: Verified Nuria: Looks good to me, approved diff --git a/pom.xml b/pom.xml index dcaa0fd..68ce8da 100644 --- a/pom.xml +++ b/pom.xml @@ -190,7 +190,21 @@ <scope>test</scope> </dependency> - <dependency> + + <!-- + adding explicit dep for snappy, otherwise + spark assumes it is on the java.library.path + see: https://github.com/rvs/snappy-java/blob/master/src/main/resources/org/xerial/snappy/SnappyNativeLoader.java#L47 + --> + <!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java --> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.2.5</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>com.github.nscala-time</groupId> <artifactId>nscala-time_2.10</artifactId> <version>2.0.0</version> diff --git a/refinery-core/pom.xml b/refinery-core/pom.xml index 8ace48a..5e6d6f7 100644 --- a/refinery-core/pom.xml +++ b/refinery-core/pom.xml @@ -76,6 +76,13 @@ </dependency> <dependency> + <groupId>org.scalamock</groupId> + <artifactId>scalamock-scalatest-support_2.10</artifactId> + <version>3.2.2</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.10</artifactId> <scope>test</scope> @@ -119,7 +126,32 @@ <version>1.4.7</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <!-- javax.servlet in hadoop-common is older than the one in spark --> + <groupId>javax.servlet</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.holdenkarau</groupId> + <artifactId>spark-testing-base_2.10</artifactId> + <version>1.6.0_0.4.7</version> + <scope>test</scope> + </dependency> + + <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient --> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.3</version> + </dependency> </dependencies> <build> diff --git a/refinery-core/src/main/resources/ingestion_spec_template.json b/refinery-core/src/main/resources/ingestion_spec_template.json new file mode 100644 index 0000000..f814559 --- /dev/null +++ b/refinery-core/src/main/resources/ingestion_spec_template.json @@ -0,0 +1,50 @@ +{ + "type" : "index_hadoop", + "spec" : { + "ioConfig" : { + "type" : "hadoop", + "inputSpec" : { + "type" : "static", + "paths" : "{{INPUT_PATH}}" + } + }, + "dataSchema" : { + "dataSource" : "{{DATA_SOURCE}}", + "granularitySpec" : { + "type" : "uniform", + "segmentGranularity" : 
"{{SEGMENT_GRANULARITY}}", + "queryGranularity" : "{{QUERY_GRANULARITY}}", + "intervals" : {{INTERVALS_ARRAY}} + }, + "parser" : { + "type" : "string", + "parseSpec" : { + "format" : "json", + "dimensionsSpec" : { + "dimensions" : {{DIMENSIONS}} + }, + "timestampSpec" : { + "format" : "{{TIMESTAMP_FORMAT}}", + "column" : "{{TIMESTAMP_COLUMN}}" + } + } + }, + "metricsSpec" : {{METRICS}} + }, + "tuningConfig" : { + "type" : "hadoop", + "overwriteFiles": true, + "ignoreInvalidRows" : false, + "partitionsSpec" : { + "type" : "hashed", + "numShards" : {{NUM_SHARDS}} + }, + "jobProperties" : { + "mapreduce.reduce.memory.mb" : "{{REDUCE_MEMORY}}", + "mapreduce.output.fileoutputformat.compress": + "org.apache.hadoop.io.compress.GzipCodec", + "mapreduce.job.queuename": "{{HADOOP_QUEUE}}" + } + } + } +} diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala new file mode 100644 index 0000000..09339cc --- /dev/null +++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala @@ -0,0 +1,354 @@ +package org.wikimedia.analytics.refinery.core + +import java.io.InputStream + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path +import org.apache.http.client.HttpClient +import org.apache.http.client.methods.HttpGet +import org.apache.http.client.methods.HttpPost +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.DefaultHttpClient +import org.apache.http.impl.client.LaxRedirectStrategy +import org.apache.log4j.LogManager +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.types.{IntegerType, LongType, FloatType, DoubleType} +import org.joda.time.DateTime +import scala.util.parsing.json.JSON +import scala.util.Random +import scala.io.Source; + +/** + * Ingestion status enumeration object + */ +object IngestionStatus extends Enumeration { + val Initial, Loading, Done, Error = Value +} + +/** + * DataFrameToDruid companion object + */ +object DataFrameToDruid { + // Constant: URL path to launch Druid ingestion. + val LaunchTaskPath = "/druid/indexer/v1/task" + + // Constant: URL path to check task status. + val CheckTaskPath = "/druid/indexer/v1/task/{{DRUID_TASK_ID}}/status" + + // Constant: DateTime format for Druid interval specs. + val IntervalDateTimeFormat = "yyyy-MM-dd'T'HH:mm'Z'" + + // Constant: Name of the additional metric field that counts events. + val EventCountMetricName = "eventCount" + + // Constant: Template to build Druid ingestion specs. + val stream: InputStream = getClass().getResourceAsStream("/ingestion_spec_template.json"); + + val IngestionSpecTemplate: String = Source.fromInputStream(stream).getLines.mkString; +} +/** + * DataFrame to Druid transaction class + * + * This class serves as a transaction helper to load data into Druid. + * The source data should be passed in as a DataFrame instance. + * Then an ingestion spec is created from the DataFrame schema. + * And finally a request is sent to Druid to trigger data ingestion. + * The process can be followed in 3 ways: callback, waiting and polling. 
+ * + * Constructor example: + * val dftd = new DataFrameToDruid( + * dataSource = "some_dataset", + * inputDf = hiveContext.sql("select..."), + * dimensions = Seq("project", "language", ...), + * metrics = Seq("edits", "views", ...), + * intervals = Seq((new DateTime(2017, 1, 1, 0, 0), new DateTime(2017, 2, 1, 0, 0))), + * numShards = 4, + * hadoopQueue = "production" + * ) + * + * Callback example: + * dftd.start(Some((status: IngestionStatus.Value) => { + * // DataFrameToDruid will call this function asynchronously when the + * // process has ended, and pass it the resulting status. + * })) + * + * Waiting example: + * dftd.start().await() + * // The code will only proceed when the process is done. + * + * Polling example: + * dftd.start() + * while (dftd.status() == IngestionStatus.Loading) Thread.sleep(10000) + * // The process is finished at this point. + * + * Parameters: + * sc SparkContext. + * dataSource Name of the target Druid data set (snake_case). + * inputDf DataFrame containing the data to be loaded. The data must already + * be sliced to only contain the desired time intervals. It must also + * be flat (no nested fields) and contain only the fields that are to be + * ingested into Druid. + * dimensions Sequence of field names that Druid should index as dimensions. + * metrics Sequence of field names that Druid should ingest as metrics. + * These fields have to be of a numeric type. + * intervals Sequence of pairs (startDateTime, endDateTime) delimiting the + * intervals where the input DataFrame contains data. + * timestampColumn Name of the field containing the timestamp. This field is + * mandatory in the Druid ingestion spec. + * timestampFormat A string indicating the format of the timestamp field + * (iso|millis|posix|auto|or any Joda time format). + * segmentGranularity A string indicating the granularity of Druid's segments + * for the data to be loaded (quarter|month|week|day|hour). + * queryGranularity A string indicating the granularity of Druid's queries + * for the data to be loaded (week|day|hour|minute|second). + * numShards Number of shards for Druid ingestion. + * reduceMemory Memory to be used for Druid ingestion (string). + * hadoopQueue Name of the Hadoop queue where the ingestion is launched. + * druidHost String with the Druid host. + * druidPort String with the Druid port. + * checkInterval Integer with the number of milliseconds to wait between checks. + * tempFilePathOver Optional string that overrides the path to the temporary file. + * httpClientOver Optional HttpClient instance (only for testing purposes). + */ +class DataFrameToDruid( + sc: SparkContext, + dataSource: String, + inputDf: DataFrame, + dimensions: Seq[String], + metrics: Seq[String], + intervals: Seq[(DateTime, DateTime)], + timestampColumn: String, + timestampFormat: String, + segmentGranularity: String, + queryGranularity: String, + numShards: Int, + reduceMemory: String, + hadoopQueue: String, + druidHost: String, + druidPort: String, + checkInterval: Int = 10000, + tempFilePathOver: String = null.asInstanceOf[String], + httpClientOver: HttpClient = null.asInstanceOf[HttpClient] +) { + private val log = LogManager.getLogger("DataFrameToDruid") + + // Create a temporary file path for Druid data. + private val tempFilePath: String = if (tempFilePathOver != null) tempFilePathOver else { + val randomId = Random.alphanumeric.take(5).mkString("") + val timestamp = DateTime.now.toString("yyyyMMddHHmmss") + s"/tmp/DataFrameToDruid/${dataSource}/${timestamp}/${randomId}" + } + + // Add the event count to the DataFrame. 
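+ // Druid rolls up rows that share the same dimension values and truncated timestamp at + // ingestion time, so a constant-1 column ingested as a longSum metric lets queries + // recover the original number of events behind each rolled-up row.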
+ private val inputDfWithCount = inputDf.withColumn(DataFrameToDruid.EventCountMetricName, lit(1L)) + private val metricsWithCount = metrics :+ DataFrameToDruid.EventCountMetricName + + // Initialize Druid ingestion spec. + log.info(s"Creating ingestion spec for ${dataSource}.") + private var ingestionSpec: String = createIngestionSpec() + log.info(ingestionSpec) + + // Create a thread that will poll the status of the ingestion task once started. + private val statusUpdater: Thread = getStatusUpdater() + + // Instance variables: these need to be modified after construction. + private var ingestionStatus: IngestionStatus.Value = IngestionStatus.Initial + private var userCallback: Option[(IngestionStatus.Value) => Unit] = None + private var druidTaskId: String = "" + + // Initialize httpClient, instruct it to follow redirects. + private val httpClient: HttpClient = if (httpClientOver != null) httpClientOver else { + val client = new DefaultHttpClient() + client.setRedirectStrategy(new LaxRedirectStrategy()) + client + } + + /** + * Starts the process of loading the DataFrame to Druid. + * + * Params: + * callback Function to be executed once the process is finished [optional]. + * It should accept a parameter of type IngestionStatus.Value, + * which will be passed the final status of the process. + * Returns: + * This DataFrameToDruid instance + * (to allow things like `dftd.start().await()`). + */ + def start( + callback: Option[(IngestionStatus.Value) => Unit] = None + ): DataFrameToDruid = { + if (ingestionStatus == IngestionStatus.Initial) { + userCallback = callback + + log.info(s"Writing temporary file for ${dataSource}.") + inputDfWithCount.write.json(tempFilePath) + + log.info(s"Launching indexation task for ${dataSource}.") + druidTaskId = sendIngestionRequest() + log.info(s"Indexation task for ${dataSource} launched successfully. " + + s"Task ID: ${druidTaskId}") + ingestionStatus = IngestionStatus.Loading + statusUpdater.start() + } else { + log.warn("Cannot call start more than once. Ignoring.") + } + this + } + + /** + * Blocks execution until the loading process has finished. + * + * This method has the slightly awkward name 'await', because all classes + * inherit a final method 'wait' from java.lang.Object, so that name is taken. + * + * Returns: + * This DataFrameToDruid instance + * (to allow things like `dftd.start().await().status()`). + */ + def await(): DataFrameToDruid = { + if (ingestionStatus == IngestionStatus.Initial) { + log.warn("Cannot call await before calling start. Ignoring.") + } else { + statusUpdater.join() + } + this + } + + /** + * Returns the status of the loading process. + * + * Returns: + * IngestionStatus.Value (Initial|Loading|Done|Error). + */ + def status(): IngestionStatus.Value = { + ingestionStatus + } + + // Creates the ingestion spec string by filling in the ingestion spec template + // with the passed parameters. Returns the resulting string. 
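+ // For example, the "{{DATA_SOURCE}}" placeholder in ingestion_spec_template.json is + // replaced with the dataSource constructor argument, and "{{INTERVALS_ARRAY}}" with + // the JSON array produced by formatIntervals() below.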
+ private def createIngestionSpec(): String = { + DataFrameToDruid.IngestionSpecTemplate + .replace("{{INPUT_PATH}}", tempFilePath) + .replace("{{DATA_SOURCE}}", dataSource) + .replace("{{SEGMENT_GRANULARITY}}", segmentGranularity) + .replace("{{QUERY_GRANULARITY}}", queryGranularity) + .replace("{{INTERVALS_ARRAY}}", formatIntervals()) + .replace("{{DIMENSIONS}}", formatDimensions()) + .replace("{{TIMESTAMP_FORMAT}}", timestampFormat) + .replace("{{TIMESTAMP_COLUMN}}", timestampColumn) + .replace("{{METRICS}}", formatMetrics()) + .replace("{{NUM_SHARDS}}", numShards.toString()) + .replace("{{REDUCE_MEMORY}}", reduceMemory) + .replace("{{HADOOP_QUEUE}}", hadoopQueue) + } + + // Formats a sequence of pairs of DateTime objects into Druid intervals. + private def formatIntervals(): String = { + val formattedIntervals = intervals.map((interval) => { + val startStr = interval._1.toString(DataFrameToDruid.IntervalDateTimeFormat) + val endStr = interval._2.toString(DataFrameToDruid.IntervalDateTimeFormat) + "\"" + startStr + "/" + endStr + "\"" + }) + "[" + formattedIntervals.mkString(", ") + "]" + } + + // Formats a sequence of field names into Druid dimensions. + private def formatDimensions(): String = { + val formattedDimensions = dimensions.map((d) => "\"" + d + "\"") + "[" + formattedDimensions.mkString(", ") + "]" + } + + // Formats a sequence of field names into Druid metrics. + // Only longSum and doubleSum metrics are supported, so metric fields with types + // other than Integer, Long, Float and Double will raise an error. + private def formatMetrics(): String = { + val formattedMetrics = metricsWithCount.map((field) => { + val fieldType = inputDfWithCount.schema.apply(field).dataType match { + case IntegerType | LongType => "longSum" + case FloatType | DoubleType => "doubleSum" + } + s"""{\"name\": \"${field}\", \"fieldName\": \"${field}\", \"type\": \"${fieldType}\"}""" + }) + "[" + formattedMetrics.mkString(", ") + "]" + } + + // Returns a thread that keeps polling Druid to check the status of the + // indexation task and updates the ingestionStatus var accordingly. + // When the task is finished, executes finalizations. + private def getStatusUpdater(): Thread = { + new Thread( + new Runnable { + def run() { + if (druidTaskId == "ERROR") { + ingestionStatus = IngestionStatus.Error + } else { + while (ingestionStatus == IngestionStatus.Loading) { + Thread.sleep(checkInterval) + log.info(s"Checking status of task ${druidTaskId} for ${dataSource}.") + ingestionStatus = getDruidTaskStatus() match { + case "RUNNING" => IngestionStatus.Loading + case "SUCCESS" => IngestionStatus.Done + case "FAILED" | "ERROR" => IngestionStatus.Error + } + } + } + conclude() + } + } + ) + } + + // Sends an http post request to Druid to trigger ingestion. + // Returns the Druid task id. + private def sendIngestionRequest(): String = { + val url = s"http://${druidHost}:${druidPort}${DataFrameToDruid.LaunchTaskPath}" + val post = new HttpPost(url) + post.addHeader("Content-type", "application/json") + post.setEntity(new StringEntity(ingestionSpec)) + val response = httpClient.execute(post) + val statusCode = response.getStatusLine().getStatusCode() + if (statusCode == 200) { + val contentStream = response.getEntity().getContent() + val responseStr = IOUtils.toString(contentStream) + val responseObj = JSON.parseFull(responseStr).get.asInstanceOf[Map[String, Any]] + responseObj("task").asInstanceOf[String] + } else "ERROR" + } + + // Sends an http get request to Druid to check the ingestion task. 
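+ // The overlord's response nests the task state one level deep, e.g. {"status": {"status": "RUNNING"}}, + // which is why the code below unwraps responseObj("status") before reading the inner "status" key.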
+ // Returns the resulting task status. + private def getDruidTaskStatus(): String = { + val path = DataFrameToDruid.CheckTaskPath.replace("{{DRUID_TASK_ID}}", druidTaskId) + val url = s"http://${druidHost}:${druidPort}${path}" + val get = new HttpGet(url) + val response = httpClient.execute(get) + val statusCode = response.getStatusLine().getStatusCode() + if (statusCode == 200) { + val contentStream = response.getEntity().getContent() + val responseStr = IOUtils.toString(contentStream) + val responseObj = JSON.parseFull(responseStr).get.asInstanceOf[Map[String, Any]] + val statusObj = responseObj("status").asInstanceOf[Map[String, Any]] + statusObj("status").asInstanceOf[String] + } else "ERROR" + } + + // Deletes the temporary file and calls the user callback. + // Note that java.lang.Object has dibs on the method name 'finalize'; see also: await(). + private def conclude(): Unit = { + ingestionStatus match { + case IngestionStatus.Done => log.info( + s"Druid ingestion task ${druidTaskId} for ${dataSource} succeeded.") + case IngestionStatus.Error => log.error( + s"Druid ingestion task ${druidTaskId} for ${dataSource} failed.") + } + val path = new Path(tempFilePath) + val fs = FileSystem.get(sc.hadoopConfiguration) + if (fs.exists(path)) fs.delete(path, true) + if (userCallback.isDefined) userCallback.get(ingestionStatus) + } +} diff --git a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala new file mode 100644 index 0000000..77cd491 --- /dev/null +++ b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala @@ -0,0 +1,242 @@ +package org.wikimedia.analytics.refinery.core + +import com.holdenkarau.spark.testing.SharedSparkContext +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path +import org.apache.http.client.HttpClient +import org.apache.http.client.methods.{HttpGet, HttpPost, HttpUriRequest} +import org.apache.http.entity.StringEntity +import org.apache.http.message.BasicHttpResponse +import org.apache.http.ProtocolVersion +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.Row +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.types.{StructType, StringType, IntegerType, LongType, DoubleType} +import org.joda.time.DateTime +import org.scalamock.scalatest.MockFactory +import org.scalatest.{FlatSpec, Matchers, BeforeAndAfterEach} +import scala.util.parsing.json.JSON + + +class TestDataFrameToDruid extends FlatSpec + with Matchers with BeforeAndAfterEach with SharedSparkContext with MockFactory { + + val testTempFile = "file:///tmp/TestDataFrameToDruid" + var httpClientMock: HttpClient = null.asInstanceOf[HttpClient] + var testDf: DataFrame = null.asInstanceOf[DataFrame] + + // Prevents JSON.parseFull from parsing integers as doubles. + JSON.globalNumberParser = (input: String) => { + if (input.contains(".")) input.toDouble else input.toLong + } + + // Asserts that the value at the given JSON path within the given JSON string matches the expected value. + // This function circumvents the type cast limitations of JSON.parseFull. 
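+ // For example, assertJson(spec, "spec.dataSchema.dataSource", "test") walks the parsed + // spec through "spec" and "dataSchema" and checks that "dataSource" equals "test". Dots + // that belong to a field name are escaped, as in "jobProperties.mapreduce\\.job\\.queuename".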
+ def assertJson[T](jsonString: String, jsonPath: String, expectedValue: T): Unit = { + val jsonObject = JSON.parseFull(jsonString).get.asInstanceOf[Map[String, Any]] + // Allows paths with escaped dots for when they are part of the field name. + val pathElements = jsonPath.replace("\\.", "_").split("\\.").map(_.replace("_", ".")) + val pathValue = pathElements.foldLeft(jsonObject.asInstanceOf[Any])((j, p) => { + j.asInstanceOf[Map[String, Any]](p).asInstanceOf[Any] + }) + assert(pathValue.asInstanceOf[T] == expectedValue) + } + + // Creates a DataFrameToDruid instance with testing defaults. + def createDftd(): DataFrameToDruid = { + new DataFrameToDruid( + sc = sc, + dataSource = "test", + inputDf = testDf, + dimensions = Seq("event_category", "event_action", "wiki"), + metrics = Seq("event_seconds"), + intervals = Seq((new DateTime(1970, 1, 1, 0, 0), new DateTime(1970, 1, 2, 0, 0))), + timestampColumn = "timestamp", + timestampFormat = "posix", + segmentGranularity = "hour", + queryGranularity = "minute", + numShards = 2, + reduceMemory = "8192", + hadoopQueue = "default", + druidHost = "test.druid.host", + druidPort = "8090", + checkInterval = 100, + tempFilePathOver = testTempFile, + httpClientOver = httpClientMock + ) + } + + // Creates a BasicHttpResponse given a status code and the response data. + def createHttpResponse(statusCode: Int, data: String): BasicHttpResponse = { + val response = new BasicHttpResponse( + new ProtocolVersion("HTTP", 1, 1), + statusCode, + "TestDataFrameToDruid" + ) + response.setEntity(new StringEntity(data)) + response + } + + override def beforeEach(): Unit = { + // Create schema and data to be used in tests. + val sqlContext = new SQLContext(sc) + val testSchema: StructType = (new StructType) + .add("event_category", StringType) + .add("event_action", StringType) + .add("event_seconds", DoubleType) + .add("timestamp", LongType) + .add("wiki", StringType) + val testRDD = sc.parallelize(Seq( + Row("cat1", "read", 10.0, 1L, "enwiki"), + Row("cat2", "edit", 20.0, 2L, "enwiki"), + Row("cat1", "read", 30.0, 3L, "enwiki"), + Row("cat3", "read", 40.0, 4L, "enwiki") + )) + testDf = sqlContext.createDataFrame(testRDD, testSchema) + + // Mock HttpClient to be injected into DataFrameToDruid. + // Its behavior will be defined in each test. + httpClientMock = mock[HttpClient] + } + + override def afterEach(): Unit = { + // Delete temp file in case DataFrameToDruid cannot accomplish it + // because of execution errors or assert failures. + val path = new Path(testTempFile) + val fs = FileSystem.get(sc.hadoopConfiguration) + if (fs.exists(path)) fs.delete(path, true) + } + + it should "request for ingestion with the correct url" in { + inSequence { + // Should receive an ingestion request; checks method and url, returns ingestion task id. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest => + val uri = request.getURI() + assert(uri.getHost() == "test.druid.host") + assert(uri.getPort() == 8090) + assert(uri.getPath() == "/druid/indexer/v1/task") + createHttpResponse(200, """{"task": "test-task-1"}""") + } + // Should receive a status request; returns success. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning( + createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""") + ) + } + createDftd().start().await() + } + + it should "request for ingestion with the correct spec" in { + inSequence { + // Should receive an ingestion request; checks spec, returns ingestion task id. 
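+ // (ScalaMock's onCall handler is used here instead of a plain returning(...) stub, + // so that the test can inspect the captured request before handing back a canned response.)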
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest => + val contentStream = request.asInstanceOf[HttpPost].getEntity().getContent() + val spec = IOUtils.toString(contentStream) + + assertJson(spec, "spec.ioConfig.inputSpec.paths", testTempFile) + assertJson(spec, "spec.dataSchema.dataSource", "test") + assertJson(spec, "spec.dataSchema.granularitySpec.segmentGranularity", "hour") + assertJson(spec, "spec.dataSchema.granularitySpec.queryGranularity", "minute") + assertJson(spec, "spec.dataSchema.granularitySpec.intervals", + Seq("1970-01-01T00:00Z/1970-01-02T00:00Z")) + assertJson(spec, "spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensions", + Seq("event_category", "event_action", "wiki")) + assertJson(spec, "spec.dataSchema.parser.parseSpec.timestampSpec.format", "posix") + assertJson(spec, "spec.dataSchema.parser.parseSpec.timestampSpec.column", "timestamp") + assertJson(spec, "spec.dataSchema.metricsSpec", Seq( + Map("name" -> "event_seconds", "fieldName" -> "event_seconds", "type" -> "doubleSum"), + Map("name" -> "eventCount", "fieldName" -> "eventCount", "type" -> "longSum") + )) + assertJson(spec, "spec.tuningConfig.partitionsSpec.numShards", 2L) + assertJson(spec, "spec.tuningConfig.jobProperties.mapreduce\\.reduce\\.memory\\.mb", "8192") + assertJson(spec, "spec.tuningConfig.jobProperties.mapreduce\\.job\\.queuename", "default") + + createHttpResponse(200, """{"task": "test-task-1"}""") + } + // Should receive a status request; returns success. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning( + createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""") + ) + } + createDftd().start().await() + } + + it should "request for ingestion with the correct data" in { + inSequence { + // Should receive an ingestion request; checks data, returns ingestion task id. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest => + val sqlContext = new SQLContext(sc) + val inputDf = sqlContext.read.json(testTempFile) + val expectedDf = testDf.withColumn("eventCount", lit(1L)) + // Align column order (the JSON reader sorts fields alphabetically) and check that + // the written data is contained in the expected data. + val alignedDf = inputDf.select(expectedDf.columns.map(inputDf(_)):_*) + assert(alignedDf.except(expectedDf).take(1).isEmpty) + createHttpResponse(200, """{"task": "test-task-1"}""") + } + // Should receive a status request; returns success. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning( + createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""") + ) + } + createDftd().start().await() + } + + it should "request for status check with correct method and url" in { + inSequence { + // Should receive an ingestion request; returns ingestion task id. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning( + createHttpResponse(200, """{"task": "test-task-1"}""") + ) + // Should receive a status request; checks method and url, returns success. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest => + val uri = request.getURI() + assert(uri.getHost() == "test.druid.host") + assert(uri.getPort() == 8090) + assert(uri.getPath() == "/druid/indexer/v1/task/test-task-1/status") + assert(request.getMethod() == "GET") + createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""") + } + } + createDftd().start().await() + } + + it should "delete the temporary file" in { + inSequence { + // Should receive an ingestion request; returns ingestion task id. 
+ (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning( + createHttpResponse(200, """{"task": "test-task-1"}""") + ) + // Should receive a status request; returns success. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning( + createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""") + ) + } + createDftd().start().await() + + // Check that the temp file has been deleted. + val path = new Path(testTempFile) + val fs = FileSystem.get(sc.hadoopConfiguration) + assert(!fs.exists(path)) + } + + it should "call the user callback once finished" in { + inSequence { + // Should receive an ingestion request; returns ingestion task id. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning( + createHttpResponse(200, """{"task": "test-task-1"}""") + ) + // Should receive a status request; returns success. + (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning( + createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""") + ) + } + + // Check that callback is actually executed. + var callbackExecuted = false + createDftd().start(Some((status: IngestionStatus.Value) => { + assert(status == IngestionStatus.Done) + callbackExecuted = true + })).await() + assert(callbackExecuted) + } +} diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml index 21d4f6e..fd9a438 100644 --- a/refinery-job/pom.xml +++ b/refinery-job/pom.xml @@ -19,19 +19,6 @@ <artifactId>refinery-core</artifactId> </dependency> - <!-- - adding explicit dep for snappy, otherwise - spark assumes is on the java.library.path - see: https://github.com/rvs/snappy-java/blob/master/src/main/resources/org/xerial/snappy/SnappyNativeLoader.java#L47 - --> - <!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java --> - <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - <version>1.1.2.5</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala new file mode 100644 index 0000000..e990927 --- /dev/null +++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala @@ -0,0 +1,286 @@ +package org.wikimedia.analytics.refinery.job + +import org.apache.log4j.LogManager +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.Column +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.types.{IntegerType, LongType, FloatType, DoubleType, StructField, StructType} +import org.joda.time.DateTime +import org.joda.time.format.DateTimeFormat +import org.wikimedia.analytics.refinery.core.{DataFrameToDruid, IngestionStatus} +import scopt.OptionParser + + +object EventLoggingToDruid { + + val log = LogManager.getLogger("EventLoggingToDruid") + val DateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH") + + case class Params( + table: String = "", + startDate: DateTime = new DateTime(0), + endDate: DateTime = new DateTime(0), + database: String = "event", + metrics: (StructField) => Boolean = (f) => false, + blacklist: Seq[String] = Seq.empty, + segmentGranularity: String = "hour", + queryGranularity: String = "minute", + numShards: Int = 2, + reduceMemory: String = "8192", + hadoopQueue: String = "default", + 
druidHost: String = "druid1001.eqiad.wmnet", + druidPort: String = "8090", + dryRun: Boolean = false + ) + + // Field filters to help select fields as metrics. + object FieldFilter { + def isNumber(field: StructField): Boolean = { + field.dataType match { + case IntegerType | LongType | FloatType | DoubleType => true + case _ => false + } + } + def hasMetricAffix(field: StructField): Boolean = { + val withAffix = "(?i).*metric.*".r + field.name match { + case withAffix() => true + case _ => false + } + } + } + + // Support implicit DateTime conversion from CLI opt. + // The opt can be given either as an integer number of hours ago, or + // as an ISO-8601 date time. + implicit val scoptDateTimeRead: scopt.Read[DateTime] = + scopt.Read.reads { s => { + if (s.forall(Character.isDigit)) + DateTime.now.minusHours(s.toInt) + else + DateTime.parse(s, DateFormatter) + } + } + + val argsParser = new OptionParser[Params]( + "spark-submit --class org.wikimedia.analytics.refinery.job.EventLoggingToDruid refinery-job.jar" + ) { + head(""" + |Hive tables -> Druid data sets + | + |Example: + | spark-submit --class org.wikimedia.analytics.refinery.job.EventLoggingToDruid refinery-job.jar \ + | --database event \ + | --table NavigationTiming \ + | --start-date 2017-09-29T03 \ + | --end-date 2017-12-15T21 \ + | --metrics number \ + | --blacklist pageId,namespaceId,revId \ + | --dry-run + | + |""".stripMargin, "") + + note("""NOTE: You may pass all of the described CLI options to this job in a single + | string with the --options '<options>' flag.""".stripMargin) + + help("help") text "Prints this usage text and exits." + + opt[String]('T', "table").required().valueName("<table>").action { (x, p) => + p.copy(table = x) + }.text("Hive input table.") + + opt[DateTime]('S', "start-date").required().valueName("<YYYY-MM-DDTHH>").action { (x, p) => + p.copy(startDate = new DateTime(x)) + }.text("Start date of the interval to load (inclusive).") + + opt[DateTime]('E', "end-date").required().valueName("<YYYY-MM-DDTHH>").action { (x, p) => + p.copy(endDate = new DateTime(x)) + }.text("End date of the interval to load (exclusive).") + + opt[String]('D', "database").optional().valueName("<database>").action { (x, p) => + p.copy(database = x) + }.text("Hive input database. Default: event.") + + opt[String]('m', "metrics").optional().valueName("<filter>").action { (x, p) => + p.copy(metrics = x match { + case "number" => FieldFilter.isNumber + case "affix" => FieldFilter.hasMetricAffix + }) + }.text("Filter that will select metric columns (number|affix): " + + "'number' selects all columns that have a numeric type. " + + "'affix' selects all columns with the word 'metric' in them. " + + "eventCount will always be a metric in the loaded data set.") + + opt[Seq[String]]('b', "blacklist").optional().valueName("<column1>,<column2>...").action { (x, p) => + p.copy(blacklist = x) + }.text("List of columns that are not to be loaded. For struct columns, " + + "passing the column name will blacklist all of its sub-fields, whereas " + + "column_subField will only blacklist that sub-field.") + + opt[String]('g', "segment-granularity").optional().valueName("<granularity>").action { (x, p) => + p.copy(segmentGranularity = x) + }.text("Granularity for Druid segments (quarter|month|week|day|hour). Default: hour.") + + opt[String]('q', "query-granularity").optional().valueName("<granularity>").action { (x, p) => + p.copy(queryGranularity = x) + }.text("Granularity for Druid queries (week|day|hour|minute|second). 
Default: minute.") + + opt[Int]('x', "num-shards").optional().valueName("<N>").action { (x, p) => + p.copy(numShards = x) + }.text("Number of shards for Druid ingestion. Default: 2.") + + opt[Int]('r', "reduce-memory").optional().valueName("<N>").action { (x, p) => + p.copy(reduceMemory = x.toString()) + }.text("Memory to be used by Hadoop for reduce operations. Default: 8192.") + + opt[String]('h', "hadoop-queue").optional().valueName("<queue>").action { (x, p) => + p.copy(hadoopQueue = x) + }.text("Hadoop queue in which to execute the loading. Default: default.") + + opt[String]('d', "druid-host").optional().valueName("<host>").action { (x, p) => + p.copy(druidHost = x) + }.text("Druid host to load the data to. Default: druid1001.eqiad.wmnet.") + + opt[Int]('p', "druid-port").optional().valueName("<port>").action { (x, p) => + p.copy(druidPort = x.toString()) + }.text("Druid port to load the data to. Default: 8090.") + + opt[Unit]('n', "dry-run").optional().action { (x, p) => + p.copy(dryRun = true) + }.text("Do not execute any loading, only check and print parameters.") + } + + val blacklistedHiveFields = Set("year", "month", "day", "hour") + val blacklistedCapsuleFields = Set("schema", "seqId", "uuid", "userAgent", "clientValidated", + "isTruncated", "clientIp") + val legitCapsuleFields = Set("wiki", "webHost", "revision", "topic", "recvFrom") + + // Entry point + def main(args: Array[String]): Unit = { + val params = args.headOption match { + case Some("--options") => + // If job options are given as a single string, + // split them before passing them to argsParser. + argsParser.parse(args(1).split("\\s+"), Params()).getOrElse(sys.exit(1)) + case _ => + argsParser.parse(args, Params()).getOrElse(sys.exit(1)) + } + + if (apply(params)) sys.exit(0) + else sys.exit(1) + } + + // This will be called after command line parameters have been parsed and checked. + def apply(params: Params): Boolean = { + + log.info(s"Starting process for ${params.database}_${params.table}.") + log.info("Querying Hive for intervals: " + Seq((params.startDate, params.endDate)).toString()) + + // Initialize sqlContext. + val sc = new SparkContext(new SparkConf()) + val hiveContext = new HiveContext(sc) + val sqlContext = hiveContext.asInstanceOf[SQLContext] + + // Get data already filtered by time range. + val comparisonFormat = "yyyyMMddHH" + val comparisonStartDate = params.startDate.toString(comparisonFormat) + val comparisonEndDate = params.endDate.toString(comparisonFormat) + val concatTimestamp = "CONCAT(year, LPAD(month, 2, '0'), LPAD(day, 2, '0'), LPAD(hour, 2, '0'))" + val df = sqlContext.sql(s""" + SELECT * + FROM ${params.database}.${params.table} + WHERE ${concatTimestamp} >= ${comparisonStartDate} + AND ${concatTimestamp} < ${comparisonEndDate} + """) + + log.info("Preparing dimensions and metrics.") + + // Flatten nested fields. + val flatColumns = getFlatColumns(df.schema) + val flatDf = df.select(flatColumns:_*) + + // Remove blacklisted fields. + val cleanColumns = getCleanColumns(flatDf.schema, params.blacklist) + val finalDf = flatDf.select(cleanColumns:_*) + + // Get dimensions and metrics. + val (dimensionFields, metricFields) = getDimensionsAndMetrics(finalDf.schema, params.metrics) + + log.info("Dimensions: " + dimensionFields.mkString(", ")) + log.info("Metrics: " + metricFields.mkString(", ")) + + if (params.dryRun) { + log.info("Dry run finished: no data was loaded.") + true + } else { + // Execute loading process. 
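+ // The EventLogging capsule's 'dt' field holds an ISO-8601 timestamp (see the test data + // in TestEventLoggingToDruid), which Druid's "auto" timestampFormat can parse directly; + // start().await() below blocks until the ingestion task reaches a final state.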
+ log.info("Launching DataFrameToDruid process.") + val dftd = new DataFrameToDruid( + sc = sc, + dataSource = s"${params.database}_${params.table}", + inputDf = finalDf, + dimensions = dimensionFields, + metrics = metricFields, + intervals = Seq((params.startDate, params.endDate)), + timestampColumn = "dt", + timestampFormat = "auto", + segmentGranularity = params.segmentGranularity, + queryGranularity = params.queryGranularity, + numShards = params.numShards, + reduceMemory = params.reduceMemory, + hadoopQueue = params.hadoopQueue, + druidHost = params.druidHost, + druidPort = params.druidPort + ).start().await() + log.info("Done.") + + // Return whether the process was successful. + (dftd.status() == IngestionStatus.Done) + } + } + + def getFlatColumns(schema: StructType, prefix: String = null): Seq[Column] = { + // HACK: This map restores the casing of capsule fields, given that Hive lowercases column names. + val capsuleFields = legitCapsuleFields.union(blacklistedCapsuleFields) + val capsuleCaseMap = capsuleFields.map(f => (f.toLowerCase(), f)).toMap + + schema.fields.flatMap(field => { + val columnName = if (prefix == null) field.name else (prefix + "." + field.name) + val columnAlias = columnName.split("\\.").map(n => capsuleCaseMap.getOrElse(n, n)).mkString("_") + + field.dataType match { + case struct: StructType => getFlatColumns(struct, columnName) + case _ => Seq(col(columnName).as(columnAlias)) + } + }) + } + + def getCleanColumns(schema: StructType, blacklist: Seq[String]): Seq[Column] = { + val blacklistNames = blacklist.toSet + .union(blacklistedCapsuleFields) + .union(blacklistedHiveFields) + val fieldNames = schema.fields.map(f => f.name) + val withPrefix = "([^_]*)_.*".r + fieldNames.filter(f => ( + !blacklistNames.contains(f) && + (f match { + case withPrefix(prefix) => !blacklistNames.contains(prefix) + case _ => true + }) + )).map(col(_)) + } + + def getDimensionsAndMetrics( + schema: StructType, + metrics: (StructField) => Boolean + ): (Seq[String], Seq[String]) = { + val allFields = schema.fields.filter(f => f.name != "dt") + val metricFields = allFields + .filter((f) => !legitCapsuleFields.contains(f.name)) + .filter(metrics) + val dimensionFields = allFields.filter(!metricFields.contains(_)) + (dimensionFields.map(_.name), metricFields.map(_.name)) + } +} diff --git a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala new file mode 100644 index 0000000..d9bed98 --- /dev/null +++ b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala @@ -0,0 +1,104 @@ +package org.wikimedia.analytics.refinery.job + +import com.holdenkarau.spark.testing.SharedSparkContext +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Row +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.types.{StructType, StringType, IntegerType, LongType} +import org.scalatest.{FlatSpec, Matchers, BeforeAndAfterEach} + + +class TestEventLoggingToDruid extends FlatSpec + with Matchers with BeforeAndAfterEach with SharedSparkContext { + + var testDf: DataFrame = null.asInstanceOf[DataFrame] + + override def beforeEach(): Unit = { + val testSchema: StructType = (new StructType) + .add("event", (new StructType) + .add("pageId", StringType) + .add("action", StringType) + .add("seconds", IntegerType)) + .add("dt", StringType) + .add("webhost", StringType) + 
.add("useragent", StringType) + .add("year", IntegerType) + + val testRDD = sc.parallelize(Seq( + Row(Row("page1", "edit", 10), "2017-01-01T00:00:00", "en.wikimedia.org", "UA1", 2017), + Row(Row("page1", "read", 20), "2017-01-01T00:00:01", "en.wikimedia.org", "UA2", 2017), + Row(Row("page2", "edit", 30), "2017-01-01T00:00:02", "es.wikimedia.org", "UA3", 2017), + Row(Row("page3", "read", 40), "2017-01-01T00:00:03", "es.wikimedia.org", "UA4", 2017), + Row(Row("page3", "read", 50), "2017-01-01T00:00:04", "es.wikimedia.org", "UA5", 2017) + )) + + val sqlContext = new SQLContext(sc) + testDf = sqlContext.createDataFrame(testRDD, testSchema) + } + + + it should "flatten the event part of the schema" in { + val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema) + val flatFields = testDf.select(flatColumns:_*).schema.map(f => f.name) + + assert(flatFields.contains("event_pageId")) + assert(flatFields.contains("event_action")) + assert(flatFields.contains("event_seconds")) + } + + it should "correct case for the capsule fields" in { + val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema) + val flatFields = testDf.select(flatColumns:_*).schema.map(f => f.name) + + assert(flatFields.contains("webHost")) + assert(flatFields.contains("userAgent")) + } + + it should "blacklist whole columns properly" in { + val blacklist = Seq("event", "webHost") + val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema) + val flatDf = testDf.select(flatColumns:_*) + val cleanColumns = EventLoggingToDruid.getCleanColumns(flatDf.schema, blacklist) + val cleanFields = flatDf.select(cleanColumns:_*).schema.map(f => f.name) + + assert(cleanFields.length == 1) + assert(cleanFields.contains("dt")) + } + + it should "blacklist struct column subfields properly" in { + val blacklist = Seq("event_pageId", "event_seconds") + val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema) + val flatDf = testDf.select(flatColumns:_*) + val cleanColumns = EventLoggingToDruid.getCleanColumns(flatDf.schema, blacklist) + val cleanFields = flatDf.select(cleanColumns:_*).schema.map(f => f.name) + + assert(cleanFields.length == 3) + assert(cleanFields.contains("event_action")) + assert(cleanFields.contains("dt")) + assert(cleanFields.contains("webHost")) + } + + it should "select dimensions and metrics properly" in { + val blacklist = Seq("event_pageId") + val flatColumns = EventLoggingToDruid.getFlatColumns(testDf.schema) + val flatDf = testDf.select(flatColumns:_*) + val cleanColumns = EventLoggingToDruid.getCleanColumns(flatDf.schema, blacklist) + val cleanDf = flatDf.select(cleanColumns:_*) + + val (dimensionFields, metricFields) = EventLoggingToDruid.getDimensionsAndMetrics( + cleanDf.schema, + (f) => f.dataType match { + case IntegerType => true + case _ => false + } + ) + + assert(dimensionFields.length == 2) + assert(dimensionFields.contains("event_action")) + assert(dimensionFields.contains("webHost")) + + assert(metricFields.length == 1) + assert(metricFields.contains("event_seconds")) + } +} -- To view, visit https://gerrit.wikimedia.org/r/386882 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I30c374c3dcba44bbd0608e72ae0162bcc442cd0f Gerrit-PatchSet: 25 Gerrit-Project: analytics/refinery/source Gerrit-Branch: master Gerrit-Owner: Mforns <mfo...@wikimedia.org> Gerrit-Reviewer: Joal <j...@wikimedia.org> Gerrit-Reviewer: Mforns <mfo...@wikimedia.org> Gerrit-Reviewer: Milimetric <dandree...@wikimedia.org> Gerrit-Reviewer: Nuria 
<nu...@wikimedia.org> Gerrit-Reviewer: Ottomata <ao...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits