Repository: incubator-griffin Updated Branches: refs/heads/master cc35024a6 -> 791c502da
[GRIFFIN-205] accuracy matched fraction https://issues.apache.org/jira/browse/GRIFFIN-205 This pull request covers only the batch dq type. We still need to decide whether it is worth adding "matched fraction" to the streaming type. Accuracy transformation tests are added. Author: ashutak <ashu...@griddynamics.com> Closes #434 from ashutakGG/GRIFFIN-205-accuracy-matchedFraction. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/791c502d Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/791c502d Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/791c502d Branch: refs/heads/master Commit: 791c502dab1d30e3758bd85a35dff545b4ab74aa Parents: cc35024 Author: ashutak <ashu...@griddynamics.com> Authored: Sat Oct 13 18:30:48 2018 +0800 Committer: William Guo <gu...@apache.org> Committed: Sat Oct 13 18:30:48 2018 +0800 ---------------------------------------------------------------------- .../configuration/dqdefinition/DQConfig.scala | 16 +- .../dsl/transform/AccuracyExpr2DQSteps.scala | 17 +- .../src/test/resources/hive/person_table.csv | 2 + ...AccuracyTransformationsIntegrationTest.scala | 180 +++++++++++++++++++ 4 files changed, 202 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala index b281481..a4cdfc1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala +++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala @@ -74,9 +74,9 @@ case class DQConfig(@JsonProperty("name") private val name: String, */ @JsonInclude(Include.NON_NULL) case class DataSourceParam( @JsonProperty("name") private val name: String, - @JsonProperty("baseline") private val baseline: Boolean, @JsonProperty("connectors") private val connectors: List[DataConnectorParam], - @JsonProperty("checkpoint") private val checkpoint: Map[String, Any] + @JsonProperty("baseline") private val baseline: Boolean = false, + @JsonProperty("checkpoint") private val checkpoint: Map[String, Any] = null ) extends Param { def getName: String = name def isBaseline: Boolean = if (!baseline.equals(null)) baseline else false @@ -148,12 +148,12 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul @JsonInclude(Include.NON_NULL) case class RuleParam(@JsonProperty("dsl.type") private val dslType: String, @JsonProperty("dq.type") private val dqType: String, - @JsonProperty("in.dataframe.name") private val inDfName: String, - @JsonProperty("out.dataframe.name") private val outDfName: String, - @JsonProperty("rule") private val rule: String, - @JsonProperty("details") private val details: Map[String, Any], - @JsonProperty("cache") private val cache: Boolean, - @JsonProperty("out") private val outputs: List[RuleOutputParam] + @JsonProperty("in.dataframe.name") private val inDfName: String = null, + @JsonProperty("out.dataframe.name") private val outDfName: String = null, + @JsonProperty("rule") private val rule: String = null, + @JsonProperty("details") private val details: Map[String, Any] = null, + @JsonProperty("cache") private val cache: Boolean = false, + @JsonProperty("out") private val outputs: List[RuleOutputParam] = null ) extends Param { def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("") def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("") 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala index 3bf7d04..f7ff3ef 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala @@ -44,6 +44,7 @@ case class AccuracyExpr2DQSteps(context: DQContext, val _miss = "miss" val _total = "total" val _matched = "matched" + val _matchedFraction = "matchedFraction" } import AccuracyKeys._ @@ -125,14 +126,20 @@ case class AccuracyExpr2DQSteps(context: DQContext, // 4. accuracy metric val accuracyTableName = ruleParam.getOutDfName() val matchedColName = details.getStringOrKey(_matched) + val matchedFractionColName = details.getStringOrKey(_matchedFraction) val accuracyMetricSql = procType match { case BatchProcessType => s""" - |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, - |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, - |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}` - |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` - """.stripMargin + SELECT A.total AS `${totalColName}`, + A.miss AS `${missColName}`, + (A.total - A.miss) AS `${matchedColName}`, + coalesce( (A.total - A.miss) / A.total, 1.0) AS `${matchedFractionColName}` + FROM ( + SELECT `${totalCountTableName}`.`${totalColName}` AS total, + coalesce(`${missCountTableName}`.`${missColName}`, 0) AS miss + FROM `${totalCountTableName}` LEFT JOIN 
`${missCountTableName}` + ) AS A + """ case StreamingProcessType => s""" |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/test/resources/hive/person_table.csv ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/hive/person_table.csv b/measure/src/test/resources/hive/person_table.csv new file mode 100644 index 0000000..bde6b3e --- /dev/null +++ b/measure/src/test/resources/hive/person_table.csv @@ -0,0 +1,2 @@ +Joey,14 +Ivan,32 http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/791c502d/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala new file mode 100644 index 0000000..129f0c5 --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala @@ -0,0 +1,180 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.transformations + +import com.holdenkarau.spark.testing.DataFrameSuiteBase +import org.apache.spark.sql.DataFrame +import org.scalatest._ + +import org.apache.griffin.measure.configuration.dqdefinition._ +import org.apache.griffin.measure.configuration.enums.BatchProcessType +import org.apache.griffin.measure.context.{ContextId, DQContext} +import org.apache.griffin.measure.datasource.DataSourceFactory +import org.apache.griffin.measure.job.builder.DQJobBuilder + +case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double) + +class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with DataFrameSuiteBase { + import spark.implicits._ + + private val EMPTY_PERSON_TABLE = "empty_person" + private val PERSON_TABLE = "person" + + override def beforeAll(): Unit = { + super.beforeAll() + + dropTables() + createPersonTable() + createEmptyPersonTable() + + spark.conf.set("spark.sql.crossJoin.enabled", "true") + } + + override def afterAll(): Unit = { + dropTables() + super.afterAll() + } + + "accuracy" should "basically work" in { + checkAccuracy( + sourceName = PERSON_TABLE, + targetName = PERSON_TABLE, + expectedResult = AccuracyResult(total = 2, miss = 0, matched = 2, matchedFraction = 1.0)) + } + + "accuracy" should "work with empty target" in { + checkAccuracy( + sourceName = PERSON_TABLE, + targetName = EMPTY_PERSON_TABLE, + expectedResult = AccuracyResult(total = 2, miss = 2, matched = 0, matchedFraction = 0.0) + ) + } + + "accuracy" should "work with empty source" in { + checkAccuracy( + sourceName = EMPTY_PERSON_TABLE, + targetName = PERSON_TABLE, + expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0, matchedFraction = 1.0)) + } + + "accuracy" should "work with empty source and target" in { + checkAccuracy( + sourceName = EMPTY_PERSON_TABLE, + targetName = 
EMPTY_PERSON_TABLE, + expectedResult = AccuracyResult(total = 0, miss = 0, matched = 0, matchedFraction = 1.0)) + } + + private def checkAccuracy(sourceName: String, targetName: String, expectedResult: AccuracyResult) = { + val dqContext: DQContext = getDqContext( + dataSourcesParam = List( + DataSourceParam( + name = "source", + connectors = List(dataConnectorParam(tableName = sourceName)) + ), + DataSourceParam( + name = "target", + connectors = List(dataConnectorParam(tableName = targetName)) + ) + )) + + val accuracyRule = RuleParam( + dslType = "griffin-dsl", + dqType = "ACCURACY", + outDfName = "person_accuracy", + rule = "source.name = target.name" + ) + + val res = getRuleResults(dqContext, accuracyRule) + .as[AccuracyResult] + .collect() + + res.length shouldBe 1 + + res(0) shouldEqual expectedResult + } + + private def getRuleResults(dqContext: DQContext, rule: RuleParam): DataFrame = { + val dqJob = DQJobBuilder.buildDQJob( + dqContext, + evaluateRuleParam = EvaluateRuleParam(List(rule)) + ) + + dqJob.execute(dqContext) + + spark.sql(s"select * from ${rule.getOutDfName()}") + } + + private def createPersonTable(): Unit = { + val personCsvPath = getClass.getResource("/hive/person_table.csv").getFile + + spark.sql( + s"CREATE TABLE ${PERSON_TABLE} " + + "( " + + " name String," + + " age int " + + ") " + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " + + "STORED AS TEXTFILE" + ) + + spark.sql(s"LOAD DATA LOCAL INPATH '$personCsvPath' OVERWRITE INTO TABLE ${PERSON_TABLE}") + } + + private def createEmptyPersonTable(): Unit = { + spark.sql( + s"CREATE TABLE ${EMPTY_PERSON_TABLE} " + + "( " + + " name String," + + " age int " + + ") " + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' " + + "STORED AS TEXTFILE" + ) + + spark.sql(s"select * from ${EMPTY_PERSON_TABLE}").show() + } + + private def dropTables(): Unit = { + spark.sql(s"DROP TABLE IF EXISTS ${PERSON_TABLE} ") + spark.sql(s"DROP TABLE IF EXISTS 
${EMPTY_PERSON_TABLE} ") + } + + private def getDqContext(dataSourcesParam: Seq[DataSourceParam], name: String = "test-context"): DQContext = { + val dataSources = DataSourceFactory.getDataSources(spark, null, dataSourcesParam) + dataSources.foreach(_.init()) + + DQContext( + ContextId(System.currentTimeMillis), + name, + dataSources, + Nil, + BatchProcessType + )(spark) + } + + private def dataConnectorParam(tableName: String) = { + DataConnectorParam( + conType = "HIVE", + version = null, + dataFrameName = null, + config = Map("table.name" -> tableName), + preProc = null + ) + } +}