Repository: incubator-griffin
Updated Branches:
  refs/heads/master fda8222c8 -> 23ff999cd


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
index dc9a3f8..86b367c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -44,15 +44,13 @@ object DataFrameOps {
     val _matched = "matched"
   }
 
-  def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
-    val _dfName = "df.name"
+  def fromJson(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = {
     val _colName = "col.name"
-    val dfName = details.getOrElse(_dfName, "").toString
     val colNameOpt = details.get(_colName).map(_.toString)
 
     implicit val encoder = Encoders.STRING
 
-    val df: DataFrame = sqlContext.table(s"`${dfName}`")
+    val df: DataFrame = sqlContext.table(s"`${inputDfName}`")
     val rdd = colNameOpt match {
       case Some(colName: String) => df.map(r => r.getAs[String](colName))
       case _ => df.map(_.getAs[String](0))
@@ -60,19 +58,13 @@ object DataFrameOps {
     sqlContext.read.json(rdd) // slow process
   }
 
-  def accuracy(sqlContext: SQLContext, contextId: ContextId, details: Map[String, Any]): DataFrame = {
+  def accuracy(sqlContext: SQLContext, inputDfName: String, contextId: ContextId, details: Map[String, Any]): DataFrame = {
     import AccuracyOprKeys._
 
-    val dfName = details.getStringOrKey(_dfName)
     val miss = details.getStringOrKey(_miss)
     val total = details.getStringOrKey(_total)
     val matched = details.getStringOrKey(_matched)
 
-//    val _enableIgnoreCache = "enable.ignore.cache"
-//    val enableIgnoreCache = details.getBoolean(_enableIgnoreCache, false)
-
-//    val tmst = InternalColumns.tmst
-
     val updateTime = new Date().getTime
 
     def getLong(r: Row, k: String): Option[Long] = {
@@ -83,7 +75,7 @@ object DataFrameOps {
       }
     }
 
-    val df = sqlContext.table(s"`${dfName}`")
+    val df = sqlContext.table(s"`${inputDfName}`")
 
     val results = df.rdd.flatMap { row =>
       try {
@@ -122,11 +114,8 @@ object DataFrameOps {
     retDf
   }
 
-  def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
-    val _dfName = "df.name"
-    val dfName = details.getOrElse(_dfName, "").toString
-
-    val df = sqlContext.table(s"`${dfName}`")
+  def clear(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = {
+    val df = sqlContext.table(s"`${inputDfName}`")
     val emptyRdd = sqlContext.sparkContext.emptyRDD[Row]
     sqlContext.createDataFrame(emptyRdd, df.schema)
   }
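
For context, each of these df ops now takes the input dataframe name as an explicit parameter instead of reading a "df.name" entry from its details map. A minimal sketch of calling the new signatures, assuming a registered table named "src" and illustrative details values (the ContextId import path is also an assumption):

    import org.apache.griffin.measure.context.ContextId  // assumed package, alongside DQContext
    import org.apache.spark.sql.{DataFrame, SQLContext}

    def runOps(sqlContext: SQLContext, contextId: ContextId): Unit = {
      // fromJson: optional "col.name" selects the JSON string column of the input table
      val parsed: DataFrame =
        DataFrameOps.fromJson(sqlContext, "src", Map("col.name" -> "value"))

      // accuracy: output column names come from details, the input table from the parameter
      val accu: DataFrame =
        DataFrameOps.accuracy(sqlContext, "src", contextId,
          Map("miss" -> "miss_count", "total" -> "total_count", "matched" -> "matched_count"))

      // clear: returns an empty dataframe with the input table's schema
      val cleared: DataFrame = DataFrameOps.clear(sqlContext, "src", Map.empty[String, Any])
    }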

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index e2f90f9..5f99ed2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -24,6 +24,7 @@ import org.apache.griffin.measure.context.DQContext
   * data frame ops transform step
   */
 case class DataFrameOpsTransformStep(name: String,
+                                     inputDfName: String,
                                      rule: String,
                                      details: Map[String, Any],
                                      cache: Boolean = false
@@ -33,9 +34,9 @@ case class DataFrameOpsTransformStep(name: String,
     val sqlContext = context.sqlContext
     try {
       val df = rule match {
-        case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, details)
-        case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, context.contextId, details)
-        case DataFrameOps._clear => DataFrameOps.clear(sqlContext, details)
+        case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, inputDfName, details)
+        case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+        case DataFrameOps._clear => DataFrameOps.clear(sqlContext, inputDfName, details)
         case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
       }
       if (cache) context.dataFrameCache.cacheDataFrame(name, df)
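
A minimal sketch of constructing the updated step, mirroring the pre.proc configs later in this commit where "this" names the connector input and "s1" the output (all values illustrative):

    // the new inputDfName field replaces the old details("df.name") lookup
    val step = DataFrameOpsTransformStep(
      name = "s1",                    // out.dataframe.name
      inputDfName = "this",           // in.dataframe.name
      rule = DataFrameOps._fromJson,  // one of _fromJson, _accuracy, _clear
      details = Map.empty[String, Any]
    )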

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index 0472416..9415998 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -33,7 +33,7 @@ case class DataSourceUpdateWriteStep(dsName: String,
   val writeTimestampOpt: Option[Long] = None
 
   def execute(context: DQContext): Boolean = {
-    collectDsCacheUpdateDf(context) match {
+    getDataSourceCacheUpdateDf(context) match {
       case Some(df) => {
        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
       }
@@ -56,6 +56,6 @@ case class DataSourceUpdateWriteStep(dsName: String,
     }
   }
 
-  private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+  private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 6b7944d..8f7d01c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -33,7 +33,7 @@ case class MetricFlushStep() extends WriteStep {
     context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
       val (t, metric) = pair
       val pr = try {
-        context.getPersist(t).persistMetrics(metric)
+        context.getSink(t).sinkMetrics(metric)
         true
       } catch {
         case e: Throwable => {
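
The persist-to-sink rename is mechanical; a minimal sketch of the renamed call, assuming a DQContext named context in scope and t as the metric timestamp (metric values illustrative):

    // before: context.getPersist(t).persistMetrics(metric)
    // after:  context.getSink(t).sinkMetrics(metric)
    val metric: Map[String, Any] = Map("total" -> 100L, "miss" -> 4L)
    context.getSink(t).sinkMetrics(metric)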

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index 2f34d63..4771891 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.utils.ParamUtil._
   */
 case class MetricWriteStep(name: String,
                            inputName: String,
-                           collectType: NormalizeType,
+                           flattenType: FlattenType,
                            writeTimestampOpt: Option[Long] = None
                           ) extends WriteStep {
 
@@ -46,7 +46,7 @@ case class MetricWriteStep(name: String,
     val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
     val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match {
       case SimpleMode => {
-        val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType)
+        val metrics: Map[String, Any] = flattenMetric(metricMaps, name, flattenType)
         emptyMetricMap + (timestamp -> metrics)
       }
       case TimestampMode => {
@@ -58,7 +58,7 @@ case class MetricWriteStep(name: String,
         tmstMetrics.groupBy(_._1).map { pair =>
           val (k, v) = pair
           val maps = v.map(_._2)
-          val mtc = normalizeMetric(maps, name, collectType)
+          val mtc = flattenMetric(maps, name, flattenType)
           (k, mtc)
         }
       }
@@ -95,12 +95,12 @@ case class MetricWriteStep(name: String,
     }
   }
 
-  private def normalizeMetric(metrics: Seq[Map[String, Any]], name: String, collectType: NormalizeType
+  private def flattenMetric(metrics: Seq[Map[String, Any]], name: String, flattenType: FlattenType
                              ): Map[String, Any] = {
-    collectType match {
-      case EntriesNormalizeType => metrics.headOption.getOrElse(emptyMap)
-      case ArrayNormalizeType => Map[String, Any]((name -> metrics))
-      case MapNormalizeType => {
+    flattenType match {
+      case EntriesFlattenType => metrics.headOption.getOrElse(emptyMap)
+      case ArrayFlattenType => Map[String, Any]((name -> metrics))
+      case MapFlattenType => {
         val v = metrics.headOption.getOrElse(emptyMap)
         Map[String, Any]((name -> v))
       }
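
The NormalizeType-to-FlattenType rename keeps the three output shapes unchanged; a standalone sketch of what each flatten mode produces, in plain Scala mirroring the match above (not the Griffin enum itself):

    val metrics = Seq(Map[String, Any]("total" -> 100, "miss" -> 4))  // illustrative metric maps
    val name = "accu"

    val entries = metrics.headOption.getOrElse(Map.empty[String, Any])  // EntriesFlattenType: first map as-is
    val array   = Map[String, Any](name -> metrics)                     // ArrayFlattenType: all maps under the name
    val mapped  = Map[String, Any](name -> entries)                     // MapFlattenType: first map under the name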

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 13b7f80..2bc373c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 
 /**
-  * write records needs to be persisted
+  * write records that need to be sunk
   */
 case class RecordWriteStep(name: String,
                            inputName: String,
@@ -41,27 +41,27 @@ case class RecordWriteStep(name: String,
     writeMode match {
       case SimpleMode => {
         // batch records
-        val recordsOpt = collectBatchRecords(context)
+        val recordsOpt = getBatchRecords(context)
         // write records
         recordsOpt match {
           case Some(records) => {
-            context.getPersist(timestamp).persistRecords(records, name)
+            context.getSink(timestamp).sinkRecords(records, name)
           }
           case _ => {}
         }
       }
       case TimestampMode => {
         // streaming records
-        val (recordsOpt, emptyTimestamps) = collectStreamingRecords(context)
+        val (recordsOpt, emptyTimestamps) = getStreamingRecords(context)
         // write records
         recordsOpt.foreach { records =>
           records.foreach { pair =>
             val (t, strs) = pair
-            context.getPersist(t).persistRecords(strs, name)
+            context.getSink(t).sinkRecords(strs, name)
           }
         }
         emptyTimestamps.foreach { t =>
-          context.getPersist(t).persistRecords(Nil, name)
+          context.getSink(t).sinkRecords(Nil, name)
         }
       }
     }
@@ -92,11 +92,11 @@ case class RecordWriteStep(name: String,
 
  private def getFilterTableDataFrame(context: DQContext): Option[DataFrame] = filterTableNameOpt.flatMap(getDataFrame(context, _))
 
-  private def collectBatchRecords(context: DQContext): Option[RDD[String]] = {
+  private def getBatchRecords(context: DQContext): Option[RDD[String]] = {
     getRecordDataFrame(context).map(_.toJSON.rdd);
   }
 
-  private def collectStreamingRecords(context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
+  private def getStreamingRecords(context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
     implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
     val defTimestamp = context.contextId.timestamp
     getRecordDataFrame(context) match {
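
Record writes follow the same rename; a minimal sketch of the batch path after the change, where df stands in for the record dataframe and name for the output name:

    // JSON-serialize the records and hand them to the sink instead of the persist
    val records: RDD[String] = df.toJSON.rdd
    context.getSink(timestamp).sinkRecords(records, name)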

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_accuracy-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json
index 10167cd..149c839 100644
--- a/measure/src/test/resources/_accuracy-batch-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -35,7 +35,7 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "accuracy",
-        "name": "accu",
+        "out.dataframe.name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
         "details": {
           "source": "source",
@@ -44,13 +44,15 @@
           "total": "total_count",
           "matched": "matched_count"
         },
-        "metric": {
-          "name": "accu"
-        },
-        "record": {
-          "name": "missRecords"
-        }
+        "out":[
+          {
+            "type": "record",
+            "name": "missRecords"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["LOG", "ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_accuracy-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-sparksql.json b/measure/src/test/resources/_accuracy-batch-sparksql.json
deleted file mode 100644
index 2eef9f1..0000000
--- a/measure/src/test/resources/_accuracy-batch-sparksql.json
+++ /dev/null
@@ -1,63 +0,0 @@
-{
-  "name": "accu_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }, {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_target.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "missRecords",
-        "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.user_id, '') = coalesce(target.user_id, '') AND coalesce(source.first_name, '') = coalesce(target.first_name, '') AND coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.post_code IS NULL)",
-        "record": {
-          "name": "miss"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "miss_count",
-        "rule": "SELECT count(*) as miss FROM `missRecords`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "total_count",
-        "rule": "SELECT count(*) as total FROM source"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "accu",
-        "rule": "SELECT `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss`, (`total` - `miss`) AS `matched` FROM `total_count` FULL JOIN `miss_count`",
-        "metric": {
-          "name": "accu"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_accuracy-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
index 240d768..4492a35 100644
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -11,6 +11,7 @@
         {
           "type": "kafka",
           "version": "0.8",
+          "dataframe.name": "this",
           "config": {
             "kafka.config": {
               "bootstrap.servers": "10.147.177.107:9092",
@@ -24,22 +25,20 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
+              "dsl.type": "df-ops",
+              "in.dataframe.name": "this",
+              "out.dataframe.name": "s1",
+              "rule": "from_json"
             },
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
+              "out.dataframe.name": "this",
+              "rule": "select name, age from s1"
             }
           ]
         }
       ],
-      "cache": {
+      "checkpoint": {
         "type": "parquet",
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
@@ -55,6 +54,7 @@
         {
           "type": "kafka",
           "version": "0.8",
+          "dataframe.name": "this",
           "config": {
             "kafka.config": {
               "bootstrap.servers": "10.147.177.107:9092",
@@ -68,22 +68,20 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
-              "name": "${t1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
+              "dsl.type": "df-ops",
+              "in.dataframe.name": "this",
+              "out.dataframe.name": "t1",
+              "rule": "from_json"
             },
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${t1}"
+              "out.dataframe.name": "this",
+              "rule": "select name, age from t1"
             }
           ]
         }
       ],
-      "cache": {
+      "checkpoint": {
         "type": "parquet",
         "file.path": "hdfs://localhost/griffin/streaming/dump/target",
         "info.path": "target",
@@ -100,7 +98,7 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "accuracy",
-        "name": "accu",
+        "out.dataframe.name": "accu",
         "rule": "source.name = target.name and source.age = target.age",
         "details": {
           "source": "source",
@@ -109,13 +107,19 @@
           "total": "total_count",
           "matched": "matched_count"
         },
-        "metric": {
-          "name": "accu"
-        },
-        "record": {
-          "name": "missRecords"
-        }
+        "out":[
+          {
+            "type":"metric",
+            "name": "accu"
+          },
+          {
+            "type":"record",
+            "name": "missRecords"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_accuracy-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-sparksql.json b/measure/src/test/resources/_accuracy-streaming-sparksql.json
deleted file mode 100644
index 0824cb8..0000000
--- a/measure/src/test/resources/_accuracy-streaming-sparksql.json
+++ /dev/null
@@ -1,142 +0,0 @@
-{
-  "name": "accu_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-2m", "0"]
-      }
-    }, {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${t1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${t1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
-        "info.path": "target",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-2m", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "missRecords",
-        "cache": true,
-        "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.name, '') = coalesce(target.name, '') AND coalesce(source.age, '') = coalesce(target.age, '') WHERE (NOT (source.name IS NULL AND source.age IS NULL)) AND (target.name IS NULL AND target.age IS NULL)"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "miss_count",
-        "rule": "SELECT `__tmst`, count(*) as miss FROM `missRecords` GROUP BY `__tmst`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "total_count",
-        "rule": "SELECT `__tmst`, count(*) as total FROM source GROUP BY `__tmst`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "accu",
-        "rule": "SELECT `total_count`.`__tmst` AS `__tmst`, `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss` FROM `total_count` FULL JOIN `miss_count` ON `total_count`.`__tmst` = `miss_count`.`__tmst`"
-      },
-      {
-        "dsl.type": "df-opr",
-        "name": "metric_accu",
-        "rule": "accuracy",
-        "details": {
-          "df.name": "accu",
-          "miss": "miss",
-          "total": "total",
-          "matched": "matched"
-        },
-        "metric": {
-          "name": "accuracy"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "accu_miss_records",
-        "rule": "SELECT `__tmst`, `__empty` FROM `metric_accu` WHERE `__record`",
-        "record": {
-          "name": "missRecords",
-          "data.source.cache": "source",
-          "origin.DF": "missRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_completeness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
index 9c00444..4f42092 100644
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -25,12 +25,17 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "completeness",
-        "name": "comp",
+        "out.dataframe.name": "comp",
         "rule": "email, post_code, first_name",
-        "metric": {
-          "name": "comp"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "comp"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
index ba8bdce..02f0a39 100644
--- a/measure/src/test/resources/_completeness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -10,6 +10,7 @@
         {
           "type": "kafka",
           "version": "0.8",
+          "dataframe.name": "this",
           "config": {
             "kafka.config": {
               "bootstrap.servers": "10.147.177.107:9092",
@@ -23,22 +24,20 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
+              "dsl.type": "df-ops",
+              "in.dataframe.name": "this",
+              "out.dataframe.name": "s1",
+              "rule": "from_json"
             },
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
+              "out.dataframe.name": "this",
+              "rule": "select name, age from s1"
             }
           ]
         }
       ],
-      "cache": {
+      "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
@@ -54,12 +53,17 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "completeness",
-        "name": "comp",
+        "out.dataframe.name": "comp",
         "rule": "name, age",
-        "metric": {
-          "name": "comp"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "comp"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_distinctness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json
index af0c91e..d946089 100644
--- a/measure/src/test/resources/_distinctness-batch-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -37,7 +37,7 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "distinct",
-        "name": "dist",
+        "out.dataframe.name": "dist",
         "rule": "user_id",
         "details": {
           "source": "source",
@@ -48,10 +48,15 @@
           "num": "num",
           "duplication.array": "dup"
         },
-        "metric": {
-          "name": "distinct"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "distinct"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_distinctness-batch-griffindsl1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
deleted file mode 100644
index 4d94d8e..0000000
--- a/measure/src/test/resources/_distinctness-batch-griffindsl1.json
+++ /dev/null
@@ -1,73 +0,0 @@
-{
-  "name": "dist_batch",
-
-  "process.type": "batch",
-
-  "timestamp": 123456,
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/dupdata.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${this}"
-            }
-          ]
-        }
-      ]
-    },
-    {
-      "name": "target",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/dupdata.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select DISTINCT name, age from ${this}"
-            }
-          ]
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "distinct",
-        "name": "dist",
-        "rule": "name",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "total": "total",
-          "distinct": "distinct",
-          "dup": "dup",
-          "num": "num",
-          "duplication.array": "dup"
-        },
-        "metric": {
-          "name": "distinct"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_distinctness-batch-griffindsl2.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl2.json b/measure/src/test/resources/_distinctness-batch-griffindsl2.json
deleted file mode 100644
index 6a12719..0000000
--- a/measure/src/test/resources/_distinctness-batch-griffindsl2.json
+++ /dev/null
@@ -1,74 +0,0 @@
-{
-  "name": "dist_batch",
-
-  "process.type": "batch",
-
-  "timestamp": 123456,
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/dupdata.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${this}"
-            }
-          ]
-        }
-      ]
-    },
-    {
-      "name": "target",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/dupdata.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select DISTINCT name, age from ${this}"
-            }
-          ]
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "distinct",
-        "name": "dist",
-        "rule": "name, [age]",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "total": "total",
-          "distinct": "distinct",
-          "dup": "dup",
-          "num": "num",
-          "duplication.array": "dup",
-          "record.enable": true
-        },
-        "metric": {
-          "name": "distinct"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_distinctness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
index c36e7ba..e3629d1 100644
--- a/measure/src/test/resources/_distinctness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
@@ -6,7 +6,7 @@
   "data.sources": [
     {
       "name": "new",
-      "cache": {
+      "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/old",
         "info.path": "new",
         "ready.time.interval": "10s",
@@ -21,6 +21,7 @@
         {
           "type": "kafka",
           "version": "0.8",
+          "dataframe.name": "this",
           "config": {
             "kafka.config": {
               "bootstrap.servers": "10.149.247.156:9092",
@@ -34,22 +35,20 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
+              "dsl.type": "df-ops",
+              "in.dataframe.name": "this",
+              "out.dataframe.name": "s1",
+              "rule": "from_json"
             },
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
+              "out.dataframe.name": "this",
+              "rule": "select name, age from s1"
             }
           ]
         }
       ],
-      "cache": {
+      "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/old",
         "info.path": "old",
         "ready.time.interval": "10s",
@@ -64,7 +63,7 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "distinct",
-        "name": "dist",
+        "out.dataframe.name": "dist",
         "rule": "name, age",
         "details": {
           "source": "new",
@@ -76,10 +75,15 @@
           "num": "num",
           "duplication.array": "dup"
         },
-        "metric": {
-          "name": "distinct"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "distinct"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
index 03b0405..70cc369 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
@@ -26,23 +26,31 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "name": "prof",
+        "out.dataframe.name": "prof",
         "rule": "name, count(*) as cnt from source group by name",
-        "metric": {
-          "name": "name_group",
-          "collect.type": "array"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "name_group",
+            "flatten": "array"
+          }
+        ]
       },
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "name": "grp",
+        "out.dataframe.name": "grp",
         "rule": "age, count(*) as cnt from source group by age order by cnt",
-        "metric": {
-          "name": "age_group",
-          "collect.type": "array"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "age_group",
+            "flatten": "array"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json
index ec082c4..273d2e4 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -12,14 +12,14 @@
         {
           "type": "avro",
           "version": "1.7",
+          "dataframe.name" : "this_table",
           "config": {
             "file.name": "src/test/resources/users_info_src.avro"
           },
           "pre.proc": [
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select reg_replace(email, '^([^@0-9]+)([0-9]+)@(dc)(?:\\\\.[^@]+)$', '$1@$3') as email, post_code from ${this}"
+              "rule": "select * from this_table where user_id < 10014"
             }
           ]
         }
@@ -32,23 +32,31 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "name": "prof",
-        "rule": "email, count(*) as cnt from source group by email",
-        "metric": {
-          "name": "prof",
-          "collect.type": "array"
-        }
+        "out.dataframe.name": "prof",
+        "rule": "user_id, count(*) as cnt from source group by user_id",
+        "out":[
+          {
+            "type": "metric",
+            "name": "prof",
+            "flatten": "array"
+          }
+        ]
       },
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "name": "grp",
+        "out.dataframe.name": "grp",
        "rule": "source.post_code, count(*) as cnt from source group by source.post_code order by cnt desc",
-        "metric": {
-          "name": "post_group",
-          "collect.type": "array"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "post_group",
+            "flatten": "array"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json b/measure/src/test/resources/_profiling-batch-sparksql.json
index fdfd812..c8077d0 100644
--- a/measure/src/test/resources/_profiling-batch-sparksql.json
+++ b/measure/src/test/resources/_profiling-batch-sparksql.json
@@ -24,21 +24,29 @@
     "rules": [
       {
         "dsl.type": "spark-sql",
-        "name": "prof",
+        "out.dataframe.name": "prof",
        "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source",
-        "metric": {
-          "name": "prof"
-        }
+        "out": [
+          {
+            "type": "metric",
+            "name": "prof"
+          }
+        ]
       },
       {
         "dsl.type": "spark-sql",
-        "name": "grp",
+        "out.dataframe.name": "grp",
        "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code",
-        "metric": {
-          "name": "post_group",
-          "collect.type": "array"
-        }
+        "out": [
+          {
+            "type": "metric",
+            "name": "post_group",
+            "flatten": "array"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json
index b6feb5a..a523434 100644
--- a/measure/src/test/resources/_profiling-streaming-griffindsl.json
+++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json
@@ -10,6 +10,7 @@
         {
           "type": "kafka",
           "version": "0.8",
+          "dataframe.name": "this",
           "config": {
             "kafka.config": {
               "bootstrap.servers": "10.147.177.107:9092",
@@ -23,22 +24,20 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
+              "dsl.type": "df-ops",
+              "in.dataframe.name": "this",
+              "out.dataframe.name": "s1",
+              "rule": "from_json"
             },
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
+              "out.dataframe.name": "this",
+              "rule": "select name, age from s1"
             }
           ]
         }
       ],
-      "cache": {
+      "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
@@ -54,22 +53,30 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "name": "prof",
+        "out.dataframe.name": "prof",
        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
-        "metric": {
-          "name": "prof"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "prof"
+          }
+        ]
       },
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "name": "grp",
+        "out.dataframe.name": "grp",
         "rule": "select name, count(*) as `cnt` from source group by name",
-        "metric": {
-          "name": "name_group",
-          "collect.type": "array"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "name_group",
+            "flatten": "array"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_profiling-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-sparksql.json b/measure/src/test/resources/_profiling-streaming-sparksql.json
deleted file mode 100644
index 4f0b0ee..0000000
--- a/measure/src/test/resources/_profiling-streaming-sparksql.json
+++ /dev/null
@@ -1,80 +0,0 @@
-{
-  "name": "prof_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "prof",
-        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
-        "metric": {
-          "name": "prof"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "grp",
-        "rule": "select name, count(*) as `cnt` from source group by name",
-        "metric": {
-          "name": "name_group",
-          "collect.type": "array"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "tmst_grp",
-        "rule": "select `__tmst`, count(*) as `cnt` from source group by `__tmst`",
-        "metric": {
-          "name": "tmst_group"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_timeliness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json
index 90439df..f3759ca 100644
--- a/measure/src/test/resources/_timeliness-batch-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json
@@ -23,7 +23,7 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "timeliness",
-        "name": "timeliness",
+        "out.dataframe.name": "timeliness",
         "rule": "ts, end_ts",
         "details": {
           "source": "source",
@@ -37,13 +37,19 @@
           "percentile": "percentile",
           "percentile.values": [0.95]
         },
-        "metric": {
-          "name": "timeliness"
-        },
-        "record": {
-          "name": "lateRecords"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "timeliness"
+          },
+          {
+            "type": "record",
+            "name": "lateRecords"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_timeliness-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-sparksql.json b/measure/src/test/resources/_timeliness-batch-sparksql.json
deleted file mode 100644
index f9cb368..0000000
--- a/measure/src/test/resources/_timeliness-batch-sparksql.json
+++ /dev/null
@@ -1,52 +0,0 @@
-{
-  "name": "timeliness_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/timeliness_data.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "in_time",
-        "rule": "select *, (ts) as `_in_ts`, (end_ts) as `_out_ts` from source where (ts) IS NOT NULL"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "lat",
-        "cache": true,
-        "rule": "select *, (`_out_ts` - `_in_ts`) as `latency` from `in_time`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "metric",
-        "rule": "select cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`",
-        "metric": {
-          "name": "timeliness"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "slows",
-        "rule": "select * from `lat` where `latency` > 60000",
-        "record": {
-          "name": "lateRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_timeliness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
index 5916e5c..1663122 100644
--- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
@@ -10,6 +10,7 @@
         {
           "type": "kafka",
           "version": "0.8",
+          "dataframe.name": "this",
           "config": {
             "kafka.config": {
               "bootstrap.servers": "10.149.247.156:9092",
@@ -23,22 +24,20 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
+              "dsl.type": "df-ops",
+              "in.dataframe.name": "this",
+              "out.dataframe.name": "s1",
+              "rule": "from_json"
             },
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select ts, end_ts, name, age from ${s1}"
+              "out.dataframe.name": "this",
+              "rule": "select name, age from s1"
             }
           ]
         }
       ],
-      "cache": {
+      "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
@@ -53,7 +52,7 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "timeliness",
-        "name": "timeliness",
+        "out.dataframe.name": "timeliness",
         "rule": "ts, end_ts",
         "details": {
           "source": "source",
@@ -67,13 +66,19 @@
           "percentile": "percentile",
           "percentile.values": [0.2, 0.5, 0.8]
         },
-        "metric": {
-          "name": "timeliness"
-        },
-        "record": {
-          "name": "lateRecords"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "timeliness"
+          },
+          {
+            "type": "record",
+            "name": "lateRecords"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_timeliness-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-sparksql.json b/measure/src/test/resources/_timeliness-streaming-sparksql.json
deleted file mode 100644
index dc736ab..0000000
--- a/measure/src/test/resources/_timeliness-streaming-sparksql.json
+++ /dev/null
@@ -1,82 +0,0 @@
-{
-  "name": "timeliness_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "fff",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select ts, name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "in_time",
-        "rule": "select *, (ts) as `_in_ts` from source where (ts) IS NOT NULL"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "lat",
-        "cache": true,
-        "rule": "select *, (`__tmst` - `_in_ts`) as `latency` from `in_time`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "metric",
-        "rule": "select `__tmst`, cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`",
-        "metric": {
-          "name": "timeliness"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "slows",
-        "rule": "select * from `lat` where `latency` > 60000",
-        "record": {
-          "name": "lateRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_uniqueness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
index 28009e8..2c32930 100644
--- a/measure/src/test/resources/_uniqueness-batch-griffindsl.json
+++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
@@ -36,7 +36,7 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "uniqueness",
-        "name": "dup",
+        "out.dataframe.name": "dup",
         "rule": "user_id",
         "details": {
           "source": "source",
@@ -46,13 +46,19 @@
           "dup": "dup",
           "num": "num"
         },
-        "metric": {
-          "name": "unique"
-        },
-        "record": {
-          "name": "dupRecords"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "unique"
+          },
+          {
+            "type": "record",
+            "name": "dupRecords"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
index bc5cbd2..a4f4dcc 100644
--- a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
@@ -11,6 +11,7 @@
         {
           "type": "kafka",
           "version": "0.8",
+          "dataframe.name": "this",
           "config": {
             "kafka.config": {
               "bootstrap.servers": "10.149.247.156:9092",
@@ -24,22 +25,20 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
+              "dsl.type": "df-ops",
+              "in.dataframe.name": "this",
+              "out.dataframe.name": "s1",
+              "rule": "from_json"
             },
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
+              "out.dataframe.name": "this",
+              "rule": "select name, age from s1"
             }
           ]
         }
       ],
-      "cache": {
+      "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/new",
         "info.path": "new",
         "ready.time.interval": "10s",
@@ -53,6 +52,7 @@
         {
           "type": "kafka",
           "version": "0.8",
+          "dataframe.name": "this",
           "config": {
             "kafka.config": {
               "bootstrap.servers": "10.149.247.156:9092",
@@ -66,22 +66,20 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
+              "dsl.type": "df-ops",
+              "in.dataframe.name": "this",
+              "out.dataframe.name": "s1",
+              "rule": "from_json"
             },
             {
               "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
+              "out.dataframe.name": "this",
+              "rule": "select name, age from s1"
             }
           ]
         }
       ],
-      "cache": {
+      "checkpoint": {
         "file.path": "hdfs://localhost/griffin/streaming/dump/old",
         "info.path": "old",
         "ready.time.interval": "10s",
@@ -96,7 +94,7 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "uniqueness",
-        "name": "dup",
+        "out.dataframe.name": "dup",
         "rule": "name, age",
         "details": {
           "source": "new",
@@ -107,13 +105,19 @@
           "num": "num",
           "duplication.array": "dup"
         },
-        "metric": {
-          "name": "unique"
-        },
-        "record": {
-          "name": "dupRecords"
-        }
+        "out":[
+          {
+            "type": "metric",
+            "name": "unique"
+          },
+          {
+            "type": "record",
+            "name": "dupRecords"
+          }
+        ]
       }
     ]
-  }
+  },
+
+  "sinks": ["CONSOLE","ELASTICSEARCH"]
 }
\ No newline at end of file
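
Condensed, the reworked pre-processing chain looks roughly like this (a sketch
from the hunks above, not a complete connector config): the old "df-opr" steps
with "${...}" placeholders become "df-ops" steps that name their input and
output dataframes explicitly, and the source-level "cache" block is renamed
"checkpoint".

    "pre.proc": [
      {
        "dsl.type": "df-ops",
        "in.dataframe.name": "this",
        "out.dataframe.name": "s1",
        "rule": "from_json"
      },
      {
        "dsl.type": "spark-sql",
        "out.dataframe.name": "this",
        "rule": "select name, age from s1"
      }
    ]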

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/_uniqueness-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-streaming-sparksql.json b/measure/src/test/resources/_uniqueness-streaming-sparksql.json
deleted file mode 100644
index 7d13215..0000000
--- a/measure/src/test/resources/_uniqueness-streaming-sparksql.json
+++ /dev/null
@@ -1,130 +0,0 @@
-{
-  "name": "unique_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "new",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "new",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
-        "info.path": "new",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    },
-    {
-      "name": "old",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "old",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
-        "info.path": "old",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-24h", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "dist",
-        "rule": "SELECT DISTINCT * FROM new"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "joined",
-        "rule": "SELECT dist.* FROM old RIGHT JOIN dist ON coalesce(old.name, 
'') = coalesce(dist.name, '') AND coalesce(old.age, '') = coalesce(dist.age, 
'')"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "grouped",
-        "rule": "SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` FROM 
joined GROUP BY `__tmst`, `name`, `age`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "dupRecs",
-        "cache": true,
-        "rule": "SELECT * FROM grouped WHERE `dup_cnt` > 1",
-        "record": {
-          "name": "dupRecords"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "dupMetric",
-        "rule": "SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` FROM 
dupRecs GROUP BY `__tmst`, `dup_cnt`",
-        "metric": {
-          "name": "dup"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/env-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json
index 0d6ea8a..3e1f7a6 100644
--- a/measure/src/test/resources/env-batch.json
+++ b/measure/src/test/resources/env-batch.json
@@ -6,24 +6,14 @@
     }
   },
 
-  "persist": [
+  "sinks": [
     {
-      "type": "log",
+      "type": "console",
       "config": {
         "max.log.lines": 10
       }
-    },
-    {
-      "type": "hdfs",
-      "config": {
-        "path": "hdfs://localhost/griffin/batch/persist",
-        "max.persist.lines": 10000,
-        "max.lines.per.file": 10000
-      }
     }
   ],
 
-  "info.cache": [],
-
-  "cleaner": {}
+  "griffin.checkpoint": []
 }
\ No newline at end of file
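
Put together, the renamed blocks of env-batch.json reduce to roughly the
following (a sketch; the unchanged "spark" section is omitted): "persist" with
a "log" sink becomes "sinks" with a "console" sink, "info.cache" becomes
"griffin.checkpoint", and the hdfs sink and the "cleaner" block are dropped.

    {
      "sinks": [
        {
          "type": "console",
          "config": { "max.log.lines": 10 }
        }
      ],
      "griffin.checkpoint": []
    }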

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/env-streaming-mongo.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-streaming-mongo.json b/measure/src/test/resources/env-streaming-mongo.json
index 0d50462..ef10aef 100644
--- a/measure/src/test/resources/env-streaming-mongo.json
+++ b/measure/src/test/resources/env-streaming-mongo.json
@@ -17,9 +17,9 @@
     }
   },
 
-  "persist": [
+  "sinks": [
     {
-      "type": "log",
+      "type": "console",
       "config": {
         "max.log.lines": 100
       }
@@ -34,7 +34,7 @@
     }
   ],
 
-  "info.cache": [
+  "griffin.checkpoint": [
     {
       "type": "zk",
       "config": {
@@ -46,9 +46,5 @@
         "close.clear": false
       }
     }
-  ],
-
-  "cleaner": {
-    "clean.interval": "2m"
-  }
+  ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json
index 08dd7ee..34d469f 100644
--- a/measure/src/test/resources/env-streaming.json
+++ b/measure/src/test/resources/env-streaming.json
@@ -18,16 +18,16 @@
     }
   },
 
-  "persist": [
+  "sinks": [
     {
-      "type": "log",
+      "type": "console",
       "config": {
         "max.log.lines": 100
       }
     }
   ],
 
-  "info.cache": [
+  "griffin.checkpoint": [
     {
       "type": "zk",
       "config": {
@@ -39,9 +39,5 @@
         "close.clear": false
       }
     }
-  ],
-
-  "cleaner": {
-    "clean.interval": "2m"
-  }
+  ]
 }
\ No newline at end of file
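
The streaming env files follow the same renames; trimmed to the renamed blocks
(a sketch; the unchanged "spark" section and the zk connection details kept as
context above are omitted), the result is roughly:

    {
      "sinks": [
        { "type": "console", "config": { "max.log.lines": 100 } }
      ],
      "griffin.checkpoint": [
        { "type": "zk", "config": { "close.clear": false } }
      ]
    }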

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index f5b404e..bb75cec 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -28,12 +28,12 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
 
 
   "params " should "be parsed from a valid file" in {
-    val reader :ParamReader = ParamFileReader(getClass.getResource("/_accuracy-batch-sparksql.json").getFile)
+    val reader :ParamReader = ParamFileReader(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile)
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(v) =>
-        v.evaluateRule.getRules(0).dslType should === ("spark-sql")
-        v.evaluateRule.getRules(0).name should === ("missRecords")
+        v.evaluateRule.getRules(0).dslType should === ("griffin-dsl")
+        v.evaluateRule.getRules(0).outDfName should === ("accu")
       case Failure(_) =>
         fail("it should not happen")
     }
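
The assertions now read the first rule of _accuracy-batch-griffindsl.json
through the renamed accessor outDfName. Assuming that file follows the same
renamed schema as the configs above (the rule expression below is a
placeholder, not the file's actual contents), the entry being asserted would
look roughly like:

    {
      "dsl.type": "griffin-dsl",
      "dq.type": "accuracy",
      "out.dataframe.name": "accu",
      "rule": "<accuracy rule expression>"
    }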

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 0f08cdc..1e9f3b0 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -28,7 +28,7 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{
 
 
   "params " should "be parsed from a valid file" in {
-    val bufferedSource = Source.fromFile(getClass.getResource("/_accuracy-batch-sparksql.json").getFile)
+    val bufferedSource = Source.fromFile(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile)
     val jsonString = bufferedSource.getLines().mkString
     bufferedSource.close
 
@@ -36,8 +36,8 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(v) =>
-        v.evaluateRule.getRules(0).dslType should === ("spark-sql")
-        v.evaluateRule.getRules(0).name should === ("missRecords")
+        v.evaluateRule.getRules(0).dslType should === ("griffin-dsl")
+        v.evaluateRule.getRules(0).outDfName should === ("accu")
       case Failure(_) =>
         fail("it should not happen")
     }
