Repository: incubator-griffin Updated Branches: refs/heads/master fda8222c8 -> 23ff999cd
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala index dc9a3f8..86b367c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala @@ -44,15 +44,13 @@ object DataFrameOps { val _matched = "matched" } - def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = { - val _dfName = "df.name" + def fromJson(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = { val _colName = "col.name" - val dfName = details.getOrElse(_dfName, "").toString val colNameOpt = details.get(_colName).map(_.toString) implicit val encoder = Encoders.STRING - val df: DataFrame = sqlContext.table(s"`${dfName}`") + val df: DataFrame = sqlContext.table(s"`${inputDfName}`") val rdd = colNameOpt match { case Some(colName: String) => df.map(r => r.getAs[String](colName)) case _ => df.map(_.getAs[String](0)) @@ -60,19 +58,13 @@ object DataFrameOps { sqlContext.read.json(rdd) // slow process } - def accuracy(sqlContext: SQLContext, contextId: ContextId, details: Map[String, Any]): DataFrame = { + def accuracy(sqlContext: SQLContext, inputDfName: String, contextId: ContextId, details: Map[String, Any]): DataFrame = { import AccuracyOprKeys._ - val dfName = details.getStringOrKey(_dfName) val miss = details.getStringOrKey(_miss) val total = details.getStringOrKey(_total) val matched = details.getStringOrKey(_matched) -// val _enableIgnoreCache = "enable.ignore.cache" -// val enableIgnoreCache = details.getBoolean(_enableIgnoreCache, false) - -// val tmst = InternalColumns.tmst - val updateTime = new Date().getTime def getLong(r: Row, k: String): Option[Long] = { @@ -83,7 +75,7 @@ object DataFrameOps { } } - val df = sqlContext.table(s"`${dfName}`") + val df = sqlContext.table(s"`${inputDfName}`") val results = df.rdd.flatMap { row => try { @@ -122,11 +114,8 @@ object DataFrameOps { retDf } - def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = { - val _dfName = "df.name" - val dfName = details.getOrElse(_dfName, "").toString - - val df = sqlContext.table(s"`${dfName}`") + def clear(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = { + val df = sqlContext.table(s"`${inputDfName}`") val emptyRdd = sqlContext.sparkContext.emptyRDD[Row] sqlContext.createDataFrame(emptyRdd, df.schema) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala index e2f90f9..5f99ed2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala @@ -24,6 +24,7 @@ import org.apache.griffin.measure.context.DQContext * data frame ops transform step */ case class DataFrameOpsTransformStep(name: String, + inputDfName: String, rule: String, details: Map[String, Any], cache: Boolean = false @@ -33,9 +34,9 @@ case class DataFrameOpsTransformStep(name: String, val sqlContext = context.sqlContext try { val df = rule match { - case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, details) - case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, context.contextId, details) - case DataFrameOps._clear => DataFrameOps.clear(sqlContext, details) + case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, inputDfName, details) + case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details) + case DataFrameOps._clear => DataFrameOps.clear(sqlContext, inputDfName, details) case _ => throw new Exception(s"df opr [ ${rule} ] not supported") } if (cache) context.dataFrameCache.cacheDataFrame(name, df) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala index 0472416..9415998 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala @@ -33,7 +33,7 @@ case class DataSourceUpdateWriteStep(dsName: String, val writeTimestampOpt: Option[Long] = None def execute(context: DQContext): Boolean = { - collectDsCacheUpdateDf(context) match { + getDataSourceCacheUpdateDf(context) match { case Some(df) => { context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df)) } @@ -56,6 +56,6 @@ case class DataSourceUpdateWriteStep(dsName: String, } } - private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName) + private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala index 6b7944d..8f7d01c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala @@ -33,7 +33,7 @@ case class MetricFlushStep() extends WriteStep { context.metricWrapper.flush.foldLeft(true) { (ret, pair) => val (t, metric) = pair val pr = try { - context.getPersist(t).persistMetrics(metric) + context.getSink(t).sinkMetrics(metric) true } catch { case e: Throwable => { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala index 2f34d63..4771891 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala @@ -29,7 +29,7 @@ import org.apache.griffin.measure.utils.ParamUtil._ */ case class MetricWriteStep(name: String, inputName: String, - collectType: NormalizeType, + flattenType: FlattenType, writeTimestampOpt: Option[Long] = None ) extends WriteStep { @@ -46,7 +46,7 @@ case class MetricWriteStep(name: String, val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode) val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match { case SimpleMode => { - val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType) + val metrics: Map[String, Any] = flattenMetric(metricMaps, name, flattenType) emptyMetricMap + (timestamp -> metrics) } case TimestampMode => { @@ -58,7 +58,7 @@ case class MetricWriteStep(name: String, tmstMetrics.groupBy(_._1).map { pair => val (k, v) = pair val maps = v.map(_._2) - val mtc = normalizeMetric(maps, name, collectType) + val mtc = flattenMetric(maps, name, flattenType) (k, mtc) } } @@ -95,12 +95,12 @@ case class MetricWriteStep(name: String, } } - private def normalizeMetric(metrics: Seq[Map[String, Any]], name: String, collectType: NormalizeType + private def flattenMetric(metrics: Seq[Map[String, Any]], name: String, flattenType: FlattenType ): Map[String, Any] = { - collectType match { - case EntriesNormalizeType => metrics.headOption.getOrElse(emptyMap) - case ArrayNormalizeType => Map[String, Any]((name -> metrics)) - case MapNormalizeType => { + flattenType match { + case EntriesFlattenType => metrics.headOption.getOrElse(emptyMap) + case ArrayFlattenType => Map[String, Any]((name -> metrics)) + case MapFlattenType => { val v = metrics.headOption.getOrElse(emptyMap) Map[String, Any]((name -> v)) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala index 13b7f80..2bc373c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ /** - * write records needs to be persisted + * write records needs to be sink */ case class RecordWriteStep(name: String, inputName: String, @@ -41,27 +41,27 @@ case class RecordWriteStep(name: String, writeMode match { case SimpleMode => { // batch records - val recordsOpt = collectBatchRecords(context) + val recordsOpt = getBatchRecords(context) // write records recordsOpt match { case Some(records) => { - context.getPersist(timestamp).persistRecords(records, name) + context.getSink(timestamp).sinkRecords(records, name) } case _ => {} } } case TimestampMode => { // streaming records - val (recordsOpt, emptyTimestamps) = collectStreamingRecords(context) + val (recordsOpt, emptyTimestamps) = getStreamingRecords(context) // write records recordsOpt.foreach { records => records.foreach { pair => val (t, strs) = pair - context.getPersist(t).persistRecords(strs, name) + context.getSink(t).sinkRecords(strs, name) } } emptyTimestamps.foreach { t => - context.getPersist(t).persistRecords(Nil, name) + context.getSink(t).sinkRecords(Nil, name) } } } @@ -92,11 +92,11 @@ case class RecordWriteStep(name: String, private def getFilterTableDataFrame(context: DQContext): Option[DataFrame] = filterTableNameOpt.flatMap(getDataFrame(context, _)) - private def collectBatchRecords(context: DQContext): Option[RDD[String]] = { + private def getBatchRecords(context: DQContext): Option[RDD[String]] = { getRecordDataFrame(context).map(_.toJSON.rdd); } - private def collectStreamingRecords(context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = { + private def getStreamingRecords(context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = { implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING) val defTimestamp = context.contextId.timestamp getRecordDataFrame(context) match { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_accuracy-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json index 10167cd..149c839 100644 --- a/measure/src/test/resources/_accuracy-batch-griffindsl.json +++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json @@ -35,7 +35,7 @@ { "dsl.type": "griffin-dsl", "dq.type": "accuracy", - "name": "accu", + "out.dataframe.name": "accu", "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", "details": { "source": "source", @@ -44,13 +44,15 @@ "total": "total_count", "matched": "matched_count" }, - "metric": { - "name": "accu" - }, - "record": { - "name": "missRecords" - } + "out":[ + { + "type": "record", + "name": "missRecords" + } + ] } ] - } + }, + + "sinks": ["LOG", "ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_accuracy-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-batch-sparksql.json b/measure/src/test/resources/_accuracy-batch-sparksql.json deleted file mode 100644 index 2eef9f1..0000000 --- a/measure/src/test/resources/_accuracy-batch-sparksql.json +++ /dev/null @@ -1,63 +0,0 @@ -{ - "name": "accu_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - }, { - "name": "target", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_target.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "missRecords", - "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.user_id, '') = coalesce(target.user_id, '') AND coalesce(source.first_name, '') = coalesce(target.first_name, '') AND coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.post_code IS NULL)", - "record": { - "name": "miss" - } - }, - { - "dsl.type": "spark-sql", - "name": "miss_count", - "rule": "SELECT count(*) as miss FROM `missRecords`" - }, - { - "dsl.type": "spark-sql", - "name": "total_count", - "rule": "SELECT count(*) as total FROM source" - }, - { - "dsl.type": "spark-sql", - "name": "accu", - "rule": "SELECT `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss`, (`total` - `miss`) AS `matched` FROM `total_count` FULL JOIN `miss_count`", - "metric": { - "name": "accu" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_accuracy-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json index 240d768..4492a35 100644 --- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json +++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json @@ -11,6 +11,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name": "this", "config": { "kafka.config": { "bootstrap.servers": "10.147.177.107:9092", @@ -24,22 +25,20 @@ }, "pre.proc": [ { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "dsl.type": "df-ops", + "in.dataframe.name": "this", + "out.dataframe.name": "s1", + "rule": "from_json" }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" + "out.dataframe.name": "this", + "rule": "select name, age from s1" } ] } ], - "cache": { + "checkpoint": { "type": "parquet", "file.path": "hdfs://localhost/griffin/streaming/dump/source", "info.path": "source", @@ -55,6 +54,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name": "this", "config": { "kafka.config": { "bootstrap.servers": "10.147.177.107:9092", @@ -68,22 +68,20 @@ }, "pre.proc": [ { - "dsl.type": "df-opr", - "name": "${t1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "dsl.type": "df-ops", + "in.dataframe.name": "this", + "out.dataframe.name": "t1", + "rule": "from_json" }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${t1}" + "out.dataframe.name": "this", + "rule": "select name, age from t1" } ] } ], - "cache": { + "checkpoint": { "type": "parquet", "file.path": "hdfs://localhost/griffin/streaming/dump/target", "info.path": "target", @@ -100,7 +98,7 @@ { "dsl.type": "griffin-dsl", "dq.type": "accuracy", - "name": "accu", + "out.dataframe.name": "accu", "rule": "source.name = target.name and source.age = target.age", "details": { "source": "source", @@ -109,13 +107,19 @@ "total": "total_count", "matched": "matched_count" }, - "metric": { - "name": "accu" - }, - "record": { - "name": "missRecords" - } + "out":[ + { + "type":"metric", + "name": "accu" + }, + { + "type":"record", + "name": "missRecords" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_accuracy-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-streaming-sparksql.json b/measure/src/test/resources/_accuracy-streaming-sparksql.json deleted file mode 100644 index 0824cb8..0000000 --- a/measure/src/test/resources/_accuracy-streaming-sparksql.json +++ /dev/null @@ -1,142 +0,0 @@ -{ - "name": "accu_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-2m", "0"] - } - }, { - "name": "target", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "ttt", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${t1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${t1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/target", - "info.path": "target", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-2m", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "missRecords", - "cache": true, - "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.name, '') = coalesce(target.name, '') AND coalesce(source.age, '') = coalesce(target.age, '') WHERE (NOT (source.name IS NULL AND source.age IS NULL)) AND (target.name IS NULL AND target.age IS NULL)" - }, - { - "dsl.type": "spark-sql", - "name": "miss_count", - "rule": "SELECT `__tmst`, count(*) as miss FROM `missRecords` GROUP BY `__tmst`" - }, - { - "dsl.type": "spark-sql", - "name": "total_count", - "rule": "SELECT `__tmst`, count(*) as total FROM source GROUP BY `__tmst`" - }, - { - "dsl.type": "spark-sql", - "name": "accu", - "rule": "SELECT `total_count`.`__tmst` AS `__tmst`, `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss` FROM `total_count` FULL JOIN `miss_count` ON `total_count`.`__tmst` = `miss_count`.`__tmst`" - }, - { - "dsl.type": "df-opr", - "name": "metric_accu", - "rule": "accuracy", - "details": { - "df.name": "accu", - "miss": "miss", - "total": "total", - "matched": "matched" - }, - "metric": { - "name": "accuracy" - } - }, - { - "dsl.type": "spark-sql", - "name": "accu_miss_records", - "rule": "SELECT `__tmst`, `__empty` FROM `metric_accu` WHERE `__record`", - "record": { - "name": "missRecords", - "data.source.cache": "source", - "origin.DF": "missRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_completeness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json index 9c00444..4f42092 100644 --- a/measure/src/test/resources/_completeness-batch-griffindsl.json +++ b/measure/src/test/resources/_completeness-batch-griffindsl.json @@ -25,12 +25,17 @@ { "dsl.type": "griffin-dsl", "dq.type": "completeness", - "name": "comp", + "out.dataframe.name": "comp", "rule": "email, post_code, first_name", - "metric": { - "name": "comp" - } + "out":[ + { + "type": "metric", + "name": "comp" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_completeness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json index ba8bdce..02f0a39 100644 --- a/measure/src/test/resources/_completeness-streaming-griffindsl.json +++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json @@ -10,6 +10,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name": "this", "config": { "kafka.config": { "bootstrap.servers": "10.147.177.107:9092", @@ -23,22 +24,20 @@ }, "pre.proc": [ { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "dsl.type": "df-ops", + "in.dataframe.name": "this", + "out.dataframe.name": "s1", + "rule": "from_json" }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" + "out.dataframe.name": "this", + "rule": "select name, age from s1" } ] } ], - "cache": { + "checkpoint": { "file.path": "hdfs://localhost/griffin/streaming/dump/source", "info.path": "source", "ready.time.interval": "10s", @@ -54,12 +53,17 @@ { "dsl.type": "griffin-dsl", "dq.type": "completeness", - "name": "comp", + "out.dataframe.name": "comp", "rule": "name, age", - "metric": { - "name": "comp" - } + "out":[ + { + "type": "metric", + "name": "comp" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_distinctness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json index af0c91e..d946089 100644 --- a/measure/src/test/resources/_distinctness-batch-griffindsl.json +++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json @@ -37,7 +37,7 @@ { "dsl.type": "griffin-dsl", "dq.type": "distinct", - "name": "dist", + "out.dataframe.name": "dist", "rule": "user_id", "details": { "source": "source", @@ -48,10 +48,15 @@ "num": "num", "duplication.array": "dup" }, - "metric": { - "name": "distinct" - } + "out":[ + { + "type": "metric", + "name": "distinct" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_distinctness-batch-griffindsl1.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json b/measure/src/test/resources/_distinctness-batch-griffindsl1.json deleted file mode 100644 index 4d94d8e..0000000 --- a/measure/src/test/resources/_distinctness-batch-griffindsl1.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "name": "dist_batch", - - "process.type": "batch", - - "timestamp": 123456, - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/dupdata.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${this}" - } - ] - } - ] - }, - { - "name": "target", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/dupdata.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select DISTINCT name, age from ${this}" - } - ] - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "distinct", - "name": "dist", - "rule": "name", - "details": { - "source": "source", - "target": "target", - "total": "total", - "distinct": "distinct", - "dup": "dup", - "num": "num", - "duplication.array": "dup" - }, - "metric": { - "name": "distinct" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_distinctness-batch-griffindsl2.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl2.json b/measure/src/test/resources/_distinctness-batch-griffindsl2.json deleted file mode 100644 index 6a12719..0000000 --- a/measure/src/test/resources/_distinctness-batch-griffindsl2.json +++ /dev/null @@ -1,74 +0,0 @@ -{ - "name": "dist_batch", - - "process.type": "batch", - - "timestamp": 123456, - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/dupdata.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${this}" - } - ] - } - ] - }, - { - "name": "target", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/dupdata.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select DISTINCT name, age from ${this}" - } - ] - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "distinct", - "name": "dist", - "rule": "name, [age]", - "details": { - "source": "source", - "target": "target", - "total": "total", - "distinct": "distinct", - "dup": "dup", - "num": "num", - "duplication.array": "dup", - "record.enable": true - }, - "metric": { - "name": "distinct" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_distinctness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json index c36e7ba..e3629d1 100644 --- a/measure/src/test/resources/_distinctness-streaming-griffindsl.json +++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json @@ -6,7 +6,7 @@ "data.sources": [ { "name": "new", - "cache": { + "checkpoint": { "file.path": "hdfs://localhost/griffin/streaming/dump/old", "info.path": "new", "ready.time.interval": "10s", @@ -21,6 +21,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name": "this", "config": { "kafka.config": { "bootstrap.servers": "10.149.247.156:9092", @@ -34,22 +35,20 @@ }, "pre.proc": [ { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "dsl.type": "df-ops", + "in.dataframe.name": "this", + "out.dataframe.name": "s1", + "rule": "from_json" }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" + "out.dataframe.name": "this", + "rule": "select name, age from s1" } ] } ], - "cache": { + "checkpoint": { "file.path": "hdfs://localhost/griffin/streaming/dump/old", "info.path": "old", "ready.time.interval": "10s", @@ -64,7 +63,7 @@ { "dsl.type": "griffin-dsl", "dq.type": "distinct", - "name": "dist", + "out.dataframe.name": "dist", "rule": "name, age", "details": { "source": "new", @@ -76,10 +75,15 @@ "num": "num", "duplication.array": "dup" }, - "metric": { - "name": "distinct" - } + "out":[ + { + "type": "metric", + "name": "distinct" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-batch-griffindsl-hive.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json index 03b0405..70cc369 100644 --- a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json +++ b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json @@ -26,23 +26,31 @@ { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "name": "prof", + "out.dataframe.name": "prof", "rule": "name, count(*) as cnt from source group by name", - "metric": { - "name": "name_group", - "collect.type": "array" - } + "out":[ + { + "type": "metric", + "name": "name_group", + "flatten": "array" + } + ] }, { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "name": "grp", + "out.dataframe.name": "grp", "rule": "age, count(*) as cnt from source group by age order by cnt", - "metric": { - "name": "age_group", - "collect.type": "array" - } + "out":[ + { + "type": "metric", + "name": "age_group", + "flatten": "array" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json index ec082c4..273d2e4 100644 --- a/measure/src/test/resources/_profiling-batch-griffindsl.json +++ b/measure/src/test/resources/_profiling-batch-griffindsl.json @@ -12,14 +12,14 @@ { "type": "avro", "version": "1.7", + "dataframe.name" : "this_table", "config": { "file.name": "src/test/resources/users_info_src.avro" }, "pre.proc": [ { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select reg_replace(email, '^([^@0-9]+)([0-9]+)@(dc)(?:\\\\.[^@]+)$', '$1@$3') as email, post_code from ${this}" + "rule": "select * from this_table where user_id < 10014" } ] } @@ -32,23 +32,31 @@ { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "name": "prof", - "rule": "email, count(*) as cnt from source group by email", - "metric": { - "name": "prof", - "collect.type": "array" - } + "out.dataframe.name": "prof", + "rule": "user_id, count(*) as cnt from source group by user_id", + "out":[ + { + "type": "metric", + "name": "prof", + "flatten": "array" + } + ] }, { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "name": "grp", + "out.dataframe.name": "grp", "rule": "source.post_code, count(*) as cnt from source group by source.post_code order by cnt desc", - "metric": { - "name": "post_group", - "collect.type": "array" - } + "out":[ + { + "type": "metric", + "name": "post_group", + "flatten": "array" + } + ] } ] - } + }, + + "sinks": ["CONSOLE"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json b/measure/src/test/resources/_profiling-batch-sparksql.json index fdfd812..c8077d0 100644 --- a/measure/src/test/resources/_profiling-batch-sparksql.json +++ b/measure/src/test/resources/_profiling-batch-sparksql.json @@ -24,21 +24,29 @@ "rules": [ { "dsl.type": "spark-sql", - "name": "prof", + "out.dataframe.name": "prof", "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source", - "metric": { - "name": "prof" - } + "out": [ + { + "type": "metric", + "name": "prof" + } + ] }, { "dsl.type": "spark-sql", - "name": "grp", + "out.dataframe.name": "grp", "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code", - "metric": { - "name": "post_group", - "collect.type": "array" - } + "out": [ + { + "type": "metric", + "name": "post_group", + "flatten": "array" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json index b6feb5a..a523434 100644 --- a/measure/src/test/resources/_profiling-streaming-griffindsl.json +++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json @@ -10,6 +10,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name": "this", "config": { "kafka.config": { "bootstrap.servers": "10.147.177.107:9092", @@ -23,22 +24,20 @@ }, "pre.proc": [ { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "dsl.type": "df-ops", + "in.dataframe.name": "this", + "out.dataframe.name": "s1", + "rule": "from_json" }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" + "out.dataframe.name": "this", + "rule": "select name, age from s1" } ] } ], - "cache": { + "checkpoint": { "file.path": "hdfs://localhost/griffin/streaming/dump/source", "info.path": "source", "ready.time.interval": "10s", @@ -54,22 +53,30 @@ { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "name": "prof", + "out.dataframe.name": "prof", "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", - "metric": { - "name": "prof" - } + "out":[ + { + "type": "metric", + "name": "prof" + } + ] }, { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "name": "grp", + "out.dataframe.name": "grp", "rule": "select name, count(*) as `cnt` from source group by name", - "metric": { - "name": "name_group", - "collect.type": "array" - } + "out":[ + { + "type": "metric", + "name": "name_group", + "flatten": "array" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-streaming-sparksql.json b/measure/src/test/resources/_profiling-streaming-sparksql.json deleted file mode 100644 index 4f0b0ee..0000000 --- a/measure/src/test/resources/_profiling-streaming-sparksql.json +++ /dev/null @@ -1,80 +0,0 @@ -{ - "name": "prof_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "prof", - "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", - "metric": { - "name": "prof" - } - }, - { - "dsl.type": "spark-sql", - "name": "grp", - "rule": "select name, count(*) as `cnt` from source group by name", - "metric": { - "name": "name_group", - "collect.type": "array" - } - }, - { - "dsl.type": "spark-sql", - "name": "tmst_grp", - "rule": "select `__tmst`, count(*) as `cnt` from source group by `__tmst`", - "metric": { - "name": "tmst_group" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_timeliness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json index 90439df..f3759ca 100644 --- a/measure/src/test/resources/_timeliness-batch-griffindsl.json +++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json @@ -23,7 +23,7 @@ { "dsl.type": "griffin-dsl", "dq.type": "timeliness", - "name": "timeliness", + "out.dataframe.name": "timeliness", "rule": "ts, end_ts", "details": { "source": "source", @@ -37,13 +37,19 @@ "percentile": "percentile", "percentile.values": [0.95] }, - "metric": { - "name": "timeliness" - }, - "record": { - "name": "lateRecords" - } + "out":[ + { + "type": "metric", + "name": "timeliness" + }, + { + "type": "record", + "name": "lateRecords" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_timeliness-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-batch-sparksql.json b/measure/src/test/resources/_timeliness-batch-sparksql.json deleted file mode 100644 index f9cb368..0000000 --- a/measure/src/test/resources/_timeliness-batch-sparksql.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "name": "timeliness_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/timeliness_data.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "in_time", - "rule": "select *, (ts) as `_in_ts`, (end_ts) as `_out_ts` from source where (ts) IS NOT NULL" - }, - { - "dsl.type": "spark-sql", - "name": "lat", - "cache": true, - "rule": "select *, (`_out_ts` - `_in_ts`) as `latency` from `in_time`" - }, - { - "dsl.type": "spark-sql", - "name": "metric", - "rule": "select cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`", - "metric": { - "name": "timeliness" - } - }, - { - "dsl.type": "spark-sql", - "name": "slows", - "rule": "select * from `lat` where `latency` > 60000", - "record": { - "name": "lateRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_timeliness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json index 5916e5c..1663122 100644 --- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json +++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json @@ -10,6 +10,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name": "this", "config": { "kafka.config": { "bootstrap.servers": "10.149.247.156:9092", @@ -23,22 +24,20 @@ }, "pre.proc": [ { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "dsl.type": "df-ops", + "in.dataframe.name": "this", + "out.dataframe.name": "s1", + "rule": "from_json" }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select ts, end_ts, name, age from ${s1}" + "out.dataframe.name": "this", + "rule": "select name, age from s1" } ] } ], - "cache": { + "checkpoint": { "file.path": "hdfs://localhost/griffin/streaming/dump/source", "info.path": "source", "ready.time.interval": "10s", @@ -53,7 +52,7 @@ { "dsl.type": "griffin-dsl", "dq.type": "timeliness", - "name": "timeliness", + "out.dataframe.name": "timeliness", "rule": "ts, end_ts", "details": { "source": "source", @@ -67,13 +66,19 @@ "percentile": "percentile", "percentile.values": [0.2, 0.5, 0.8] }, - "metric": { - "name": "timeliness" - }, - "record": { - "name": "lateRecords" - } + "out":[ + { + "type": "metric", + "name": "timeliness" + }, + { + "type": "record", + "name": "lateRecords" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_timeliness-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-streaming-sparksql.json b/measure/src/test/resources/_timeliness-streaming-sparksql.json deleted file mode 100644 index dc736ab..0000000 --- a/measure/src/test/resources/_timeliness-streaming-sparksql.json +++ /dev/null @@ -1,82 +0,0 @@ -{ - "name": "timeliness_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "fff", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select ts, name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "in_time", - "rule": "select *, (ts) as `_in_ts` from source where (ts) IS NOT NULL" - }, - { - "dsl.type": "spark-sql", - "name": "lat", - "cache": true, - "rule": "select *, (`__tmst` - `_in_ts`) as `latency` from `in_time`" - }, - { - "dsl.type": "spark-sql", - "name": "metric", - "rule": "select `__tmst`, cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`", - "metric": { - "name": "timeliness" - } - }, - { - "dsl.type": "spark-sql", - "name": "slows", - "rule": "select * from `lat` where `latency` > 60000", - "record": { - "name": "lateRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_uniqueness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json index 28009e8..2c32930 100644 --- a/measure/src/test/resources/_uniqueness-batch-griffindsl.json +++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json @@ -36,7 +36,7 @@ { "dsl.type": "griffin-dsl", "dq.type": "uniqueness", - "name": "dup", + "out.dataframe.name": "dup", "rule": "user_id", "details": { "source": "source", @@ -46,13 +46,19 @@ "dup": "dup", "num": "num" }, - "metric": { - "name": "unique" - }, - "record": { - "name": "dupRecords" - } + "out":[ + { + "type": "metric", + "name": "unique" + }, + { + "type": "record", + "name": "dupRecords" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_uniqueness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json index bc5cbd2..a4f4dcc 100644 --- a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json +++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json @@ -11,6 +11,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name": "this", "config": { "kafka.config": { "bootstrap.servers": "10.149.247.156:9092", @@ -24,22 +25,20 @@ }, "pre.proc": [ { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "dsl.type": "df-ops", + "in.dataframe.name": "this", + "out.dataframe.name": "s1", + "rule": "from_json" }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" + "out.dataframe.name": "this", + "rule": "select name, age from s1" } ] } ], - "cache": { + "checkpoint": { "file.path": "hdfs://localhost/griffin/streaming/dump/new", "info.path": "new", "ready.time.interval": "10s", @@ -53,6 +52,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name": "this", "config": { "kafka.config": { "bootstrap.servers": "10.149.247.156:9092", @@ -66,22 +66,20 @@ }, "pre.proc": [ { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "dsl.type": "df-ops", + "in.dataframe.name": "this", + "out.dataframe.name": "s1", + "rule": "from_json" }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" + "out.dataframe.name": "this", + "rule": "select name, age from s1" } ] } ], - "cache": { + "checkpoint": { "file.path": "hdfs://localhost/griffin/streaming/dump/old", "info.path": "old", "ready.time.interval": "10s", @@ -96,7 +94,7 @@ { "dsl.type": "griffin-dsl", "dq.type": "uniqueness", - "name": "dup", + "out.dataframe.name": "dup", "rule": "name, age", "details": { "source": "new", @@ -107,13 +105,19 @@ "num": "num", "duplication.array": "dup" }, - "metric": { - "name": "unique" - }, - "record": { - "name": "dupRecords" - } + "out":[ + { + "type": "metric", + "name": "unique" + }, + { + "type": "record", + "name": "dupRecords" + } + ] } ] - } + }, + + "sinks": ["CONSOLE","ELASTICSEARCH"] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_uniqueness-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_uniqueness-streaming-sparksql.json b/measure/src/test/resources/_uniqueness-streaming-sparksql.json deleted file mode 100644 index 7d13215..0000000 --- a/measure/src/test/resources/_uniqueness-streaming-sparksql.json +++ /dev/null @@ -1,130 +0,0 @@ -{ - "name": "unique_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "new", - "baseline": true, - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "new", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/new", - "info.path": "new", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"] - } - }, - { - "name": "old", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.149.247.156:9092", - "group.id": "old", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/old", - "info.path": "old", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-24h", "0"] - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "dist", - "rule": "SELECT DISTINCT * FROM new" - }, - { - "dsl.type": "spark-sql", - "name": "joined", - "rule": "SELECT dist.* FROM old RIGHT JOIN dist ON coalesce(old.name, '') = coalesce(dist.name, '') AND coalesce(old.age, '') = coalesce(dist.age, '')" - }, - { - "dsl.type": "spark-sql", - "name": "grouped", - "rule": "SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` FROM joined GROUP BY `__tmst`, `name`, `age`" - }, - { - "dsl.type": "spark-sql", - "name": "dupRecs", - "cache": true, - "rule": "SELECT * FROM grouped WHERE `dup_cnt` > 1", - "record": { - "name": "dupRecords" - } - }, - { - "dsl.type": "spark-sql", - "name": "dupMetric", - "rule": "SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` FROM dupRecs GROUP BY `__tmst`, `dup_cnt`", - "metric": { - "name": "dup" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/env-batch.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json index 0d6ea8a..3e1f7a6 100644 --- a/measure/src/test/resources/env-batch.json +++ b/measure/src/test/resources/env-batch.json @@ -6,24 +6,14 @@ } }, - "persist": [ + "sinks": [ { - "type": "log", + "type": "console", "config": { "max.log.lines": 10 } - }, - { - "type": "hdfs", - "config": { - "path": "hdfs://localhost/griffin/batch/persist", - "max.persist.lines": 10000, - "max.lines.per.file": 10000 - } } ], - "info.cache": [], - - "cleaner": {} + "griffin.checkpoint": [] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/env-streaming-mongo.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-streaming-mongo.json b/measure/src/test/resources/env-streaming-mongo.json index 0d50462..ef10aef 100644 --- a/measure/src/test/resources/env-streaming-mongo.json +++ b/measure/src/test/resources/env-streaming-mongo.json @@ -17,9 +17,9 @@ } }, - "persist": [ + "sinks": [ { - "type": "log", + "type": "console", "config": { "max.log.lines": 100 } @@ -34,7 +34,7 @@ } ], - "info.cache": [ + "griffin.checkpoint": [ { "type": "zk", "config": { @@ -46,9 +46,5 @@ "close.clear": false } } - ], - - "cleaner": { - "clean.interval": "2m" - } + ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/env-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json index 08dd7ee..34d469f 100644 --- a/measure/src/test/resources/env-streaming.json +++ b/measure/src/test/resources/env-streaming.json @@ -18,16 +18,16 @@ } }, - "persist": [ + "sinks": [ { - "type": "log", + "type": "console", "config": { "max.log.lines": 100 } } ], - "info.cache": [ + "griffin.checkpoint": [ { "type": "zk", "config": { @@ -39,9 +39,5 @@ "close.clear": false } } - ], - - "cleaner": { - "clean.interval": "2m" - } + ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala index f5b404e..bb75cec 100644 --- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala +++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala @@ -28,12 +28,12 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{ "params " should "be parsed from a valid file" in { - val reader :ParamReader = ParamFileReader(getClass.getResource("/_accuracy-batch-sparksql.json").getFile) + val reader :ParamReader = ParamFileReader(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile) val params = reader.readConfig[DQConfig] params match { case Success(v) => - v.evaluateRule.getRules(0).dslType should === ("spark-sql") - v.evaluateRule.getRules(0).name should === ("missRecords") + v.evaluateRule.getRules(0).dslType should === ("griffin-dsl") + v.evaluateRule.getRules(0).outDfName should === ("accu") case Failure(_) => fail("it should not happen") } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala index 0f08cdc..1e9f3b0 100644 --- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala +++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala @@ -28,7 +28,7 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{ "params " should "be parsed from a valid file" in { - val bufferedSource = Source.fromFile(getClass.getResource("/_accuracy-batch-sparksql.json").getFile) + val bufferedSource = Source.fromFile(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile) val jsonString = bufferedSource.getLines().mkString bufferedSource.close @@ -36,8 +36,8 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{ val params = reader.readConfig[DQConfig] params match { case Success(v) => - v.evaluateRule.getRules(0).dslType should === ("spark-sql") - v.evaluateRule.getRules(0).name should === ("missRecords") + v.evaluateRule.getRules(0).dslType should === ("griffin-dsl") + v.evaluateRule.getRules(0).outDfName should === ("accu") case Failure(_) => fail("it should not happen") }