http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
new file mode 100644
index 0000000..a22f91f
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
@@ -0,0 +1,267 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.algo.streaming
+//
+//import java.util.Date
+//import java.util.concurrent.TimeUnit
+//
+//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo
+//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+//import org.apache.griffin.measure.cache.result._
+//import org.apache.griffin.measure.config.params._
+//import org.apache.griffin.measure.config.params.env._
+//import org.apache.griffin.measure.config.params.user._
+//import org.apache.griffin.measure.config.reader._
+//import org.apache.griffin.measure.config.validator._
+//import org.apache.griffin.measure.connector.direct.DirectDataConnector
+//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
+//import org.apache.griffin.measure.log.Loggable
+//import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType}
+//import org.apache.griffin.measure.result._
+//import org.apache.griffin.measure.rule.expr._
+//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.sql.hive.HiveContext
+//import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class StreamingAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env-streaming.json"
+//  val confFile = "src/test/resources/config-streaming3.json"
+//  val envFsType = "local"
+//  val userFsType = "local"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+////  val ssc: StreamingContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val sparkParam = envParam.sparkParam
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    conf.setAll(sparkParam.config)
+//    sc = new SparkContext(conf)
+//    sc.setLogLevel(envParam.sparkParam.logLevel)
+//    sqlContext = new SQLContext(sc)
+////    sqlContext = new HiveContext(sc)
+//
+////    val a = sqlContext.sql("select * from s1 limit 10")
+////    //    val a = sqlContext.sql("show tables")
+////    a.show(10)
+////
+////    val b = HdfsUtil.existPath("/griffin/streaming")
+////    println(b)
+//  }
+//
+//  test("algorithm") {
+//    val envParam = allParam.envParam
+//    val userParam = allParam.userParam
+//    val metricName = userParam.name
+//    val sparkParam = envParam.sparkParam
+//    val cleanerParam = envParam.cleanerParam
+//
+////    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir,
+////      ( ) => {
+////        try {
+////          val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
+////            case Some(interval) => Milliseconds(interval)
+////            case _ => throw new Exception("invalid batch interval")
+////          }
+////          val ssc = new StreamingContext(sc, batchInterval)
+////          ssc.checkpoint(sparkParam.cpDir)
+////          ssc
+////        } catch {
+////          case runtime: RuntimeException => {
+////            throw runtime
+////          }
+////        }
+////      })
+//
+//    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
+//      case Some(interval) => Milliseconds(interval)
+//      case _ => throw new Exception("invalid batch interval")
+//    }
+//    val ssc = new StreamingContext(sc, batchInterval)
+//    ssc.checkpoint(sparkParam.cpDir)
+//
+//    // start time
+//    val startTime = new Date().getTime()
+//
+//    val persistFactory = PersistFactory(envParam.persistParams, metricName)
+//
+//    // get persists to persist measure result
+//    val appPersist: Persist = persistFactory.getPersists(startTime)
+//
+//    // get spark application id
+//    val applicationId = sc.applicationId
+//
+//    // persist start id
+//    appPersist.start(applicationId)
+//
+//    InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
+//    InfoCacheInstance.init
+//
+//    // generate rule from rule param, generate rule analyzer
+//    val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//    val rule: StatementExpr = ruleFactory.generateRule()
+//    val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//    // const expr value map
+//    val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//    val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//    val finalConstMap = finalConstExprValueMap.headOption match {
+//      case Some(m) => m
+//      case _ => Map[String, Any]()
+//    }
+//
+//    // data connector
+//    val sourceDataConnector: DirectDataConnector =
+//      DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.sourceParam,
+//        ruleAnalyzer.sourceRuleExprs, finalConstMap
+//      ) match {
+//        case Success(cntr) => {
+//          if (cntr.available) cntr
+//          else throw new Exception("source data connection error!")
+//        }
+//        case Failure(ex) => throw ex
+//      }
+//    val targetDataConnector: DirectDataConnector =
+//      DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.targetParam,
+//        ruleAnalyzer.targetRuleExprs, finalConstMap
+//      ) match {
+//        case Success(cntr) => {
+//          if (cntr.available) cntr
+//          else throw new Exception("target data connection error!")
+//        }
+//        case Failure(ex) => throw ex
+//      }
+//
+//    val cacheResultProcesser = CacheResultProcesser()
+//
+//    // init data stream
+//    sourceDataConnector.init()
+//    targetDataConnector.init()
+//
+//    // my algo
+//    val algo = StreamingAccuracyAlgo(allParam)
+//
+//    val streamingAccuracyProcess = StreamingAccuracyProcess(
+//      sourceDataConnector, targetDataConnector,
+//      ruleAnalyzer, cacheResultProcesser, persistFactory, appPersist)
+//
+//    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
+//      case Some(interval) => interval
+//      case _ => throw new Exception("invalid batch interval")
+//    }
+//    val process = TimingProcess(processInterval, streamingAccuracyProcess)
+//
+//    // clean thread
+////    case class Clean() extends Runnable {
+////      val lock = InfoCacheInstance.genLock("clean")
+////      def run(): Unit = {
+////        val locked = lock.lock(5, TimeUnit.SECONDS)
+////        if (locked) {
+////          try {
+////            sourceDataConnector.cleanData
+////            targetDataConnector.cleanData
+////          } finally {
+////            lock.unlock()
+////          }
+////        }
+////      }
+////    }
+////    val cleanInterval = TimeUtil.milliseconds(cleanerParam.cleanInterval) match {
+////      case Some(interval) => interval
+////      case _ => throw new Exception("invalid batch interval")
+////    }
+////    val clean = TimingProcess(cleanInterval, Clean())
+//
+//    process.startup()
+////    clean.startup()
+//
+//    ssc.start()
+//    ssc.awaitTermination()
+//    ssc.stop(stopSparkContext=true, stopGracefully=true)
+//
+//    println("================ end ================")
+//
+//    // context stop
+//    sc.stop
+//
+//    InfoCacheInstance.close
+//
+//    appPersist.finish()
+//
+//    process.shutdown()
+////    clean.shutdown()
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}
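
The suite above is entirely commented out, which makes its one instructive piece easy to miss: the hand-rolled streaming driver that builds a StreamingContext over an existing SparkContext, sets a checkpoint directory, starts, awaits, and stops gracefully. Below is a minimal runnable sketch of just that skeleton, with a queue-backed stand-in stream instead of Griffin's connectors; the batch interval, checkpoint path, and timeout are illustrative assumptions, not values from the test.

    import scala.collection.mutable
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    object StreamingDriverSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("streaming-driver-sketch")
        val sc = new SparkContext(conf)

        // The test derives these from sparkParam.batchInterval / sparkParam.cpDir;
        // the literals here are placeholders.
        val ssc = new StreamingContext(sc, Milliseconds(2000L))
        ssc.checkpoint("/tmp/griffin/cp")

        // Stand-in input so the sketch runs without Kafka; at least one
        // output operation must be registered before start().
        val queue = mutable.Queue(sc.makeRDD(Seq("a", "b", "c")))
        ssc.queueStream(queue).print()

        ssc.start()
        ssc.awaitTerminationOrTimeout(10000L) // the test awaits indefinitely
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }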

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
deleted file mode 100644
index e080d19..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-// package org.apache.griffin.measure.batch.algo
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.batch.config.params._
-//import org.apache.griffin.measure.batch.config.params.env._
-//import org.apache.griffin.measure.batch.config.params.user._
-//import org.apache.griffin.measure.batch.config.reader._
-//import org.apache.griffin.measure.batch.config.validator._
-//import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.batch.log.Loggable
-//import org.apache.griffin.measure.batch.rule.expr._
-//import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env.json"
-////  val confFile = "src/test/resources/config.json"
-//  val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}"
-//  val envFsType = "local"
-//  val userFsType = "raw"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//
-//    val metricName = userParam.name
-//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-//    sc = new SparkContext(conf)
-//    sqlContext = new SQLContext(sc)
-//  }
-//
-//  test("algorithm") {
-//    Try {
-//      val envParam = allParam.envParam
-//      val userParam = allParam.userParam
-//
-//      // start time
-//      val startTime = new Date().getTime()
-//
-//      // get spark application id
-//      val applicationId = sc.applicationId
-//
-//      // rules
-//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-//      val rule: StatementExpr = ruleFactory.generateRule()
-//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-//      ruleAnalyzer.constCacheExprs.foreach(println)
-//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-//      // global cache data
-//      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-//
-//      // data connector
-//      val sourceDataConnector: DataConnector =
-//        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-//          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("source data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//      val targetDataConnector: DataConnector =
-//        DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
-//          ruleAnalyzer.targetRuleExprs, finalConstExprValueMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("target data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//
-//      // get metadata
-////      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-////        case Success(md) => md
-////        case Failure(ex) => throw ex
-////      }
-////      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
-////        case Success(md) => md
-////        case Failure(ex) => throw ex
-////      }
-//
-//      // get data
-//      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//      val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//
-//      // my algo
-//      val algo = BatchAccuracyAlgo(allParam)
-//
-//      // accuracy algorithm
-//      val (accuResult, missingRdd, matchedRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer)
-//
-//      println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}")
-//
-//      missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)).foreach(println)
-//
-//      // end time
-//      val endTime = new Date().getTime
-//      println(s"using time: ${endTime - startTime} ms")
-//    } match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-4)
-//      }
-//      case _ => {
-//        info("calculation finished")
-//      }
-//    }
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
deleted file mode 100644
index 7a87db9..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgoTest.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.batch.algo
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.batch.config.params._
-//import org.apache.griffin.measure.batch.config.params.env._
-//import org.apache.griffin.measure.batch.config.params.user._
-//import org.apache.griffin.measure.batch.config.reader._
-//import org.apache.griffin.measure.batch.config.validator._
-//import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.batch.log.Loggable
-//import org.apache.griffin.measure.batch.rule.expr._
-//import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env.json"
-//  val confFile = "src/test/resources/config-profile.json"
-//  val envFsType = "local"
-//  val userFsType = "local"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//
-//    val metricName = userParam.name
-//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-//    sc = new SparkContext(conf)
-//    sqlContext = new SQLContext(sc)
-//  }
-//
-//  test("algorithm") {
-//    Try {
-//      val envParam = allParam.envParam
-//      val userParam = allParam.userParam
-//
-//      // start time
-//      val startTime = new Date().getTime()
-//
-//      // get spark application id
-//      val applicationId = sc.applicationId
-//
-//      // rules
-//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-//      val rule: StatementExpr = ruleFactory.generateRule()
-//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-//      ruleAnalyzer.constCacheExprs.foreach(println)
-//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-//      // global cache data
-//      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-//
-//      // data connector
-//      val sourceDataConnector: DataConnector =
-//        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-//          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("source data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//
-//      // get data
-//      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//
-//      // my algo
-//      val algo = BatchProfileAlgo(allParam)
-//
-//      // profile algorithm
-//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
-//
-//      println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
-//
-//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
-//
-//      // end time
-//      val endTime = new Date().getTime
-//      println(s"using time: ${endTime - startTime} ms")
-//    } match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-4)
-//      }
-//      case _ => {
-//        info("calculation finished")
-//      }
-//    }
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
deleted file mode 100644
index 0545a70..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCoreTest.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo.core
-
-import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.scalatest.PrivateMethodTester
-
-@RunWith(classOf[JUnitRunner])
-class AccuracyCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
-
-  def findExprId(exprs: Iterable[Expr], desc: String): String = {
-    exprs.find(_.desc == desc) match {
-      case Some(expr) => expr._id
-      case _ => ""
-    }
-  }
-
-  test ("match data success") {
-    val rule = "$source.name = $target.name AND $source.age < $target.age"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
-    ), Map[String, Any]())
-    val target = (Map[String, Any](
-      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
-      (findExprId(targetPersistExprs, "$target['age']") -> 27)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
-    result._1 should be (true)
-    result._2.size should be (0)
-  }
-
-  test ("match data fail") {
-    val rule = "$source.name = $target.name AND $source.age = $target.age"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
-    ), Map[String, Any]())
-    val target = (Map[String, Any](
-      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
-      (findExprId(targetPersistExprs, "$target['age']") -> 27)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
-    result._1 should be (false)
-    result._2.size shouldNot be (0)
-  }
-
-}
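
The two tests above reach the private AccuracyCore.matchData method through ScalaTest's PrivateMethodTester, which binds a private method by symbol and invokes it reflectively. A self-contained sketch of the idiom against a made-up class (Counter and bump are illustrative, not Griffin types):

    import org.junit.runner.RunWith
    import org.scalatest.junit.JUnitRunner
    import org.scalatest.{FunSuite, Matchers, PrivateMethodTester}

    class Counter {
      private def bump(n: Int): Int = n + 1
    }

    @RunWith(classOf[JUnitRunner])
    class PrivateMethodSketch extends FunSuite with Matchers with PrivateMethodTester {
      test("invoke a private method") {
        // Bind the private method by name and return type, then invoke it
        // on an instance without widening the method's visibility.
        val bump = PrivateMethod[Int]('bump)
        val counter = new Counter
        val result = counter invokePrivate bump(41)
        result should be (42)
      }
    }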

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/ProfileCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/ProfileCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/ProfileCoreTest.scala
deleted file mode 100644
index 376330b..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/algo/core/ProfileCoreTest.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo.core
-
-import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.scalatest.PrivateMethodTester
-
-@RunWith(classOf[JUnitRunner])
-class ProfileCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
-
-  def findExprId(exprs: Iterable[Expr], desc: String): String = {
-    exprs.find(_.desc == desc) match {
-      case Some(expr) => expr._id
-      case _ => ""
-    }
-  }
-
-  test ("match data success") {
-    val rule = "$source.name = 'jack' AND $source.age = null"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> null)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
-    result._1 should be (true)
-    result._2.size should be (0)
-  }
-
-  test ("match data fail") {
-    val rule = "$source.name = 'jack' AND $source.age != null"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> null)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
-    result._1 should be (false)
-    result._2.size shouldNot be (0)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
deleted file mode 100644
index b8fccba..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReaderTest.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.config.params.env._
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  test("read config") {
-    val rawString = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""
-
-    val reader = ParamRawStringReader(rawString)
-    val paramTry = reader.readConfig[PersistParam]
-    paramTry.isSuccess should be (true)
-    paramTry.get should be (PersistParam("hdfs", Map[String, Any](("path" -> "/path/to"), ("time" -> 1234567))))
-  }
-
-}
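
For context, the reader under test turns a raw JSON string straight into a parameter case class. The sketch below shows one way to do that kind of raw-string extraction, using json4s; both the library choice and the PersistParamSketch class are assumptions for illustration, not how ParamRawStringReader is actually implemented.

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    // Illustrative stand-in for Griffin's PersistParam, with a backtick-quoted
    // field because "type" is a Scala keyword.
    case class PersistParamSketch(`type`: String, config: Map[String, Any])

    object RawStringReaderSketch extends App {
      implicit val formats: Formats = DefaultFormats

      val raw = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""

      // Note: under DefaultFormats, JSON numbers inside a Map[String, Any]
      // extract as BigInt, so config("time") comes back as BigInt(1234567).
      val param = parse(raw).extract[PersistParamSketch]
      println(param)
    }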

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
deleted file mode 100644
index 24ab301..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidatorTest.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.validator
-
-import org.apache.griffin.measure.batch.config.params._
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
-import org.scalamock.scalatest.MockFactory
-
-@RunWith(classOf[JUnitRunner])
-class AllParamValidatorTest extends FlatSpec with Matchers with BeforeAndAfter with MockFactory {
-
-  "validate" should "pass" in {
-    val validator = AllParamValidator()
-    val paramMock = mock[Param]
-    paramMock.validate _ expects () returning (false)
-
-    val validateTry = validator.validate(paramMock)
-    validateTry.isSuccess should be (true)
-    validateTry.get should be (false)
-  }
-
-}
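
The validator test above stubs the Param trait with ScalaMock so that validate can be exercised without building real configuration objects. A minimal standalone sketch of the same expectation style (Checkable is a made-up trait, not a Griffin type):

    import org.scalamock.scalatest.MockFactory
    import org.scalatest.{FlatSpec, Matchers}

    trait Checkable {
      def validate(): Boolean
    }

    class MockingSketch extends FlatSpec with Matchers with MockFactory {
      "a mocked trait" should "return the canned answer" in {
        val checkable = mock[Checkable]
        // Expect exactly one call to validate() and stub its result.
        (checkable.validate _).expects().returning(false)

        checkable.validate() should be (false)
      }
    }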

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
deleted file mode 100644
index aa4e24b..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HdfsPersistTest.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.persist
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-import scala.util.{Try, Failure}
-
-@RunWith(classOf[JUnitRunner])
-class HdfsPersistTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val config: Map[String, Any] = Map[String, Any](
-    ("path" -> "/path/to"), ("max.persist.lines" -> 100), ("max.lines.per.file" -> 1000))
-  val metricName: String = "metric"
-  val timeStamp: Long = 123456789L
-
-  val hdfsPersist = HdfsPersist(config, metricName, timeStamp)
-
-  test ("constructor") {
-    hdfsPersist.path should be ("/path/to")
-    hdfsPersist.maxPersistLines should be (100)
-    hdfsPersist.maxLinesPerFile should be (1000)
-
-    hdfsPersist.StartFile should be (s"/path/to/${metricName}/${timeStamp}/_START")
-  }
-
-  test ("available") {
-    hdfsPersist.available should be (true)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
deleted file mode 100644
index d03eacb..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/persist/HttpPersistTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.persist
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class HttpPersistTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val config: Map[String, Any] = Map[String, Any](("api" -> "url/api"), ("method" -> "post"))
-  val metricName: String = "metric"
-  val timeStamp: Long = 123456789L
-
-  val httpPersist = HttpPersist(config, metricName, timeStamp)
-
-  test ("constructor") {
-    httpPersist.api should be ("url/api")
-    httpPersist.method should be ("post")
-  }
-
-  test("available") {
-    httpPersist.available should be (true)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/result/AccuracyResultTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/result/AccuracyResultTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/result/AccuracyResultTest.scala
deleted file mode 100644
index de29fa3..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/result/AccuracyResultTest.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.result
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class AccuracyResultTest extends FunSuite with BeforeAndAfter with Matchers {
-
-  test ("update") {
-    val result = AccuracyResult(10, 100)
-    val delta = AccuracyResult(3, 10)
-    result.update(delta) should be (AccuracyResult(3, 100))
-  }
-
-  test ("eventual") {
-    val result1 = AccuracyResult(10, 100)
-    result1.eventual should be (false)
-
-    val result2 = AccuracyResult(0, 100)
-    result2.eventual should be (true)
-  }
-
-  test ("differsFrom") {
-    val result = AccuracyResult(10, 100)
-    result.differsFrom(AccuracyResult(11, 100)) should be (true)
-    result.differsFrom(AccuracyResult(10, 110)) should be (true)
-    result.differsFrom(AccuracyResult(10, 100)) should be (false)
-  }
-
-  test ("matchPercentage") {
-    val result1 = AccuracyResult(10, 100)
-    result1.matchPercentage should be (90.0)
-
-    val result2 = AccuracyResult(10, 0)
-    result2.matchPercentage should be (0.0)
-  }
-
-}
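
Read together, the four tests above pin down the semantics of AccuracyResult(missCount, total): an update keeps the original total but adopts the delta's miss count, a result is "eventual" once nothing is missing, and matchPercentage guards against an empty total. A re-derivation consistent with those assertions (field names are inferred from the tests, not copied from the implementation):

    // Inferred from the test expectations above; not the Griffin source.
    case class AccuracyResultSketch(missCount: Long, total: Long) {
      // update(AccuracyResult(3, 10)) on (10, 100) yields (3, 100):
      // keep this total, take the delta's miss count.
      def update(delta: AccuracyResultSketch): AccuracyResultSketch =
        AccuracyResultSketch(delta.missCount, total)

      // Accuracy is final ("eventual") once no records are missing.
      def eventual: Boolean = missCount == 0

      def differsFrom(other: AccuracyResultSketch): Boolean =
        missCount != other.missCount || total != other.total

      // (100 - 10) / 100 => 90.0; an empty total maps to 0.0.
      def matchPercentage: Double =
        if (total <= 0) 0.0 else (total - missCount).toDouble / total * 100
    }

The ProfileResult tests in the next hunk imply the dual convention: their first field is a match count that update accumulates (10 + 30 = 40 with the total kept), and eventual means the match count has reached the total.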

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/result/ProfileResultTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/result/ProfileResultTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/result/ProfileResultTest.scala
deleted file mode 100644
index 2903bd6..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/result/ProfileResultTest.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.result
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class ProfileResultTest extends FunSuite with BeforeAndAfter with Matchers {
-
-  test ("update") {
-    val result = ProfileResult(10, 100)
-    val delta = ProfileResult(30, 90)
-    result.update(delta) should be (ProfileResult(40, 100))
-  }
-
-  test ("eventual") {
-    val result1 = ProfileResult(10, 100)
-    result1.eventual should be (false)
-
-    val result2 = ProfileResult(100, 100)
-    result2.eventual should be (true)
-  }
-
-  test ("differsFrom") {
-    val result = ProfileResult(10, 100)
-    result.differsFrom(ProfileResult(11, 100)) should be (true)
-    result.differsFrom(ProfileResult(10, 110)) should be (true)
-    result.differsFrom(ProfileResult(10, 100)) should be (false)
-  }
-
-  test ("matchPercentage") {
-    val result1 = ProfileResult(90, 100)
-    result1.matchPercentage should be (90.0)
-
-    val result2 = ProfileResult(10, 0)
-    result2.matchPercentage should be (0.0)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerTest.scala
deleted file mode 100644
index 3088f7c..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerTest.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.rule
-
-import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class RuleAnalyzerTest extends FunSuite with BeforeAndAfter with Matchers {
-
-  test ("rule analyze") {
-    val rule = "$source.name = $target.name AND $source.age = $target.age + (2 * 5) WHEN $source.born > (6 - 2 * 2)"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    ruleAnalyzer.constCacheExprs.map(_.desc) should be (List[String]("2 * 5", "2 * 2", "6 - 2 * 2"))
-    ruleAnalyzer.constFinalCacheExprs.map(_.desc) should be (Set[String]("2 * 5", "6 - 2 * 2"))
-
-    ruleAnalyzer.sourceRuleExprs.groupbyExprs.map(_.desc) should be (List[String](
-      "$source['name']", "$source['age']"))
-    ruleAnalyzer.sourceRuleExprs.cacheExprs.map(_.desc) should be (List[String](
-      "$source['name']", "$source['age']", "$source['born']", "$source['born'] > 6 - 2 * 2"))
-    ruleAnalyzer.sourceRuleExprs.finalCacheExprs.map(_.desc) should be (Set[String](
-      "$source['name']", "$source['age']", "$source['born']", "$source['born'] > 6 - 2 * 2"))
-    ruleAnalyzer.sourceRuleExprs.persistExprs.map(_.desc) should be (List[String](
-      "$source['name']", "$source['age']", "$source['born']"))
-    ruleAnalyzer.sourceRuleExprs.whenClauseExprOpt.map(_.desc) should be (Some(
-      "$source['born'] > 6 - 2 * 2"))
-
-    ruleAnalyzer.targetRuleExprs.groupbyExprs.map(_.desc) should be (List[String](
-      "$target['name']", "$target['age'] + 2 * 5"))
-    ruleAnalyzer.targetRuleExprs.cacheExprs.map(_.desc) should be (List[String](
-      "$target['name']", "$target['age']", "$target['age'] + 2 * 5"))
-    ruleAnalyzer.targetRuleExprs.finalCacheExprs.map(_.desc) should be (Set[String](
-      "$target['name']", "$target['age']", "$target['age'] + 2 * 5"))
-    ruleAnalyzer.targetRuleExprs.persistExprs.map(_.desc) should be (List[String](
-      "$target['name']", "$target['age']"))
-    ruleAnalyzer.targetRuleExprs.whenClauseExprOpt.map(_.desc) should be (Some(
-      "$source['born'] > 6 - 2 * 2"))
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleFactoryTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleFactoryTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleFactoryTest.scala
deleted file mode 100644
index ebadaa0..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleFactoryTest.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.rule
-
-import org.apache.griffin.measure.batch.config.params.user.EvaluateRuleParam
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class RuleFactoryTest extends FunSuite with BeforeAndAfter with Matchers {
-
-  test ("generate rule") {
-    val rule = "$source.name = $target.name AND $source.age = $target.age"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    ruleFactory.generateRule.desc should be ("$source['name'] = $target['name'] AND $source['age'] = $target['age']")
-
-    val wrong_rule = "$source.name = $target.name AND $source.age = $target1.age"
-    val evaluateRuleParam1 = EvaluateRuleParam(1.0, wrong_rule)
-    val ruleFactory1 = RuleFactory(evaluateRuleParam1)
-    val thrown = intercept[Exception] {
-      ruleFactory1.generateRule
-    }
-    thrown.getMessage should be ("parse rule error!")
-  }
-
-}
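
The wrong-rule case above leans on ScalaTest's intercept, which asserts that the block throws the expected exception type and returns the exception for further assertions. A tiny standalone example of the idiom:

    import org.scalatest.{FunSuite, Matchers}

    class InterceptSketch extends FunSuite with Matchers {
      test("intercept returns the thrown exception") {
        val thrown = intercept[IllegalArgumentException] {
          require(false, "bad input") // throws IllegalArgumentException
        }
        // require prefixes the message with "requirement failed: "
        thrown.getMessage should include ("bad input")
      }
    }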

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
deleted file mode 100644
index a39279e..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.rule
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//import org.scalatest.FlatSpec
-//import org.scalamock.scalatest.MockFactory
-
-@RunWith(classOf[JUnitRunner])
-class RuleParserTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val ruleParser = RuleParser()
-
-  test ("literal number") {
-    val rule1 = "123"
-    val result1 = ruleParser.parseAll(ruleParser.literal, rule1)
-    result1.successful should be (true)
-    result1.get.value should be (Some(123))
-
-    val rule2 = "12.3"
-    val result2 = ruleParser.parseAll(ruleParser.literal, rule2)
-    result2.successful should be (true)
-    result2.get.value should be (Some(12.3))
-  }
-
-  test ("literal string") {
-    val rule1 = "'123'"
-    val result1 = ruleParser.parseAll(ruleParser.literal, rule1)
-    result1.successful should be (true)
-    result1.get.value should be (Some("123"))
-
-    val rule2 = "\"123\""
-    val result2 = ruleParser.parseAll(ruleParser.literal, rule2)
-    result2.successful should be (true)
-    result2.get.value should be (Some("123"))
-  }
-
-  test ("literal time") {
-    val rule = "3h"
-    val result = ruleParser.parseAll(ruleParser.literal, rule)
-    result.successful should be (true)
-    result.get.value should be (Some(3*3600*1000))
-  }
-
-  test ("literal boolean") {
-    val rule = "true"
-    val result = ruleParser.parseAll(ruleParser.literal, rule)
-    result.successful should be (true)
-    result.get.value should be (Some(true))
-  }
-
-  test ("literal null") {
-    val rule = "null"
-    val result = ruleParser.parseAll(ruleParser.literal, rule)
-    result.successful should be (true)
-    result.get.value should be (Some(null))
-  }
-
-  test ("literal none") {
-    val rule = "none"
-    val result = ruleParser.parseAll(ruleParser.literal, rule)
-    result.successful should be (true)
-    result.get.value should be (None)
-  }
-
-  test ("selection head") {
-    val rule = "$source"
-    val result = ruleParser.parseAll(ruleParser.selectionHead, rule)
-    result.successful should be (true)
-    result.get.head should be ("source")
-  }
-
-  test ("field select") {
-    val rule = ".name"
-    val result = ruleParser.parseAll(ruleParser.selector, rule)
-    result.successful should be (true)
-    result.get.desc should be ("['name']")
-  }
-
-  test ("function operation") {
-    val rule = ".func(1, 'abc', 3 + 4)"
-    val result = ruleParser.parseAll(ruleParser.selector, rule)
-    result.successful should be (true)
-    result.get.desc should be (".func(1, 'abc', 3 + 4)")
-  }
-
-  test ("index field range select") {
-    val rule1 = "['field']"
-    val result1 = ruleParser.parseAll(ruleParser.selector, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("['field']")
-
-    val rule2 = "[1, 4]"
-    val result2 = ruleParser.parseAll(ruleParser.selector, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("[1, 4]")
-
-    val rule3 = "[1, 'name', 'age', 5, (6, 8)]"
-    val result3 = ruleParser.parseAll(ruleParser.selector, rule3)
-    result3.successful should be (true)
-    result3.get.desc should be ("[1, 'name', 'age', 5, (6, 8)]")
-  }
-
-  test ("index field range") {
-    val rule1 = "(3, 5)"
-    val result1 = ruleParser.parseAll(ruleParser.indexFieldRange, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("(3, 5)")
-
-    val rule2 = "'name'"
-    val result2 = ruleParser.parseAll(ruleParser.indexFieldRange, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("'name'")
-
-    val rule3 = "*"
-    val result3 = ruleParser.parseAll(ruleParser.indexFieldRange, rule3)
-    result3.successful should be (true)
-    result3.get.desc should be ("*")
-  }
-
-  test ("filter select") {
-    val rule = "['age' > 16]"
-    val result = ruleParser.parseAll(ruleParser.selector, rule)
-    result.successful should be (true)
-    result.get.desc should be ("['age' > 16]")
-  }
-
-  test ("selection") {
-    val rule = "$source['age' > 16].func(1, 'abc')[1, 3, 'name'].time[*]"
-    val result = ruleParser.parseAll(ruleParser.selection, rule)
-    result.successful should be (true)
-    result.get.desc should be ("$source['age' > 16].func(1, 'abc')[1, 3, 
'name']['time'][*]")
-  }
-
-  test ("math expr") {
-    val rule = "$source.age * 6 + 4 / 2"
-    val result = ruleParser.parseAll(ruleParser.mathExpr, rule)
-    result.successful should be (true)
-    result.get.desc should be ("$source['age'] * 6 + 4 / 2")
-  }
-
-  test ("range expr") {
-    val rule = "($source.age + 1, $target.age + 3, 40)"
-    val result = ruleParser.parseAll(ruleParser.rangeExpr, rule)
-    result.successful should be (true)
-    result.get.desc should be ("($source['age'] + 1, $target['age'] + 3, 40)")
-  }
-
-  test ("logical expr") {
-    val rule1 = "$source.age + 1 = $target.age"
-    val result1 = ruleParser.parseAll(ruleParser.logicalExpr, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("$source['age'] + 1 = $target['age']")
-
-    val rule2 = "$source.age in (3, 5, 6, 10)"
-    val result2 = ruleParser.parseAll(ruleParser.logicalExpr, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("$source['age'] in (3, 5, 6, 10)")
-  }
-
-  test ("logical statement") {
-    val rule1 = "$source.descs[0] = $target.desc AND $source.name = 
$target.name"
-    val result1 = ruleParser.parseAll(ruleParser.logicalStatement, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("$source['descs'][0] = $target['desc'] AND 
$source['name'] = $target['name']")
-
-    val rule2 = "NOT $source.age = $target.age"
-    val result2 = ruleParser.parseAll(ruleParser.logicalStatement, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("NOT $source['age'] = $target['age']")
-  }
-
-  test ("whole rule") {
-    val rule1 = "$source.name = $target.name AND $source.age = $target.age"
-    val result1 = ruleParser.parseAll(ruleParser.rule, rule1)
-    result1.successful should be (true)
-    result1.get.desc should be ("$source['name'] = $target['name'] AND 
$source['age'] = $target['age']")
-
-    val rule2 = "$source.name = $target.name AND $source.age = $target.age 
WHEN $source.id > 1000"
-    val result2 = ruleParser.parseAll(ruleParser.rule, rule2)
-    result2.successful should be (true)
-    result2.get.desc should be ("$source['name'] = $target['name'] AND 
$source['age'] = $target['age'] when $source['id'] > 1000")
-  }
-}
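
The deleted suite documents how rule strings normalize: dotted field access such as $source.name becomes an indexed selector $source['name'], and WHEN lowercases in the described form. A condensed usage sketch, assuming the parser keeps the same parser-combinator surface after its move out of the batch package:

    import org.apache.griffin.measure.rule.RuleParser

    object RuleParserSketch {
      def main(args: Array[String]): Unit = {
        val parser = RuleParser()
        val rule = "$source.name = $target.name AND $source.age = $target.age"
        val result = parser.parseAll(parser.rule, rule)
        if (result.successful) {
          // prints the normalized form: $source['name'] = $target['name'] AND ...
          println(result.get.desc)
        } else {
          println(s"parse failed: $result")
        }
      }
    }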

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/batch/utils/JsonUtilTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/batch/utils/JsonUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/batch/utils/JsonUtilTest.scala
deleted file mode 100644
index 85ca24c..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/batch/utils/JsonUtilTest.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.utils
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-
-@RunWith(classOf[JUnitRunner])
-class JsonUtilTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val map = Map[String, Any](("name" -> "test"), ("age" -> 15))
-  val json = """{"name":"test","age":15}"""
-
-  val person = JsonUtilTest.Person("test", 15)
-
-  test ("toJson 1") {
-    val symbolMap = map.map(p => (Symbol(p._1), p._2))
-    JsonUtil.toJson(symbolMap) should equal (json)
-  }
-
-  test ("toJson 2") {
-    JsonUtil.toJson(map) should equal (json)
-  }
-
-  test ("toMap") {
-    JsonUtil.toMap(json) should equal (map)
-  }
-
-  test ("fromJson 1") {
-    JsonUtil.fromJson[JsonUtilTest.Person](json) should equal (person)
-  }
-
-  test ("fromJson 2") {
-    val is = new java.io.ByteArrayInputStream(json.getBytes("utf-8"));
-    JsonUtil.fromJson[JsonUtilTest.Person](is) should equal (person)
-  }
-
-}
-
-object JsonUtilTest {
-  case class Person(name: String, age: Int){}
-}
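
A round-trip sketch of the JsonUtil contract the deleted suite covered (toJson, toMap, fromJson); the import path here is an assumption, mirroring this patch's move of utilities out of the batch package:

    import org.apache.griffin.measure.utils.JsonUtil

    object JsonUtilSketch {
      case class Person(name: String, age: Int)

      def main(args: Array[String]): Unit = {
        val json = JsonUtil.toJson(Map[String, Any]("name" -> "test", "age" -> 15))
        println(json)                            // {"name":"test","age":15}
        println(JsonUtil.fromJson[Person](json)) // Person(test,15)
        println(JsonUtil.toMap(json))            // Map(name -> test, age -> 15)
      }
    }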

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
new file mode 100644
index 0000000..fc42d43
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
@@ -0,0 +1,78 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.cache
+//
+//import java.util.Date
+//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+//
+//import org.apache.curator.framework.recipes.locks.InterProcessMutex
+//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+//import org.apache.curator.retry.ExponentialBackoffRetry
+//import org.apache.griffin.measure.cache.info.InfoCacheInstance
+//import org.apache.griffin.measure.config.params.env.InfoCacheParam
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Try}
+//
+//@RunWith(classOf[JUnitRunner])
+//class InfoCacheInstanceTest extends FunSuite with Matchers with BeforeAndAfter {
+//
+//  val map = Map[String, Any](
+//    ("hosts" -> "localhost:2181"),
+//    ("namespace" -> "griffin/infocache"),
+//    ("lock.path" -> "lock"),
+//    ("mode" -> "persist"),
+//    ("init.clear" -> true),
+//    ("close.clear" -> false)
+//  )
+//  val name = "ttt"
+//
+//  val icp = InfoCacheParam("zk", map)
+//  val icps = icp :: Nil
+//
+//  before {
+//    InfoCacheInstance.initInstance(icps, name)
+//    InfoCacheInstance.init
+//  }
+//
+//  test ("others") {
+//    InfoCacheInstance.available should be (true)
+//
+//    val keys = List[String](
+//      "key1", "key2"
+//    )
+//    val info = Map[String, String](
+//      ("key1" -> "value1"),
+//      ("key2" -> "value2")
+//    )
+//
+//    InfoCacheInstance.cacheInfo(info) should be (true)
+//    InfoCacheInstance.readInfo(keys) should be (info)
+//    InfoCacheInstance.deleteInfo(keys)
+////    InfoCacheInstance.readInfo(keys) should be (Map[String, String]())
+//
+//  }
+//
+//  after {
+//    InfoCacheInstance.close()
+//  }
+//
+//}
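
The disabled suite above shows the InfoCacheInstance lifecycle: initInstance registers backends from InfoCacheParam entries, then init, cacheInfo, readInfo, deleteInfo, and close operate through the facade. A condensed sketch, assuming a ZooKeeper reachable at localhost:2181 (presumably why the suite ships commented out):

    import org.apache.griffin.measure.cache.info.InfoCacheInstance
    import org.apache.griffin.measure.config.params.env.InfoCacheParam

    object InfoCacheSketch {
      def main(args: Array[String]): Unit = {
        val zkConfig = Map[String, Any](
          "hosts" -> "localhost:2181", "namespace" -> "griffin/infocache",
          "lock.path" -> "lock", "mode" -> "persist",
          "init.clear" -> true, "close.clear" -> false)
        InfoCacheInstance.initInstance(InfoCacheParam("zk", zkConfig) :: Nil, "ttt")
        InfoCacheInstance.init
        try {
          InfoCacheInstance.cacheInfo(Map("key1" -> "value1")) // true on success
          println(InfoCacheInstance.readInfo(List("key1")))    // Map(key1 -> value1)
          InfoCacheInstance.deleteInfo(List("key1"))
        } finally {
          InfoCacheInstance.close()
        }
      }
    }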

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
new file mode 100644
index 0000000..271529c
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
@@ -0,0 +1,84 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.cache
+//
+//import java.util.Date
+//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+//
+//import org.apache.curator.framework.recipes.locks.InterProcessMutex
+//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+//import org.apache.curator.retry.ExponentialBackoffRetry
+//import org.apache.griffin.measure.cache.info.ZKInfoCache
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Try}
+//
+//@RunWith(classOf[JUnitRunner])
+//class ZKCacheLockTest extends FunSuite with Matchers with BeforeAndAfter {
+//
+//  val map = Map[String, Any](
+//    ("hosts" -> "localhost:2181"),
+//    ("namespace" -> "griffin/infocache"),
+//    ("lock.path" -> "lock"),
+//    ("mode" -> "persist"),
+//    ("init.clear" -> true),
+//    ("close.clear" -> false)
+//  )
+//  val name = "ttt"
+//
+//  val ic = ZKInfoCache(map, name)
+//
+//  before {
+//    ic.init
+//  }
+//
+//  test ("lock") {
+//
+//    case class Proc(n: Int) extends Runnable {
+//      override def run(): Unit = {
+//        val cl = ic.genLock("proc")
+//        val b = cl.lock(2, TimeUnit.SECONDS)
+//        try {
+//          println(s"${n}: ${b}")
+//          if (b) Thread.sleep(3000)
+//        } finally {
+//          cl.unlock()
+//        }
+//      }
+//    }
+//
+//    val pool = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
+//    val t = 0 until 10
+//    t.foreach(a => pool.submit(Proc(a)))
+//
+//    pool.shutdown()
+//    val t1 = new Date()
+//    println(s"${t1}: pool shut down")
+//    pool.awaitTermination(20, TimeUnit.SECONDS)
+//    val t2 = new Date()
+//    println(s"${t2}: pool shut down done [${t2.getTime - t1.getTime}]")
+//  }
+//
+//  after {
+//    ic.close()
+//  }
+//
+//}
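
The lock test contends from a thread pool; the single-lock essence, as a sketch against the same ZKInfoCache API (live ZooKeeper assumed):

    import java.util.concurrent.TimeUnit

    import org.apache.griffin.measure.cache.info.ZKInfoCache

    object ZKLockSketch {
      def main(args: Array[String]): Unit = {
        val config = Map[String, Any](
          "hosts" -> "localhost:2181", "namespace" -> "griffin/infocache",
          "lock.path" -> "lock", "mode" -> "persist",
          "init.clear" -> true, "close.clear" -> false)
        val ic = ZKInfoCache(config, "ttt")
        ic.init
        val lock = ic.genLock("proc")
        val acquired = lock.lock(2, TimeUnit.SECONDS) // false if not acquired in time
        try {
          if (acquired) println("holding the ZK lock")
        } finally {
          lock.unlock()
          ic.close()
        }
      }
    }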

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
new file mode 100644
index 0000000..086170a
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
@@ -0,0 +1,90 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.cache
+//
+//import java.util.Date
+//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+//
+//import org.apache.curator.framework.recipes.locks.InterProcessMutex
+//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+//import org.apache.curator.retry.ExponentialBackoffRetry
+//import org.apache.griffin.measure.cache.info.ZKInfoCache
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Try}
+//
+//@RunWith(classOf[JUnitRunner])
+//class ZKInfoCacheTest extends FunSuite with Matchers with BeforeAndAfter {
+//
+//  val map = Map[String, Any](
+//    ("hosts" -> "localhost:2181"),
+//    ("namespace" -> "griffin/infocache"),
+//    ("lock.path" -> "lock"),
+//    ("mode" -> "persist"),
+//    ("init.clear" -> true),
+//    ("close.clear" -> false)
+//  )
+//  val name = "ttt"
+//
+//  test ("available") {
+//    val ic = ZKInfoCache(map, name)
+//    ic.init
+//
+//    ic.available should be (true)
+//
+//    ic.close
+//  }
+//
+//  test ("cacheInfo and readInfo") {
+//    val ic = ZKInfoCache(map, name)
+//    ic.init
+//
+//    val keys = List[String](
+//      "key1", "key2"
+//    )
+//    val info = Map[String, String](
+//      ("key1" -> "value1"),
+//      ("key2" -> "value2")
+//    )
+//
+//    ic.cacheInfo(info) should be (true)
+//    ic.readInfo(keys) should be (info)
+//    ic.deleteInfo(keys)
+//    ic.readInfo(keys) should be (Map[String, String]())
+//
+//    ic.close
+//  }
+//
+//  test ("genLock") {
+//    val ic = ZKInfoCache(map, name)
+//    ic.init
+//
+//    val lock1 = ic.genLock("ttt")
+//    val lock2 = ic.genLock("ttt")
+//    lock1.lock(5, TimeUnit.SECONDS)
+//    lock2.lock(5, TimeUnit.SECONDS)
+//    lock1.unlock
+//    lock2.unlock
+//
+//    ic.close
+//  }
+//
+//}
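
Condensed from the suite above, the ZKInfoCache read/write contract: cacheInfo writes key-value pairs under the configured namespace, readInfo returns what was written, and deleteInfo leaves subsequent reads empty. A sketch (live ZooKeeper assumed):

    import org.apache.griffin.measure.cache.info.ZKInfoCache

    object ZKInfoCacheSketch {
      def main(args: Array[String]): Unit = {
        val config = Map[String, Any](
          "hosts" -> "localhost:2181", "namespace" -> "griffin/infocache",
          "lock.path" -> "lock", "mode" -> "persist",
          "init.clear" -> true, "close.clear" -> false)
        val ic = ZKInfoCache(config, "ttt")
        ic.init
        val info = Map("key1" -> "value1", "key2" -> "value2")
        ic.cacheInfo(info)                          // true on success
        assert(ic.readInfo(info.keys.toList) == info)
        ic.deleteInfo(info.keys.toList)             // reads now yield an empty map
        ic.close()
      }
    }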

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala b/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
new file mode 100644
index 0000000..b3c94e5
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.griffin.measure.config.params.env._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfter {
+
+  test("read config") {
+    val rawString = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""
+
+    val reader = ParamRawStringReader(rawString)
+    val paramTry = reader.readConfig[PersistParam]
+    paramTry.isSuccess should be (true)
+    paramTry.get should be (PersistParam("hdfs", Map[String, Any](("path" -> "/path/to"), ("time" -> 1234567))))
+  }
+
+}
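
The reader round trip in one place: ParamRawStringReader.readConfig deserializes a raw JSON string into the requested params case class, wrapped in a Try. A sketch using the same PersistParam example:

    import scala.util.{Failure, Success}

    import org.apache.griffin.measure.config.params.env.PersistParam
    import org.apache.griffin.measure.config.reader.ParamRawStringReader

    object RawStringReaderSketch {
      def main(args: Array[String]): Unit = {
        val raw = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""
        ParamRawStringReader(raw).readConfig[PersistParam] match {
          case Success(p) => println(p) // PersistParam(hdfs,Map(path -> /path/to, time -> 1234567))
          case Failure(e) => println(s"bad param string: ${e.getMessage}")
        }
      }
    }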

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/config/validator/AllParamValidatorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/config/validator/AllParamValidatorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/config/validator/AllParamValidatorTest.scala
new file mode 100644
index 0000000..8000c65
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/config/validator/AllParamValidatorTest.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.validator
+
+import org.apache.griffin.measure.config.params._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+import org.scalamock.scalatest.MockFactory
+
+@RunWith(classOf[JUnitRunner])
+class AllParamValidatorTest extends FlatSpec with Matchers with BeforeAndAfter with MockFactory {
+
+  "validate" should "pass" in {
+    val validator = AllParamValidator()
+    val paramMock = mock[Param]
+    paramMock.validate _ expects () returning (false)
+
+    val validateTry = validator.validate(paramMock)
+    validateTry.isSuccess should be (true)
+    validateTry.get should be (false)
+  }
+
+}
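
The mock pins the contract: AllParamValidator.validate wraps the Param's own validate() result in a Try instead of throwing. A sketch with a hypothetical inline Param (this assumes validate() is the trait's only abstract member):

    import org.apache.griffin.measure.config.params.Param
    import org.apache.griffin.measure.config.validator.AllParamValidator

    object ValidatorSketch {
      def main(args: Array[String]): Unit = {
        // hypothetical param that always judges itself invalid
        val alwaysInvalid = new Param { override def validate(): Boolean = false }
        val validateTry = AllParamValidator().validate(alwaysInvalid)
        println(validateTry) // Success(false): validation ran; the param is invalid
      }
    }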

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
new file mode 100644
index 0000000..2139ff7
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.algo.streaming.TimingProcess
+import org.apache.griffin.measure.cache.info.InfoCacheInstance
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user.{DataCacheParam, DataConnectorParam, EvaluateRuleParam}
+import org.apache.griffin.measure.config.reader.ParamRawStringReader
+import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
+import org.apache.griffin.measure.rule.expr.{Expr, StatementExpr}
+import org.apache.griffin.measure.rule._
+import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
+import org.apache.griffin.measure.rule.{DataTypeCalculationUtil, ExprValueUtil, RuleExprs}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class ConnectorTest extends FunSuite with Matchers with BeforeAndAfter {
+
+  test("read config") {
+
+    val a = "java.lang.String"
+    val at = getClassTag(a)
+    println(at)
+
+    at match {
+      case ClassTag(m) => println(m)
+      case _ => println("no")
+    }
+
+  }
+
+  private def getClassTag(tp: String): ClassTag[_] = {
+    val clazz = Class.forName(tp)
+    ClassTag(clazz)
+  }
+}
+
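
The test body above is really a plain-Scala ClassTag experiment, independent of Griffin: ClassTag.apply wraps a runtime Class, and the ClassTag extractor recovers it. The same pattern, standalone:

    import scala.reflect.ClassTag

    object ClassTagSketch {
      // throws ClassNotFoundException for an unknown class name
      def classTagOf(className: String): ClassTag[_] =
        ClassTag(Class.forName(className))

      def main(args: Array[String]): Unit = {
        classTagOf("java.lang.String") match {
          case ClassTag(runtimeClass) => println(runtimeClass) // class java.lang.String
          case _                      => println("no")
        }
      }
    }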

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/persist/HdfsPersistTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/persist/HdfsPersistTest.scala b/measure/src/test/scala/org/apache/griffin/measure/persist/HdfsPersistTest.scala
new file mode 100644
index 0000000..2dfceb3
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/persist/HdfsPersistTest.scala
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Try, Failure}
+
+@RunWith(classOf[JUnitRunner])
+class HdfsPersistTest extends FunSuite with Matchers with BeforeAndAfter {
+
+  val config: Map[String, Any] = Map[String, Any](
+    ("path" -> "/path/to"), ("max.persist.lines" -> 100), 
("max.lines.per.file" -> 1000))
+  val metricName: String = "metric"
+  val timeStamp: Long = 123456789L
+
+  val hdfsPersist = HdfsPersist(config, metricName, timeStamp)
+
+  test ("constructor") {
+    hdfsPersist.path should be ("/path/to")
+    hdfsPersist.maxPersistLines should be (100)
+    hdfsPersist.maxLinesPerFile should be (1000)
+
+    hdfsPersist.StartFile should be (s"/path/to/${metricName}/${timeStamp}/_START")
+  }
+
+  test ("avaiable") {
+    hdfsPersist.available should be (true)
+  }
+}
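
From the assertions above, HdfsPersist derives its file layout from config plus metric name and timestamp, e.g. the start marker at <path>/<metric>/<timestamp>/_START. A sketch:

    import org.apache.griffin.measure.persist.HdfsPersist

    object HdfsPersistSketch {
      def main(args: Array[String]): Unit = {
        val config = Map[String, Any](
          "path" -> "/path/to", "max.persist.lines" -> 100, "max.lines.per.file" -> 1000)
        val persist = HdfsPersist(config, "metric", 123456789L)
        println(persist.StartFile) // /path/to/metric/123456789/_START
      }
    }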

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/persist/HttpPersistTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/persist/HttpPersistTest.scala b/measure/src/test/scala/org/apache/griffin/measure/persist/HttpPersistTest.scala
new file mode 100644
index 0000000..1b0bc6f
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/persist/HttpPersistTest.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class HttpPersistTest extends FunSuite with Matchers with BeforeAndAfter {
+
+  val config: Map[String, Any] = Map[String, Any](("api" -> "url/api"), 
("method" -> "post"))
+  val metricName: String = "metric"
+  val timeStamp: Long = 123456789L
+
+  val httpPersist = HttpPersist(config, metricName, timeStamp)
+
+  test ("constructor") {
+    httpPersist.api should be ("url/api")
+    httpPersist.method should be ("post")
+  }
+
+  test("available") {
+    httpPersist.available should be (true)
+  }
+}
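
The matching sketch for the HTTP persist type, whose config carries the endpoint and HTTP method as asserted above:

    import org.apache.griffin.measure.persist.HttpPersist

    object HttpPersistSketch {
      def main(args: Array[String]): Unit = {
        val config = Map[String, Any]("api" -> "url/api", "method" -> "post")
        val persist = HttpPersist(config, "metric", 123456789L)
        println(persist.api)    // url/api
        println(persist.method) // post
      }
    }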

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/result/AccuracyResultTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/result/AccuracyResultTest.scala b/measure/src/test/scala/org/apache/griffin/measure/result/AccuracyResultTest.scala
new file mode 100644
index 0000000..8a45530
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/result/AccuracyResultTest.scala
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class AccuracyResultTest extends FunSuite with BeforeAndAfter with Matchers {
+
+  test ("update") {
+    val result = AccuracyResult(10, 100)
+    val delta = AccuracyResult(3, 10)
+    result.update(delta) should be (AccuracyResult(3, 100))
+  }
+
+  test ("eventual") {
+    val result1 = AccuracyResult(10, 100)
+    result1.eventual should be (false)
+
+    val result2 = AccuracyResult(0, 100)
+    result2.eventual should be (true)
+  }
+
+  test ("differsFrom") {
+    val result = AccuracyResult(10, 100)
+    result.differsFrom(AccuracyResult(11, 100)) should be (true)
+    result.differsFrom(AccuracyResult(10, 110)) should be (true)
+    result.differsFrom(AccuracyResult(10, 100)) should be (false)
+  }
+
+  test ("matchPercentage") {
+    val result1 = AccuracyResult(10, 100)
+    result1.matchPercentage should be (90.0)
+
+    val result2 = AccuracyResult(10, 0)
+    result2.matchPercentage should be (0.0)
+  }
+
+}
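
The arithmetic behind these assertions: AccuracyResult carries (missCount, totalCount); update keeps the newer miss count against the original total; matchPercentage is (total - miss) / total * 100, defined as 0.0 when total is 0; and eventual holds once nothing is left to match. A worked sketch:

    import org.apache.griffin.measure.result.AccuracyResult

    object AccuracyResultSketch {
      def main(args: Array[String]): Unit = {
        val initial = AccuracyResult(10, 100)
        val recheck = AccuracyResult(3, 10)      // only 3 of the missed rows remain
        println(initial.update(recheck))         // AccuracyResult(3,100)
        println(initial.matchPercentage)         // 90.0
        println(AccuracyResult(0, 100).eventual) // true
      }
    }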
