http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LogicalExpr.scala
new file mode 100644
index 0000000..dd061d7
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LogicalExpr.scala
@@ -0,0 +1,178 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+import org.apache.griffin.measure.rule.CalculationUtil._
+import org.apache.spark.sql.types.{BooleanType, DataType}
+
+trait LogicalExpr extends Expr with AnalyzableExpr {
+  override def cacheUnit: Boolean = true
+}
+
+case class LogicalSimpleExpr(expr: MathExpr) extends LogicalExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
+  val desc: String = expr.desc
+  val dataSources: Set[String] = expr.dataSources
+  override def cacheUnit: Boolean = false
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = expr.getCacheExprs(ds)
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = expr.getFinalCacheExprs(ds)
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = expr.getPersistExprs(ds)
+}
+
+case class LogicalCompareExpr(left: MathExpr, compare: String, right: MathExpr) extends LogicalExpr {
+  private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=")
+  def calculateOnly(values: Map[String, Any]): Option[Boolean] = {
+    val (lv, rv) = (left.calculate(values), right.calculate(values))
+    compare match {
+      case this.eqOpr() => lv === rv
+      case this.neqOpr() => lv =!= rv
+      case this.btOpr => lv > rv
+      case this.bteOpr => lv >= rv
+      case this.ltOpr => lv < rv
+      case this.lteOpr => lv <= rv
+      case _ => None
+    }
+  }
+  val desc: String = s"${left.desc} ${compare} ${right.desc}"
+  val dataSources: Set[String] = left.dataSources ++ right.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    left.getCacheExprs(ds) ++ right.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    left.getFinalCacheExprs(ds) ++ right.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    left.getPersistExprs(ds) ++ right.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
+    if (compare == "=" || compare == "==") {
+      (left.dataSourceOpt, right.dataSourceOpt) match {
+        case (Some(dsPair._1), Some(dsPair._2)) => (left, right) :: Nil
+        case (Some(dsPair._2), Some(dsPair._1)) => (right, left) :: Nil
+        case _ => Nil
+      }
+    } else Nil
+  }
+}
+
+case class LogicalRangeExpr(left: MathExpr, rangeOpr: String, range: RangeDesc) extends LogicalExpr {
+  private val (inOpr, ninOpr, btwnOpr, nbtwnOpr) = ("""(?i)in""".r, """(?i)not\s+in""".r, """(?i)between""".r, """(?i)not\s+between""".r)
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val (lv, rvs) = (left.calculate(values), range.elements.map(_.calculate(values)))
+    rangeOpr match {
+      case this.inOpr() => lv in rvs
+      case this.ninOpr() => lv not_in rvs
+      case this.btwnOpr() => lv between rvs
+      case this.nbtwnOpr() => lv not_between rvs
+      case _ => None
+    }
+  }
+  val desc: String = s"${left.desc} ${rangeOpr} ${range.desc}"
+  val dataSources: Set[String] = left.dataSources ++ range.elements.flatMap(_.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    left.getCacheExprs(ds) ++ range.elements.flatMap(_.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    left.getFinalCacheExprs(ds) ++ range.elements.flatMap(_.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    left.getPersistExprs(ds) ++ range.elements.flatMap(_.getPersistExprs(ds))
+  }
+}
+
+// -- logical statement --
+//case class LogicalFactorExpr(self: LogicalExpr) extends LogicalExpr {
+//  def calculate(values: Map[String, Any]): Option[Any] = self.calculate(values)
+//  val desc: String = self.desc
+//}
+
+case class UnaryLogicalExpr(oprList: Iterable[String], factor: LogicalExpr) extends LogicalExpr {
+  private val notOpr = """(?i)not|!""".r
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = factor.calculate(values)
+    oprList.foldRight(fv) { (opr, v) =>
+      opr match {
+        case this.notOpr() => !v
+        case _ => None
+      }
+    }
+  }
+  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev} ${ex}" }
+  val dataSources: Set[String] = factor.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    factor.getPersistExprs(ds)
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
+    val notOprList = oprList.filter { opr =>
+      opr match {
+        case this.notOpr() => true
+        case _ => false
+      }
+    }
+    if (notOprList.size % 2 == 0) factor.getGroupbyExprPairs(dsPair) else Nil
+  }
+}
+
+case class BinaryLogicalExpr(first: LogicalExpr, others: Iterable[(String, LogicalExpr)]) extends LogicalExpr {
+  private val (andOpr, orOpr) = ("""(?i)and|&&""".r, """(?i)or|\|\|""".r)
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = first.calculate(values)
+    others.foldLeft(fv) { (v, pair) =>
+      val (opr, next) = pair
+      val nv = next.calculate(values)
+      opr match {
+        case this.andOpr() => v && nv
+        case this.orOpr() => v || nv
+        case _ => None
+      }
+    }
+  }
+  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
+  val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
+  }
+
+  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = {
+    if (others.isEmpty) first.getGroupbyExprPairs(dsPair)
+    else {
+      val isAnd = others.exists(_._1 match {
+        case this.andOpr() => true
+        case _ => false
+      })
+      if (isAnd) {
+        first.getGroupbyExprPairs(dsPair) ++ others.flatMap(_._2.getGroupbyExprPairs(dsPair))
+      } else Nil
+    }
+  }
+}
\ No newline at end of file

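Note on the evaluation pattern above: BinaryLogicalExpr.calculateOnly folds the first operand's Option value through the (operator, operand) pairs using the Option-lifted operators imported from CalculationUtil. A minimal, self-contained sketch of that fold, with hypothetical stand-ins for the CalculationUtil implicits (the real ones live elsewhere in the measure module):

    object LogicalFoldSketch {
      // Hypothetical stand-ins for CalculationUtil's Option-lifted "and" / "or":
      // both sides must be defined Booleans, otherwise the result is None.
      def and(l: Option[Any], r: Option[Any]): Option[Any] = (l, r) match {
        case (Some(a: Boolean), Some(b: Boolean)) => Some(a && b)
        case _ => None
      }
      def or(l: Option[Any], r: Option[Any]): Option[Any] = (l, r) match {
        case (Some(a: Boolean), Some(b: Boolean)) => Some(a || b)
        case _ => None
      }

      def main(args: Array[String]): Unit = {
        val first: Option[Any] = Some(true)
        val others = Seq(("and", Some(true): Option[Any]), ("or", Some(false): Option[Any]))
        // Same shape as BinaryLogicalExpr.calculateOnly: fold left over (operator, value) pairs.
        val result = others.foldLeft(first) { case (v, (opr, nv)) =>
          opr match {
            case "and" | "&&" => and(v, nv)
            case "or" | "||"  => or(v, nv)
            case _            => None
          }
        }
        println(result) // Some(true)
      }
    }
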
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
new file mode 100644
index 0000000..661e8f4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+import org.apache.griffin.measure.rule.CalculationUtil._
+import org.apache.griffin.measure.rule.DataTypeCalculationUtil._
+import org.apache.spark.sql.types.DataType
+
+trait MathExpr extends Expr {
+
+}
+
+case class MathFactorExpr(self: Expr) extends MathExpr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values)
+  val desc: String = self.desc
+  val dataSources: Set[String] = self.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    self.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    self.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    self.getPersistExprs(ds)
+  }
+}
+
+case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr {
+  private val (posOpr, negOpr) = ("+", "-")
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = factor.calculate(values)
+    oprList.foldRight(fv) { (opr, v) =>
+      opr match {
+        case this.posOpr => v
+        case this.negOpr => -v
+        case _ => None
+      }
+    }
+  }
+  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
+  val dataSources: Set[String] = factor.dataSources
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getCacheExprs(ds)
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    factor.getFinalCacheExprs(ds)
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    factor.getPersistExprs(ds)
+  }
+}
+
+case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr {
+  private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%")
+  def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val fv = first.calculate(values)
+    others.foldLeft(fv) { (v, pair) =>
+      val (opr, next) = pair
+      val nv = next.calculate(values)
+      opr match {
+        case this.addOpr => v + nv
+        case this.subOpr => v - nv
+        case this.mulOpr => v * nv
+        case this.divOpr => v / nv
+        case this.modOpr => v % nv
+        case _ => None
+      }
+    }
+  }
+  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
+  val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
+  }
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
+  }
+}
\ No newline at end of file

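UnaryMathExpr above applies its prefix operators right-to-left over the factor's value. A tiny standalone illustration of that foldRight, using plain Longs instead of the module's Option-lifted arithmetic from CalculationUtil:

    object UnarySignSketch {
      def main(args: Array[String]): Unit = {
        // Prefix operators as parsed left-to-right, e.g. the expression "- - +5".
        val oprList = Seq("-", "-", "+")
        val factorValue = 5L
        // Mirrors UnaryMathExpr.calculateOnly: the innermost operator is applied first.
        val result = oprList.foldRight(factorValue) { (opr, v) =>
          opr match {
            case "+" => v
            case "-" => -v
            case _   => v
          }
        }
        println(result) // 5, the two negations cancel out
      }
    }
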
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
new file mode 100644
index 0000000..5b7f1b0
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.expr
+
+import org.apache.spark.sql.types.DataType
+import org.apache.griffin.measure.rule.CalculationUtil._
+
+trait SelectExpr extends Expr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = None
+}
+
+case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr {
+  val desc: String = s"[${fields.map(_.desc).mkString(", ")}]"
+  val dataSources: Set[String] = Set.empty[String]
+}
+
+case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr {
+  val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})"
+  val dataSources: Set[String] = args.flatMap(_.dataSources).toSet
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds))
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds))
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds))
+}
+
+case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr {
+  val desc: String = s"[${field.desc} ${compare} ${value.desc}]"
+  val dataSources: Set[String] = value.dataSources
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds)
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds)
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds)
+  private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=")
+  override def calculateOnly(values: Map[String, Any]): Option[Any] = {
+    val (lv, rv) = (values.get(fieldKey), value.calculate(values))
+    compare match {
+      case this.eqOpr() => lv === rv
+      case this.neqOpr() => lv =!= rv
+      case this.btOpr => lv > rv
+      case this.bteOpr => lv >= rv
+      case this.ltOpr => lv < rv
+      case this.lteOpr => lv <= rv
+      case _ => None
+    }
+  }
+  def fieldKey: String = s"__${field.field}"
+}
+
+// -- selection --
+case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr {
+  def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id)
+
+  val desc: String = {
+    val argsString = selectors.map(_.desc).mkString("")
+    s"${head.desc}${argsString}"
+  }
+  val dataSources: Set[String] = {
+    val selectorDataSources = selectors.flatMap(_.dataSources).toSet
+    selectorDataSources + head.head
+  }
+
+  override def cacheUnit: Boolean = true
+  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getCacheExprs(ds))
+  }
+  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getFinalCacheExprs(ds))
+  }
+
+  override def persistUnit: Boolean = true
+  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
+    selectors.flatMap(_.getPersistExprs(ds))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
new file mode 100644
index 0000000..15161c3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.func
+
+import org.apache.griffin.measure.utils.JsonUtil
+
+class DefaultFunctionDefine extends FunctionDefine {
+
+  def json(strOpt: Option[_]): Map[String, Any] = {
+    try {
+      strOpt match {
+        case Some(str: String) => JsonUtil.toAnyMap(str)
+        case _ => throw new Exception("json function param should be string")
+      }
+    } catch {
+      case e: Throwable => throw e
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
new file mode 100644
index 0000000..d23fc7a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.func
+
+trait FunctionDefine extends Serializable {
+
+}
+
+class UnKnown {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
new file mode 100644
index 0000000..57e934d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
@@ -0,0 +1,75 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.func
+
+import java.lang.reflect.Method
+
+import org.apache.griffin.measure.log.Loggable
+
+import scala.collection.mutable.{Map => MutableMap}
+
+object FunctionUtil extends Loggable {
+
+  val functionDefines: MutableMap[String, FunctionDefine] = MutableMap[String, FunctionDefine]()
+
+  registerFunctionDefine(Array(classOf[DefaultFunctionDefine].getCanonicalName))
+
+  def registerFunctionDefine(classes: Iterable[String]): Unit = {
+    for (cls <- classes) {
+      try {
+        val clz: Class[_] = Class.forName(cls)
+        if (classOf[FunctionDefine].isAssignableFrom(clz)) {
+          functionDefines += (cls -> clz.newInstance.asInstanceOf[FunctionDefine])
+        } else {
+          warn(s"${cls} register fails: ${cls} is not sub class of ${classOf[FunctionDefine].getCanonicalName}")
+        }
+      } catch {
+        case e: Throwable => warn(s"${cls} register fails: ${e.getMessage}")
+      }
+    }
+  }
+
+  def invoke(methodName: String, params: Array[Option[Any]]): Seq[Option[Any]] = {
+//    val paramTypes = params.map { param =>
+//      try {
+//        param match {
+//          case Some(v) => v.getClass
+//          case _ => classOf[UnKnown]
+//        }
+//      } catch {
+//        case e: Throwable => classOf[UnKnown]
+//      }
+//    }
+    val paramTypes = params.map(a => classOf[Option[_]])
+
+    functionDefines.values.foldLeft(Nil: Seq[Option[Any]]) { (res, funcDef) =>
+      if (res.isEmpty) {
+        val clz = funcDef.getClass
+        try {
+          val method = clz.getMethod(methodName, paramTypes: _*)
+          Seq(Some(method.invoke(funcDef, params: _*)))
+        } catch {
+          case e: Throwable => res
+        }
+      } else res
+    }
+  }
+
+}
+

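A hedged usage sketch of the registration and reflective dispatch above. The class below is hypothetical; any FunctionDefine subclass with a public no-arg constructor whose methods take Option[_] parameters should be reachable through invoke, since paramTypes is fixed to classOf[Option[_]]:

    // Hypothetical user-defined function set (not part of this commit).
    class MyFunctionDefine extends FunctionDefine {
      def upper(strOpt: Option[_]): String = strOpt match {
        case Some(s: String) => s.toUpperCase
        case _ => ""
      }
    }

    object FunctionUtilSketch {
      def main(args: Array[String]): Unit = {
        FunctionUtil.registerFunctionDefine(Seq(classOf[MyFunctionDefine].getCanonicalName))
        // invoke looks the method up by name and Option[_] parameter types, then wraps the result.
        println(FunctionUtil.invoke("upper", Array[Option[Any]](Some("hello")))) // expected: List(Some(HELLO))
      }
    }
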
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
new file mode 100644
index 0000000..8a608ff
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import org.apache.spark.rdd.RDD
+
+object HdfsFileDumpUtil {
+
+  val sepCount = 5000
+
+  private def suffix(i: Long): String = {
+    if (i == 0) "" else s".${i}"
+  }
+  private def samePattern(fileName: String, patternFileName: String): Boolean = {
+    fileName.startsWith(patternFileName)
+  }
+
+  def splitRdd[T](rdd: RDD[T])(implicit m: Manifest[T]): RDD[(Long, Iterable[T])] = {
+    val indexRdd = rdd.zipWithIndex
+    indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey()
+  }
+  def splitIterable[T](datas: Iterable[T])(implicit m: Manifest[T]): Iterator[(Int, Iterable[T])] = {
+    val groupedData = datas.grouped(sepCount).zipWithIndex
+    groupedData.map(v => (v._2, v._1))
+  }
+
+  private def directDump(path: String, list: Iterable[String], lineSep: String): Unit = {
+    // collect and save
+    val strRecords = list.mkString(lineSep)
+    // save into hdfs
+    HdfsUtil.writeContent(path, strRecords)
+  }
+
+  def dump(path: String, recordsRdd: RDD[String], lineSep: String): Boolean = {
+    val groupedRdd = splitRdd(recordsRdd)
+    groupedRdd.aggregate(true)({ (res, pair) =>
+      val (idx, list) = pair
+      val filePath = path + suffix(idx)
+      directDump(filePath, list, lineSep)
+      true
+    }, _ && _)
+  }
+  def dump(path: String, records: Iterable[String], lineSep: String): Boolean = {
+    val groupedRecords = splitIterable(records)
+    groupedRecords.aggregate(true)({ (res, pair) =>
+      val (idx, list) = pair
+      val filePath = path + suffix(idx)
+      directDump(filePath, list, lineSep)
+      true
+    }, _ && _)
+  }
+
+  def remove(path: String, filename: String, withSuffix: Boolean): Unit = {
+    if (withSuffix) {
+      val files = HdfsUtil.listSubPaths(path, "file")
+      val patternFiles = files.filter(samePattern(_, filename))
+      patternFiles.foreach { f =>
+        val rmPath = HdfsUtil.getHdfsFilePath(path, f)
+        HdfsUtil.deleteHdfsPath(rmPath)
+      }
+    } else {
+      val rmPath = HdfsUtil.getHdfsFilePath(path, filename)
+      HdfsUtil.deleteHdfsPath(rmPath)
+    }
+  }
+
+}

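The dump helpers above split records into chunks of sepCount (5000) and write each chunk to path, path.1, path.2, and so on. A standalone sketch of the same grouping and suffix logic, with a small chunk size and no HDFS access:

    object SplitAndSuffixSketch {
      val sepCount = 5 // the utility uses 5000; shrunk here for illustration

      private def suffix(i: Long): String = if (i == 0) "" else s".${i}"

      def main(args: Array[String]): Unit = {
        val records = (1 to 12).map(i => s"record-${i}")
        // Same shape as splitIterable: fixed-size groups, each tagged with its index.
        val grouped = records.grouped(sepCount).zipWithIndex.map(v => (v._2, v._1))
        grouped.foreach { case (idx, list) =>
          println(s"would write ${list.size} lines to /path/to/dump${suffix(idx)}")
        }
        // Prints three chunks: /path/to/dump (5 lines), /path/to/dump.1 (5), /path/to/dump.2 (2)
      }
    }
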
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
new file mode 100644
index 0000000..6dd54b7
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -0,0 +1,120 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
+
+object HdfsUtil {
+
+  private val seprator = "/"
+
+  private val conf = new Configuration()
+  conf.set("dfs.support.append", "true")
+//  conf.set("fs.defaultFS", "hdfs://localhost")    // debug @localhost
+
+  private val dfs = FileSystem.get(conf)
+
+  def existPath(filePath: String): Boolean = {
+    val path = new Path(filePath)
+    dfs.exists(path)
+  }
+
+  def createFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+    return dfs.create(path)
+  }
+
+  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
+  }
+
+  def openFile(filePath: String): FSDataInputStream = {
+    val path = new Path(filePath)
+    dfs.open(path)
+  }
+
+  def writeContent(filePath: String, message: String): Unit = {
+    val out = createFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def appendContent(filePath: String, message: String): Unit = {
+    val out = appendOrCreateFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def createEmptyFile(filePath: String): Unit = {
+    val out = createFile(filePath)
+    out.close
+  }
+
+
+  def getHdfsFilePath(parentPath: String, fileName: String): String = {
+    if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName
+  }
+
+  def deleteHdfsPath(dirPath: String): Unit = {
+    val path = new Path(dirPath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+  }
+
+//  def listPathFiles(dirPath: String): Iterable[String] = {
+//    val path = new Path(dirPath)
+//    try {
+//      val fileStatusArray = dfs.listStatus(path)
+//      fileStatusArray.flatMap { fileStatus =>
+//        if (fileStatus.isFile) {
+//          Some(fileStatus.getPath.getName)
+//        } else None
+//      }
+//    } catch {
+//      case e: Throwable => {
+//        println(s"list path files error: ${e.getMessage}")
+//        Nil
+//      }
+//    }
+//  }
+
+  def listSubPaths(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = {
+    val path = new Path(dirPath)
+    try {
+      val fileStatusArray = dfs.listStatus(path)
+      fileStatusArray.filter { fileStatus =>
+        subType match {
+          case "dir" => fileStatus.isDirectory
+          case "file" => fileStatus.isFile
+          case _ => true
+        }
+      }.map { fileStatus =>
+        val fname = fileStatus.getPath.getName
+        if (fullPath) getHdfsFilePath(dirPath, fname) else fname
+      }
+    } catch {
+      case e: Throwable => {
+        println(s"list path files error: ${e.getMessage}")
+        Nil
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
new file mode 100644
index 0000000..e016b60
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import scalaj.http._
+
+object HttpUtil {
+
+  val GET_REGEX = """^(?i)get$""".r
+  val POST_REGEX = """^(?i)post$""".r
+  val PUT_REGEX = """^(?i)put$""".r
+  val DELETE_REGEX = """^(?i)delete$""".r
+
+  def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): Boolean = {
+    val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
+    response.isSuccess
+  }
+
+  def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): Boolean = {
+    val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
+    method match {
+      case POST_REGEX() => {
+        val res = httpReq.postData(data).asString
+        res.isSuccess
+      }
+      case PUT_REGEX() => {
+        val res = httpReq.put(data).asString
+        res.isSuccess
+      }
+      case _ => false
+    }
+  }
+
+  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
+    map.map(pair => pair._1 -> pair._2.toString)
+  }
+
+}

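A hedged usage sketch of httpRequest; the endpoint and payload below are placeholders (the shape matches the "http" persist entry added to env.json later in this commit):

    object HttpUtilSketch {
      def main(args: Array[String]): Unit = {
        // Placeholder endpoint and metric payload, for illustration only.
        val url = "http://HOSTNAME:9200/griffin/accuracy"
        val data = """{"name": "accu1", "matched": 100, "total": 120}"""
        val headers: Map[String, Object] = Map("Content-Type" -> "application/json")
        val ok = HttpUtil.httpRequest(url, "post", Map.empty[String, Object], headers, data)
        println(s"post succeeded: ${ok}") // "post" is matched case-insensitively by POST_REGEX
      }
    }
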
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
new file mode 100644
index 0000000..1418375
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import java.io.InputStream
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import scala.reflect._
+
+object JsonUtil {
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+  def toJson(value: Map[Symbol, Any]): String = {
+    toJson(value map { case (k,v) => k.name -> v})
+  }
+
+  def toJson(value: Any): String = {
+    mapper.writeValueAsString(value)
+  }
+
+  def toMap[V](json:String)(implicit m: Manifest[V]) = fromJson[Map[String,V]](json)
+
+  def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = {
+    mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+  }
+
+  def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = {
+    mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]])
+  }
+
+  def toAnyMap(json: String): Map[String, Any] = {
+    mapper.readValue(json, classOf[Map[String, Any]])
+  }
+}

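A small usage sketch of the helpers above; toAnyMap is what DefaultFunctionDefine.json relies on to parse a record into an untyped map:

    object JsonUtilSketch {
      def main(args: Array[String]): Unit = {
        val json = """{"name": "Alice", "age": 33}"""
        // Untyped parse, as used by the json() rule function.
        val record: Map[String, Any] = JsonUtil.toAnyMap(json)
        println(record("name"))                          // Alice
        println(JsonUtil.toJson(Map("matched" -> true))) // {"matched":true}
      }
    }
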
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
new file mode 100644
index 0000000..0079d10
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import scala.util.{Failure, Success, Try}
+
+object TimeUtil {
+
+  final val TimeRegex = """([+\-]?\d+)(d|h|m|s|ms)""".r
+  final val PureTimeRegex = """([+\-]?\d+)""".r
+
+  def milliseconds(timeString: String): Option[Long] = {
+    val value: Option[Long] = {
+      Try {
+        timeString match {
+          case TimeRegex(time, unit) => {
+            val t = time.toLong
+            unit match {
+              case "d" => t * 24 * 60 * 60 * 1000
+              case "h" => t * 60 * 60 * 1000
+              case "m" => t * 60 * 1000
+              case "s" => t * 1000
+              case "ms" => t
+              case _ => throw new Exception(s"${timeString} is invalid time format")
+            }
+          }
+          case PureTimeRegex(time) => {
+            val t = time.toLong
+            t
+          }
+          case _ => throw new Exception(s"${timeString} is invalid time format")
+        }
+      } match {
+        case Success(v) => Some(v)
+        case Failure(ex) => throw ex
+      }
+    }
+    value
+  }
+
+  def timeToUnit(ms: Long, unit: String): Long = {
+    unit match {
+      case "ms" => ms
+      case "sec" => ms / 1000
+      case "min" => ms / (60 * 1000)
+      case "hour" => ms / (60 * 60 * 1000)
+      case "day" => ms / (24 * 60 * 60 * 1000)
+      case _ => ms / (60 * 1000)
+    }
+  }
+
+  def timeFromUnit(t: Long, unit: String): Long = {
+    unit match {
+      case "ms" => t
+      case "sec" => t * 1000
+      case "min" => t * 60 * 1000
+      case "hour" => t * 60 * 60 * 1000
+      case "day" => t * 24 * 60 * 60 * 1000
+      case _ => t * 60 * 1000
+    }
+  }
+
+}

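A quick usage sketch of TimeUtil; "-2m" is the same shorthand used for the "time.range" lower bound in the streaming configs below:

    object TimeUtilSketch {
      def main(args: Array[String]): Unit = {
        println(TimeUtil.milliseconds("-2m"))        // Some(-120000)
        println(TimeUtil.milliseconds("10s"))        // Some(10000)
        println(TimeUtil.timeToUnit(600000L, "min")) // 10
      }
    }
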
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-profile.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-profile.json b/measure/src/test/resources/config-profile.json
index 6b82d7f..2529def 100644
--- a/measure/src/test/resources/config-profile.json
+++ b/measure/src/test/resources/config-profile.json
@@ -2,6 +2,8 @@
   "name": "prof1",
   "type": "profile",
 
+  "process.type": "batch",
+
   "source": {
     "type": "avro",
     "version": "1.7",

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-streaming.json b/measure/src/test/resources/config-streaming.json
new file mode 100644
index 0000000..697fb7b
--- /dev/null
+++ b/measure/src/test/resources/config-streaming.json
@@ -0,0 +1,69 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "process.type": "streaming",
+
+  "source": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "10.149.247.156:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "sss",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "hive",
+      "config": {
+        "database": "griffin_cache",
+        "table.name": "src",
+        "parent.path": "hdfs://localhost/griffin/streaming/dump",
+        "info.path": "src",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-2m", "0"]
+    },
+    "match.once": true
+  },
+
+  "target": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "10.149.247.156:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "ttt",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "hive",
+      "config": {
+        "database": "griffin_cache",
+        "table.name": "tgt",
+        "parent.path": "hdfs://localhost/griffin/streaming/dump",
+        "info.path": "tgt",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-2m", "0"]
+    },
+    "match.once": false
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 0.2,
+    "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-streaming1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-streaming1.json b/measure/src/test/resources/config-streaming1.json
new file mode 100644
index 0000000..5465201
--- /dev/null
+++ b/measure/src/test/resources/config-streaming1.json
@@ -0,0 +1,65 @@
+{
+  "name": "accu2",
+  "type": "accuracy",
+
+  "process.type": "streaming",
+
+  "source": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "10.149.247.156:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "sss",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "df",
+      "config": {
+        "cache.level": "MEMORY_AND_DISK",
+        "info.path": "src",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-2m", "0"]
+    },
+    "match.once": true
+  },
+
+  "target": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "10.149.247.156:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "ttt",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "df",
+      "config": {
+        "cache.level": "MEMORY_AND_DISK",
+        "info.path": "tgt",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-2m", "0"]
+    },
+    "match.once": false
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 0.2,
+    "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-streaming2.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-streaming2.json b/measure/src/test/resources/config-streaming2.json
new file mode 100644
index 0000000..061382b
--- /dev/null
+++ b/measure/src/test/resources/config-streaming2.json
@@ -0,0 +1,65 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "process.type": "streaming",
+
+  "source": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "10.149.247.156:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "sss",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "parquet",
+      "config": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-2m", "0"]
+    },
+    "match.once": true
+  },
+
+  "target": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "10.149.247.156:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "ttt",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "parquet",
+      "config": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-2m", "0"]
+    },
+    "match.once": false
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 0.2,
+    "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-streaming3.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-streaming3.json b/measure/src/test/resources/config-streaming3.json
new file mode 100644
index 0000000..fe3e56f
--- /dev/null
+++ b/measure/src/test/resources/config-streaming3.json
@@ -0,0 +1,65 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "process.type": "streaming",
+
+  "source": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "10.149.247.156:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "sss",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "text",
+      "config": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-2m", "0"]
+    },
+    "match.once": true
+  },
+
+  "target": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "10.149.247.156:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "ttt",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "text",
+      "config": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-2m", "0"]
+    },
+    "match.once": false
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 0.2,
+    "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config.json b/measure/src/test/resources/config.json
index 65e0ed9..08a6021 100644
--- a/measure/src/test/resources/config.json
+++ b/measure/src/test/resources/config.json
@@ -2,6 +2,8 @@
   "name": "accu1",
   "type": "accuracy",
 
+  "process.type": "batch",
+
   "source": {
     "type": "avro",
     "version": "1.7",
@@ -20,6 +22,6 @@
 
   "evaluateRule": {
     "sampleRatio": 1,
-    "rules": "$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+    "rules": "$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code WHEN $source.user_id > 10015"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config1.json b/measure/src/test/resources/config1.json
index d7290ba..16c265d 100644
--- a/measure/src/test/resources/config1.json
+++ b/measure/src/test/resources/config1.json
@@ -2,6 +2,8 @@
   "name": "accu-test",
   "type": "accuracy",
 
+  "process.type": "batch",
+
   "source": {
     "type": "hive",
     "version": "1.2",

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json
new file mode 100644
index 0000000..42b4aa9
--- /dev/null
+++ b/measure/src/test/resources/env-streaming.json
@@ -0,0 +1,45 @@
+{
+  "spark": {
+    "log.level": "WARN",
+    "checkpoint.dir": "hdfs://localhost/test/griffin/cp",
+    "batch.interval": "2s",
+    "process.interval": "10s",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafkaMaxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4,
+      "spark.yarn.maxAppAttempts": 5,
+      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
+      "spark.yarn.max.executor.failures": 120,
+      "spark.yarn.executor.failuresValidityInterval": "1h",
+      "spark.hadoop.fs.hdfs.impl.disable.cache": true
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 100
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "localhost:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ],
+
+  "cleaner": {
+    "clean.interval": "2m"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env.json b/measure/src/test/resources/env.json
index 3a9e38c..14e3b75 100644
--- a/measure/src/test/resources/env.json
+++ b/measure/src/test/resources/env.json
@@ -1,7 +1,9 @@
 {
   "spark": {
-    "log.level": "ERROR",
+    "log.level": "INFO",
     "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "batch.interval": "10s",
+    "process.interval": "10m",
     "config": {}
   },
 
@@ -9,14 +11,30 @@
     {
       "type": "hdfs",
       "config": {
-        "path": "hdfs:///griffin/streaming/persist"
+        "path": "hdfs:///griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
       }
     },
     {
       "type": "http",
       "config": {
         "method": "post",
-        "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/"
+        "api": "http://HOSTNAME:9200/griffin/accuracy"
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "localhost:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
       }
     }
   ],

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
new file mode 100644
index 0000000..6a60326
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
@@ -0,0 +1,198 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.algo
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo
+//import org.apache.griffin.measure.config.params._
+//import org.apache.griffin.measure.config.params.env._
+//import org.apache.griffin.measure.config.params.user._
+//import org.apache.griffin.measure.config.reader._
+//import org.apache.griffin.measure.config.validator._
+//import org.apache.griffin.measure.connector.direct.DirectDataConnector
+//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
+//import org.apache.griffin.measure.log.Loggable
+//import org.apache.griffin.measure.rule.expr._
+//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env.json"
+//  val confFile = "src/test/resources/config.json"
+////  val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}"
+//  val envFsType = "local"
+//  val userFsType = "local"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    sc = new SparkContext(conf)
+//    sqlContext = new SQLContext(sc)
+//  }
+//
+//  test("algorithm") {
+//    Try {
+//      val envParam = allParam.envParam
+//      val userParam = allParam.userParam
+//
+//      // start time
+//      val startTime = new Date().getTime()
+//
+//      // get spark application id
+//      val applicationId = sc.applicationId
+//
+//      // rules
+//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//      val rule: StatementExpr = ruleFactory.generateRule()
+//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//      ruleAnalyzer.constCacheExprs.foreach(println)
+//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+//      // global cache data
+//      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//      val finalConstMap = finalConstExprValueMap.headOption match {
+//        case Some(m) => m
+//        case _ => Map[String, Any]()
+//      }
+//
+//      // data connector
+//      val sourceDataConnector: DirectDataConnector =
+//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
+//          ruleAnalyzer.sourceRuleExprs, finalConstMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("source data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//      val targetDataConnector: DirectDataConnector =
+//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.targetParam,
+//          ruleAnalyzer.targetRuleExprs, finalConstMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("target data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//
+//      // get metadata
+////      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
+////        case Success(md) => md
+////        case Failure(ex) => throw ex
+////      }
+////      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
+////        case Success(md) => md
+////        case Failure(ex) => throw ex
+////      }
+//
+//      // get data
+//      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//      val targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//
+//      // my algo
+//      val algo = BatchAccuracyAlgo(allParam)
+//
+//      // accuracy algorithm
+//      val (accuResult, missingRdd, matchedRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer)
+//
+//      println(s"match percentage: ${accuResult.matchPercentage}, total 
count: ${accuResult.total}")
+//
+//      missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)).foreach(println)
+//
+//      // end time
+//      val endTime = new Date().getTime
+//      println(s"using time: ${endTime - startTime} ms")
+//    } match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-4)
+//      }
+//      case _ => {
+//        info("calculation finished")
+//      }
+//    }
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
new file mode 100644
index 0000000..e0f500a
--- /dev/null
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
@@ -0,0 +1,173 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.algo
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.algo.batch.BatchProfileAlgo
+//import org.apache.griffin.measure.config.params._
+//import org.apache.griffin.measure.config.params.env._
+//import org.apache.griffin.measure.config.params.user._
+//import org.apache.griffin.measure.config.reader._
+//import org.apache.griffin.measure.config.validator._
+//import org.apache.griffin.measure.connector.direct.DirectDataConnector
+//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
+//import org.apache.griffin.measure.log.Loggable
+//import org.apache.griffin.measure.rule.expr._
+//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env.json"
+//  val confFile = "src/test/resources/config-profile.json"
+//  val envFsType = "local"
+//  val userFsType = "local"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    sc = new SparkContext(conf)
+//    sqlContext = new SQLContext(sc)
+//  }
+//
+//  test("algorithm") {
+//    Try {
+//      val envParam = allParam.envParam
+//      val userParam = allParam.userParam
+//
+//      // start time
+//      val startTime = new Date().getTime()
+//
+//      // get spark application id
+//      val applicationId = sc.applicationId
+//
+//      // rules
+//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//      val rule: StatementExpr = ruleFactory.generateRule()
+//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//      ruleAnalyzer.constCacheExprs.foreach(println)
+//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+//      // global cache data
+//      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//      val finalConstMap = finalConstExprValueMap.headOption match {
+//        case Some(m) => m
+//        case _ => Map[String, Any]()
+//      }
+//
+//      // data connector
+//      val sourceDataConnector: DirectDataConnector =
+//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
+//          ruleAnalyzer.sourceRuleExprs, finalConstMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("source data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//
+//      // get data
+//      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//
+//      // my algo
+//      val algo = BatchProfileAlgo(allParam)
+//
+//      // profile algorithm
+//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
+//
+//      println(s"match percentage: ${profileResult.matchPercentage}, match 
count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
+//
+//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
+//
+//      // end time
+//      val endTime = new Date().getTime
+//      println(s"using time: ${endTime - startTime} ms")
+//    } match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-4)
+//      }
+//      case _ => {
+//        info("calculation finished")
+//      }
+//    }
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
new file mode 100644
index 0000000..a76712f
--- /dev/null
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
@@ -0,0 +1,172 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.algo.batch
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.config.params._
+//import org.apache.griffin.measure.config.params.env._
+//import org.apache.griffin.measure.config.params.user._
+//import org.apache.griffin.measure.config.reader._
+//import org.apache.griffin.measure.config.validator._
+//import org.apache.griffin.measure.connector.DataConnectorFactory
+//import org.apache.griffin.measure.connector.direct.DirectDataConnector
+//import org.apache.griffin.measure.log.Loggable
+//import org.apache.griffin.measure.rule.expr._
+//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class DataFrameSaveTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env.json"
+//  val confFile = "src/test/resources/config-profile.json"
+//  val envFsType = "local"
+//  val userFsType = "local"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    sc = new SparkContext(conf)
+//    sqlContext = new SQLContext(sc)
+//  }
+//
+//  test("algorithm") {
+//    Try {
+//      val envParam = allParam.envParam
+//      val userParam = allParam.userParam
+//
+//      // start time
+//      val startTime = new Date().getTime()
+//
+//      // get spark application id
+//      val applicationId = sc.applicationId
+//
+//      // rules
+//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//      val rule: StatementExpr = ruleFactory.generateRule()
+//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//      ruleAnalyzer.constCacheExprs.foreach(println)
+//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+//      // global cache data
+//      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//      val finalConstMap = finalConstExprValueMap.headOption match {
+//        case Some(m) => m
+//        case _ => Map[String, Any]()
+//      }
+//
+//      // data connector
+//      val sourceDataConnector: DirectDataConnector =
+//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
+//          ruleAnalyzer.sourceRuleExprs, finalConstMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("source data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//
+//      // get data
+//      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//
+//      // my algo
+//      val algo = BatchProfileAlgo(allParam)
+//
+//      // profile algorithm
+//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
+//
+//      println(s"match percentage: ${profileResult.matchPercentage}, match 
count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
+//
+//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
+//
+//      // end time
+//      val endTime = new Date().getTime
+//      println(s"using time: ${endTime - startTime} ms")
+//    } match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-4)
+//      }
+//      case _ => {
+//        info("calculation finished")
+//      }
+//    }
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
new file mode 100644
index 0000000..2179fba
--- /dev/null
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
@@ -0,0 +1,89 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.core
+
+import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
+import org.apache.griffin.measure.rule.expr._
+import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.PrivateMethodTester
+
+@RunWith(classOf[JUnitRunner])
+class AccuracyCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
+
+  def findExprId(exprs: Iterable[Expr], desc: String): String = {
+    exprs.find(_.desc == desc) match {
+      case Some(expr) => expr._id
+      case _ => ""
+    }
+  }
+
+  test ("match data success") {
+    val rule = "$source.name = $target.name AND $source.age < $target.age"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
+    ), Map[String, Any]())
+    val target = (Map[String, Any](
+      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
+      (findExprId(targetPersistExprs, "$target['age']") -> 27)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
+    result._1 should be (true)
+    result._2.size should be (0)
+  }
+
+  test ("match data fail") {
+    val rule = "$source.name = $target.name AND $source.age = $target.age"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
+    ), Map[String, Any]())
+    val target = (Map[String, Any](
+      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
+      (findExprId(targetPersistExprs, "$target['age']") -> 27)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
+    result._1 should be (false)
+    result._2.size shouldNot be (0)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
 
b/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
new file mode 100644
index 0000000..087e8e5
--- /dev/null
+++ 
b/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.core
+
+import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
+import org.apache.griffin.measure.rule.expr._
+import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.PrivateMethodTester
+
+@RunWith(classOf[JUnitRunner])
+class ProfileCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
+
+  def findExprId(exprs: Iterable[Expr], desc: String): String = {
+    exprs.find(_.desc == desc) match {
+      case Some(expr) => expr._id
+      case _ => ""
+    }
+  }
+
+  test ("match data success") {
+    val rule = "$source.name = 'jack' AND $source.age = null"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> null)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
+    result._1 should be (true)
+    result._2.size should be (0)
+  }
+
+  test ("match data fail") {
+    val rule = "$source.name = 'jack' AND $source.age != null"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> null)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
+    result._1 should be (false)
+    result._2.size shouldNot be (0)
+  }
+
+}
