http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
deleted file mode 100644
index 1d5c273..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.rule.expr
-
-trait SelectExpr extends Expr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = None
-}
-
-case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends 
SelectExpr {
-  val desc: String = s"[${fields.map(_.desc).mkString(", ")}]"
-  val dataSources: Set[String] = Set.empty[String]
-}
-
-case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) 
extends SelectExpr {
-  val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})"
-  val dataSources: Set[String] = args.flatMap(_.dataSources).toSet
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = 
args.flatMap(_.getCacheExprs(ds))
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = 
args.flatMap(_.getFinalCacheExprs(ds))
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = 
args.flatMap(_.getPersistExprs(ds))
-}
-
-case class FilterSelectExpr(field: FieldDesc, compare: String, value: 
MathExpr) extends SelectExpr {
-  val desc: String = s"[${field.desc} ${compare} ${value.desc}]"
-  val dataSources: Set[String] = value.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = 
value.getCacheExprs(ds)
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = 
value.getFinalCacheExprs(ds)
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = 
value.getPersistExprs(ds)
-}
-
-// -- selection --
-case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) 
extends Expr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id)
-
-  val desc: String = {
-    val argsString = selectors.map(_.desc).mkString("")
-    s"${head.desc}${argsString}"
-  }
-  val dataSources: Set[String] = {
-    val selectorDataSources = selectors.flatMap(_.dataSources).toSet
-    selectorDataSources + head.head
-  }
-
-  override def cacheUnit: Boolean = true
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getCacheExprs(ds))
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getFinalCacheExprs(ds))
-  }
-
-  override def persistUnit: Boolean = true
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    selectors.flatMap(_.getPersistExprs(ds))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
deleted file mode 100644
index 63c8755..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.rule.expr
-
-
-trait StatementExpr extends Expr with AnalyzableExpr {
-  def valid(values: Map[String, Any]): Boolean = true
-  override def cacheUnit: Boolean = true
-}
-
-case class SimpleStatementExpr(expr: LogicalExpr) extends StatementExpr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = 
expr.calculate(values)
-  val desc: String = expr.desc
-  val dataSources: Set[String] = expr.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    expr.getPersistExprs(ds)
-  }
-
-  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, 
Expr)] = expr.getGroupbyExprPairs(dsPair)
-}
-
-case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) 
extends StatementExpr {
-  def calculateOnly(values: Map[String, Any]): Option[Any] = 
expr.calculate(values)
-  val desc: String = s"${expr.desc} when ${whenExpr.desc}"
-
-  override def valid(values: Map[String, Any]): Boolean = {
-    whenExpr.calculate(values) match {
-      case Some(r: Boolean) => r
-      case _ => false
-    }
-  }
-
-  val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources
-  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds)
-  }
-  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
-    expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds)
-  }
-  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
-    expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds)
-  }
-
-  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, 
Expr)] = {
-    expr.getGroupbyExprPairs(dsPair) ++ whenExpr.getGroupbyExprPairs(dsPair)
-  }
-  override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
deleted file mode 100644
index 9b8f6a3..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.utils
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, 
FileSystem, Path}
-
-object HdfsUtil {
-
-  private val seprator = "/"
-
-  private val conf = new Configuration()
-  conf.set("dfs.support.append", "true")
-
-  private val dfs = FileSystem.get(conf)
-
-  def existPath(filePath: String): Boolean = {
-    val path = new Path(filePath)
-    dfs.exists(path)
-  }
-
-  def createFile(filePath: String): FSDataOutputStream = {
-    val path = new Path(filePath)
-    if (dfs.exists(path)) dfs.delete(path, true)
-    return dfs.create(path)
-  }
-
-  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
-    val path = new Path(filePath)
-    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
-  }
-
-  def openFile(filePath: String): FSDataInputStream = {
-    val path = new Path(filePath)
-    dfs.open(path)
-  }
-
-  def writeContent(filePath: String, message: String): Unit = {
-    val out = createFile(filePath)
-    out.write(message.getBytes("utf-8"))
-    out.close
-  }
-
-  def appendContent(filePath: String, message: String): Unit = {
-    val out = appendOrCreateFile(filePath)
-    out.write(message.getBytes("utf-8"))
-    out.close
-  }
-
-  def createEmptyFile(filePath: String): Unit = {
-    val out = createFile(filePath)
-    out.close
-  }
-
-
-  def getHdfsFilePath(parentPath: String, fileName: String): String = {
-    if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + 
seprator + fileName
-  }
-
-  def deleteHdfsPath(dirPath: String): Unit = {
-    val path = new Path(dirPath)
-    if (dfs.exists(path)) dfs.delete(path, true)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
deleted file mode 100644
index 46d1b86..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.utils
-
-import scalaj.http._
-
-object HttpUtil {
-
-  val GET_REGEX = """^(?i)get$""".r
-  val POST_REGEX = """^(?i)post$""".r
-  val PUT_REGEX = """^(?i)put$""".r
-  val DELETE_REGEX = """^(?i)delete$""".r
-
-  def postData(url: String, params: Map[String, Object], headers: Map[String, 
Object], data: String): String = {
-    val response = 
Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
-    response.code.toString
-  }
-
-  def httpRequest(url: String, method: String, params: Map[String, Object], 
headers: Map[String, Object], data: String): String = {
-    val httpReq = 
Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
-    method match {
-      case POST_REGEX() => httpReq.postData(data).asString.code.toString
-      case PUT_REGEX() => httpReq.put(data).asString.code.toString
-      case _ => "wrong method"
-    }
-  }
-
-  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, 
String] = {
-    map.map(pair => pair._1 -> pair._2.toString)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
deleted file mode 100644
index 716e422..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.utils
-
-import java.io.InputStream
-
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-
-import scala.reflect._
-
-object JsonUtil {
-  val mapper = new ObjectMapper()
-  mapper.registerModule(DefaultScalaModule)
-  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
-
-  def toJson(value: Map[Symbol, Any]): String = {
-    toJson(value map { case (k,v) => k.name -> v})
-  }
-
-  def toJson(value: Any): String = {
-    mapper.writeValueAsString(value)
-  }
-
-  def toMap[V](json:String)(implicit m: Manifest[V]) = 
fromJson[Map[String,V]](json)
-
-  def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = {
-    mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]])
-  }
-
-  def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = {
-    mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala
new file mode 100644
index 0000000..8e4b25d
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.info
+
+import org.apache.griffin.measure.cache.lock.CacheLock
+import org.apache.griffin.measure.log.Loggable
+
+/**
+ * Contract of a string key/value store used by the measure module.
+ *
+ * What each operation means in detail is decided by the implementation;
+ * the notes below only state what the signatures guarantee.
+ */
+trait InfoCache extends Loggable with Serializable {
+
+  // ---- lifecycle ----
+
+  /** Prepares the cache for use. */
+  def init(): Unit
+  /** Tells whether the cache can be used right now. */
+  def available(): Boolean
+  /** Shuts the cache down. */
+  def close(): Unit
+
+  // ---- key/value access ----
+
+  /**
+   * Stores the given key -> value pairs.
+   *
+   * @return a success flag; whether it means "all pairs stored" or
+   *         something weaker is up to the implementation
+   */
+  def cacheInfo(info: Map[String, String]): Boolean
+  /** Looks the keys up and returns the values that were found. */
+  def readInfo(keys: Iterable[String]): Map[String, String]
+  /** Removes the given keys. */
+  def deleteInfo(keys: Iterable[String]): Unit
+  /** Asks the cache to drop its content. */
+  def clearInfo(): Unit
+
+  /** Lists the keys found under `path`. */
+  def listKeys(path: String): List[String]
+
+  // ---- locking ----
+
+  /** Creates a lock object identified by `s`. */
+  def genLock(s: String): CacheLock
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala
new file mode 100644
index 0000000..3c9d70a
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.info
+
+import org.apache.griffin.measure.config.params.env.InfoCacheParam
+
+import scala.util.{Success, Try}
+
+/**
+ * Creates [[InfoCache]] instances from info-cache parameters.
+ *
+ * @param infoCacheParams configured parameters; not read by the factory
+ *                        itself
+ * @param metricName      handed to every cache the factory creates
+ */
+case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam],
+                            metricName: String) extends Serializable {
+
+  // Matches "zk" or "zookeeper", ignoring case. The alternation is grouped
+  // (non-capturing, so `case ZK_REGEX()` keeps working) because without the
+  // group `^` applied only to "zk" and `$` only to "zookeeper".
+  val ZK_REGEX = """^(?i)(?:zk|zookeeper)$""".r
+
+  /**
+   * Creates the cache described by `infoCacheParam`.
+   *
+   * @param infoCacheParam supplies the persist type and the cache config
+   * @return `Some(cache)` if the cache could be constructed, `None` if its
+   *         construction threw
+   * @throws Exception if the persist type is not a supported one; note that
+   *                   this is raised, not mapped to `None`
+   */
+  def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = {
+    val config = infoCacheParam.config
+    infoCacheParam.persistType match {
+      case ZK_REGEX() =>
+        // A failing constructor is turned into None rather than rethrown.
+        Try(ZKInfoCache(config, metricName)) match {
+          case Success(infoCache) => Some(infoCache)
+          case _ => None
+        }
+      case _ => throw new Exception("not supported info cache type")
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala
new file mode 100644
index 0000000..bcde266
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.info
+
+import org.apache.griffin.measure.cache.lock.{CacheLock, MultiCacheLock}
+import org.apache.griffin.measure.config.params.env.InfoCacheParam
+
+/**
+ * Singleton [[InfoCache]] that forwards every call to the caches held in
+ * `infoCaches`.
+ */
+object InfoCacheInstance extends InfoCache {
+  var infoCaches: List[InfoCache] = Nil
+
+  /**
+   * Rebuilds `infoCaches`: one cache per parameter for which the factory
+   * returns a cache, in parameter order.
+   */
+  def initInstance(infoCacheParams: Iterable[InfoCacheParam],
+                   metricName: String) = {
+    val factory = InfoCacheFactory(infoCacheParams, metricName)
+    infoCaches = infoCacheParams.flatMap(factory.getInfoCache(_)).toList
+  }
+
+  /** Initialises every cache. */
+  def init(): Unit = infoCaches.foreach(_.init())
+
+  /** True as soon as one cache reports itself available. */
+  def available(): Boolean = infoCaches.exists(_.available())
+
+  /** Closes every cache. */
+  def close(): Unit = infoCaches.foreach(_.close())
+
+  /**
+   * Offers `info` to the caches in order and stops at the first one that
+   * reports success; caches after that one are not written.
+   * NOTE(review): confirm that writing to a single cache is intended.
+   *
+   * @return true if some cache reported success
+   */
+  def cacheInfo(info: Map[String, String]): Boolean = {
+    infoCaches.exists(_.cacheInfo(info))
+  }
+
+  /**
+   * Reads `keys` from every cache and merges the results; when a key is
+   * present in several caches, the cache earliest in `infoCaches` wins.
+   */
+  def readInfo(keys: Iterable[String]): Map[String, String] = {
+    val perCache = infoCaches.map(_.readInfo(keys))
+    perCache.foldRight(Map[String, String]()) { (found, merged) =>
+      merged ++ found
+    }
+  }
+
+  /** Deletes `keys` from every cache. */
+  def deleteInfo(keys: Iterable[String]): Unit = {
+    infoCaches.foreach(_.deleteInfo(keys))
+  }
+
+  /** Clears every cache. */
+  def clearInfo(): Unit = infoCaches.foreach(_.clearInfo())
+
+  /**
+   * Returns the first non-empty key listing, asking the caches in order;
+   * the result is empty when no cache has keys under `path`.
+   */
+  def listKeys(path: String): List[String] = {
+    infoCaches.iterator
+      .map(_.listKeys(path))
+      .find(_.nonEmpty)
+      .getOrElse(Nil)
+  }
+
+  /** Bundles one lock per cache into a single [[MultiCacheLock]]. */
+  def genLock(s: String): CacheLock = {
+    val locks = infoCaches.map(_.genLock(s))
+    MultiCacheLock(locks)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
new file mode 100644
index 0000000..ac0acff
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
@@ -0,0 +1,116 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.info
+
+import org.apache.griffin.measure.log.Loggable
+
+/**
+ * Keeps the aggregated ("final") time markers stored through
+ * [[InfoCacheInstance]].
+ *
+ * Per-source markers are read from `info/<source>/<marker>`; the minimum
+ * over all sources is written to `info.final/<marker>`.
+ */
+object TimeInfoCache extends Loggable with Serializable {
+
+  private val CacheTime = "cache.time"
+  private val LastProcTime = "last.proc.time"
+  private val ReadyTime = "ready.time"
+  private val CleanTime = "clean.time"
+
+  // Stands for a time that is missing or is not a valid long.
+  private val UnknownTime = -1L
+
+  /** Key of the cache time below `path`. */
+  def cacheTime(path: String): String = s"${path}/${CacheTime}"
+  /** Key of the last processed time below `path`. */
+  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
+  /** Key of the ready time below `path`. */
+  def readyTime(path: String): String = s"${path}/${ReadyTime}"
+  /** Key of the clean time below `path`. */
+  def cleanTime(path: String): String = s"${path}/${CleanTime}"
+
+  val infoPath = "info"
+
+  val finalCacheInfoPath = "info.final"
+  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
+  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
+  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
+
+  /** Recomputes and stores the final ready time. */
+  def startTimeInfoCache(): Unit = {
+    genFinalTime(ReadyTime, finalReadyTime)
+  }
+
+  /**
+   * Reads the stored final markers.
+   *
+   * @return (final last processed time + 1, final ready time); a marker
+   *         that is missing or unparsable counts as -1
+   */
+  def getTimeRange(): (Long, Long) = {
+    val stored =
+      InfoCacheInstance.readInfo(List(finalLastProcTime, finalReadyTime))
+    val lastProc = getLong(stored, finalLastProcTime)
+    val curReady = getLong(stored, finalReadyTime)
+    (lastProc + 1, curReady)
+  }
+
+  /** Final clean time, or -1 when it is missing or unparsable. */
+  def getCleanTime(): Long = {
+    val stored = InfoCacheInstance.readInfo(List(finalCleanTime))
+    getLong(stored, finalCleanTime)
+  }
+
+  /** Recomputes and stores the final last processed and clean times. */
+  def endTimeInfoCache: Unit = {
+    genFinalTime(LastProcTime, finalLastProcTime)
+    genFinalTime(CleanTime, finalCleanTime)
+  }
+
+  /**
+   * Stores under `finalKey` the minimum of marker `timeKey` over all
+   * entries listed below `infoPath`.
+   *
+   * A source without a usable value contributes -1. When no source is
+   * listed at all the result is -1 as well; the previous `.min` threw
+   * `UnsupportedOperationException` in that case.
+   */
+  private def genFinalTime(timeKey: String, finalKey: String): Unit = {
+    val keys = InfoCacheInstance.listKeys(infoPath).map { p =>
+      s"${infoPath}/${p}/${timeKey}"
+    }
+    val result = InfoCacheInstance.readInfo(keys)
+    val time = keys
+      .map(getLong(result, _))
+      .reduceOption(_ min _)
+      .getOrElse(UnknownTime)
+    InfoCacheInstance.cacheInfo(Map[String, String](finalKey -> time.toString))
+  }
+
+  /**
+   * Value of `key` as a long; -1 if the key is absent or its value does
+   * not parse. Only non-fatal errors are absorbed (`Try`), whereas the
+   * earlier bare `case _ =>` also swallowed fatal ones.
+   */
+  private def getLong(map: Map[String, String], key: String): Long = {
+    map.get(key)
+      .flatMap(v => scala.util.Try(v.toLong).toOption)
+      .getOrElse(UnknownTime)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
new file mode 100644
index 0000000..8b62fa4
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
@@ -0,0 +1,210 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.info
+
+import org.apache.curator.framework.imps.CuratorFrameworkState
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.utils.ZKPaths
+import org.apache.griffin.measure.cache.lock.ZKCacheLock
+import org.apache.zookeeper.CreateMode
+
+import scala.collection.JavaConverters._
+
+/**
+ * [[InfoCache]] backed by ZooKeeper through a Curator client.
+ *
+ * Entries read from `config`: "hosts", "namespace", "mode", "init.clear",
+ * "close.clear" and "lock.path".
+ *
+ * @param config     settings of this cache, see the keys above
+ * @param metricName appended to the configured namespace (or used alone
+ *                   when the namespace is empty) to form the Curator
+ *                   namespace
+ */
+case class ZKInfoCache(config: Map[String, Any], metricName: String)
+  extends InfoCache {
+
+  // Local import: only non-fatal failures of ZK calls are logged and
+  // absorbed; fatal errors and interrupts now propagate.
+  import scala.util.control.NonFatal
+
+  val Hosts = "hosts"
+  val Namespace = "namespace"
+  val Mode = "mode"
+  val InitClear = "init.clear"
+  val CloseClear = "close.clear"
+  val LockPath = "lock.path"
+
+  val PersistRegex = """^(?i)persist$""".r
+  val EphemeralRegex = """^(?i)ephemeral$""".r
+
+  final val separator = ZKPaths.PATH_SEPARATOR
+
+  val hosts = config.getOrElse(Hosts, "").toString
+  val namespace = config.getOrElse(Namespace, "").toString
+  // Persistent nodes unless the mode is the string "ephemeral" (any case).
+  val mode: CreateMode = config.get(Mode) match {
+    case Some(s: String) => s match {
+      case PersistRegex() => CreateMode.PERSISTENT
+      case EphemeralRegex() => CreateMode.EPHEMERAL
+      case _ => CreateMode.PERSISTENT
+    }
+    case _ => CreateMode.PERSISTENT
+  }
+  // Only a Boolean config value is honoured; otherwise the default applies.
+  val initClear = config.get(InitClear) match {
+    case Some(b: Boolean) => b
+    case _ => true
+  }
+  val closeClear = config.get(CloseClear) match {
+    case Some(b: Boolean) => b
+    case _ => false
+  }
+  val lockPath = config.getOrElse(LockPath, "lock").toString
+
+  private val cacheNamespace: String =
+    if (namespace.isEmpty) metricName
+    else namespace + separator + metricName
+  private val builder = CuratorFrameworkFactory.builder()
+    .connectString(hosts)
+    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+    .namespace(cacheNamespace)
+  private val client: CuratorFramework = builder.build
+
+  /**
+   * Starts the client, removes the lock path and, if `initClear` is set,
+   * calls `clearInfo`.
+   */
+  def init(): Unit = {
+    client.start()
+    info("start zk info cache")
+    // The namespace is already set on the builder. The earlier extra
+    // `client.usingNamespace(...)` call only returned a facade that was
+    // thrown away, so it has been dropped.
+    info(s"init with namespace: ${cacheNamespace}")
+    deleteInfo(lockPath :: Nil)
+    if (initClear) {
+      clearInfo()
+    }
+  }
+
+  /** True while the Curator client is in state STARTED. */
+  def available(): Boolean = {
+    client.getState match {
+      case CuratorFrameworkState.STARTED => true
+      case _ => false
+    }
+  }
+
+  /** Calls `clearInfo` if `closeClear` is set, then closes the client. */
+  def close(): Unit = {
+    if (closeClear) {
+      clearInfo()
+    }
+    info("close zk info cache")
+    client.close()
+  }
+
+  /**
+   * Creates or updates one node per pair. Every pair is attempted even
+   * after a failure.
+   *
+   * @return true only if every pair was written
+   */
+  def cacheInfo(info: Map[String, String]): Boolean = {
+    info.foldLeft(true) { case (allOk, (k, v)) =>
+      createOrUpdate(path(k), v) && allOk
+    }
+  }
+
+  /** Reads the keys; keys that cannot be read are left out of the result. */
+  def readInfo(keys: Iterable[String]): Map[String, String] = {
+    keys.flatMap { key =>
+      read(path(key)).map(value => (key, value))
+    }.toMap
+  }
+
+  /** Deletes the node of every key, including its children. */
+  def deleteInfo(keys: Iterable[String]): Unit = {
+    keys.foreach { key => delete(path(key)) }
+  }
+
+  /**
+   * Currently only logs: the actual delete is commented out, so neither
+   * `initClear` nor `closeClear` removes any data.
+   */
+  def clearInfo(): Unit = {
+//    delete("/")
+    info("clear info")
+  }
+
+  /** Names of the children of `p`; empty if listing fails. */
+  def listKeys(p: String): List[String] = {
+    children(path(p))
+  }
+
+  /**
+   * Creates a lock on `<lockPath>/<s>`, or on `<lockPath>` itself when `s`
+   * is empty.
+   */
+  def genLock(s: String): ZKCacheLock = {
+    val base = path(lockPath)
+    val lpt = if (s.isEmpty) base else base + separator + s
+    ZKCacheLock(new InterProcessMutex(client, lpt))
+  }
+
+  /** Makes `k` absolute by prefixing the separator when it is missing. */
+  private def path(k: String): String = {
+    if (k.startsWith(separator)) k else separator + k
+  }
+
+  /**
+   * Runs `op`; on a non-fatal failure logs "<action> error: <message>"
+   * and yields `fallback`.
+   */
+  private def attempt[T](action: => String, fallback: => T)(op: => T): T = {
+    try {
+      op
+    } catch {
+      case NonFatal(e) =>
+        error(s"${action} error: ${e.getMessage}")
+        fallback
+    }
+  }
+
+  private def children(zkPath: String): List[String] = {
+    attempt[List[String]](s"list ${zkPath}", Nil) {
+      client.getChildren().forPath(zkPath).asScala.toList
+    }
+  }
+
+  // Check-then-write: updates an existing node, creates a missing one.
+  private def createOrUpdate(zkPath: String, content: String): Boolean = {
+    if (checkExists(zkPath)) {
+      update(zkPath, content)
+    } else {
+      create(zkPath, content)
+    }
+  }
+
+  private def create(zkPath: String, content: String): Boolean = {
+    attempt[Boolean](s"create ( ${zkPath} -> ${content} )", false) {
+      client.create().creatingParentsIfNeeded().withMode(mode)
+        .forPath(zkPath, content.getBytes("utf-8"))
+      true
+    }
+  }
+
+  private def update(zkPath: String, content: String): Boolean = {
+    attempt[Boolean](s"update ( ${zkPath} -> ${content} )", false) {
+      client.setData().forPath(zkPath, content.getBytes("utf-8"))
+      true
+    }
+  }
+
+  private def read(zkPath: String): Option[String] = {
+    attempt[Option[String]](s"read ${zkPath}", None) {
+      Some(new String(client.getData().forPath(zkPath), "utf-8"))
+    }
+  }
+
+  private def delete(zkPath: String): Unit = {
+    attempt[Unit](s"delete ${zkPath}", ()) {
+      client.delete().guaranteed().deletingChildrenIfNeeded()
+        .forPath(zkPath)
+    }
+  }
+
+  private def checkExists(zkPath: String): Boolean = {
+    attempt[Boolean](s"check exists ${zkPath}", false) {
+      client.checkExists().forPath(zkPath) != null
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala
new file mode 100644
index 0000000..24f142c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.log.Loggable
+
+/**
+ * Abstraction of a lock used by the cache layer; implementations are
+ * loggable and serializable.
+ */
+trait CacheLock extends Loggable with Serializable {
+
+  /**
+   * Tries to take the lock.
+   *
+   * @param outtime presumably the maximum time to wait, expressed in `unit`
+   *                -- the exact meaning (e.g. of negative values) is defined
+   *                by each implementation; confirm there
+   * @param unit    time unit of `outtime`
+   * @return whether the lock was obtained
+   */
+  def lock(outtime: Long, unit: TimeUnit): Boolean
+
+  /** Releases the lock. */
+  def unlock(): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala
new file mode 100644
index 0000000..7b835f4
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.lock
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * [[CacheLock]] built from a list of locks.
+ *
+ * Only the first lock of `cacheLocks` is ever used; the remaining ones are
+ * not touched by `lock` or `unlock`.
+ * NOTE(review): confirm that ignoring every lock but the head is intended.
+ *
+ * @param cacheLocks candidate locks; may be empty
+ */
+case class MultiCacheLock(cacheLocks: List[CacheLock]) extends CacheLock {
+
+  /**
+   * Locks the head lock, forwarding `outtime` and `unit` unchanged.
+   *
+   * @return the head lock's result, or true when the list is empty
+   */
+  def lock(outtime: Long, unit: TimeUnit): Boolean =
+    cacheLocks.headOption.map(_.lock(outtime, unit)).getOrElse(true)
+
+  /** Unlocks the head lock; does nothing when the list is empty. */
+  def unlock(): Unit =
+    cacheLocks.headOption.foreach(_.unlock())
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala
new file mode 100644
index 0000000..77ee83b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+
+/**
+ * [[CacheLock]] that delegates to a Curator `InterProcessMutex`.
+ *
+ * @param mutex the mutex to acquire and release (declared `@transient`)
+ */
+case class ZKCacheLock(@transient mutex: InterProcessMutex) extends CacheLock {
+
+  /**
+   * Acquires the mutex.
+   *
+   * @param outtime time to wait when non-negative; a negative value switches
+   *                to the `acquire(-1, null)` form
+   * @param unit    time unit of `outtime`
+   * @return the value returned by `acquire`, or false when anything is thrown
+   *         (the failure is logged, not propagated)
+   */
+  def lock(outtime: Long, unit: TimeUnit): Boolean = {
+    try {
+      if (outtime < 0) {
+        // NOTE(review): presumably waits without a time limit -- confirm
+        // against the Curator InterProcessMutex documentation
+        mutex.acquire(-1, null)
+      } else {
+        mutex.acquire(outtime, unit)
+      }
+    } catch {
+      case t: Throwable =>
+        error(s"lock error: ${t.getMessage}")
+        false
+    }
+  }
+
+  /**
+   * Releases the mutex when it is reported as acquired in this process;
+   * failures are logged and otherwise ignored.
+   */
+  def unlock(): Unit = {
+    try {
+      val held = mutex.isAcquiredInThisProcess
+      if (held) mutex.release()
+    } catch {
+      case t: Throwable =>
+        error(s"unlock error: ${t.getMessage}")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala
new file mode 100644
index 0000000..92787be
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala
@@ -0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.result
+
+import org.apache.griffin.measure.result.Result
+
+/**
+ * A cached result together with the time group it belongs to and the time
+ * it was last updated.
+ *
+ * @param timeGroup  time group key of this entry
+ * @param updateTime update time of this entry
+ * @param result     the cached result
+ */
+case class CacheResult(timeGroup: Long, updateTime: Long, result: Result) {
+
+  /** True when this entry's update time is strictly before `ut`. */
+  def olderThan(ut: Long): Boolean = ut > updateTime
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
new file mode 100644
index 0000000..50d3ada
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.result
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.result._
+
+import scala.collection.mutable.{Map => MutableMap}
+
+/**
+ * Keeps the cached results per time group in memory and decides when an
+ * entry has to be replaced or dropped.
+ */
+case class CacheResultProcesser() extends Loggable {
+
+  /** Cached results keyed by time group. */
+  val cacheGroup: MutableMap[Long, CacheResult] = MutableMap()
+
+  /**
+   * Builds the cache entry that should replace the current one, without
+   * modifying `cacheGroup`.
+   *
+   * @return Some(entry) when nothing is cached for `timeGroup`, or when the
+   *         cached entry is older than `updateTime` and merging `result`
+   *         into it yields a result that differs from the cached one;
+   *         None otherwise
+   */
+  def genUpdateCacheResult(timeGroup: Long, updateTime: Long, result: Result): Option[CacheResult] = {
+    cacheGroup.get(timeGroup) match {
+      case None =>
+        Some(CacheResult(timeGroup, updateTime, result))
+      case Some(cached) if !cached.olderThan(updateTime) =>
+        None
+      case Some(cached) =>
+        // existResult must stay a stable val: its type member T is used below
+        val existResult = cached.result
+        val merged = existResult.update(result.asInstanceOf[existResult.T])
+        if (existResult.differsFrom(merged)) Some(CacheResult(timeGroup, updateTime, merged))
+        else None
+    }
+  }
+
+  /**
+   * Stores `cr` under its time group unless an entry that is not older than
+   * `cr.updateTime` is already cached there.
+   */
+  def update(cr: CacheResult): Unit = {
+    val key = cr.timeGroup
+    val replaceable = cacheGroup.get(key).forall(_.olderThan(cr.updateTime))
+    if (replaceable) cacheGroup += (key -> cr)
+  }
+
+  /** Looks up the cached entry of `timeGroup`, if any. */
+  def getCacheResult(timeGroup: Long): Option[CacheResult] = cacheGroup.get(timeGroup)
+
+  /**
+   * Drops every entry whose time group is before `overtime` or whose result
+   * reports `eventual()`, logging how many entries were found.
+   */
+  def refresh(overtime: Long): Unit = {
+    // work on an immutable snapshot so removal does not disturb the scan
+    val snapshot = cacheGroup.toMap
+    val deadCache = snapshot.filter { case (_, entry) =>
+      entry.timeGroup < overtime || entry.result.eventual()
+    }
+    info(s"=== dead cache group count: ${deadCache.size} ===")
+    for (key <- deadCache.keySet) cacheGroup -= key
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala
new file mode 100644
index 0000000..8990564
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+
+/**
+ * Simple composite of the env and user params, for convenient usage.
+ *
+ * @param envParam  environment parameters, bound to the JSON property "env"
+ * @param userParam user parameters, bound to the JSON property "user"
+ */
+@JsonInclude(Include.NON_NULL)
+case class AllParam( @JsonProperty("env") envParam: EnvParam,
+                     @JsonProperty("user") userParam: UserParam
+                   ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala
new file mode 100644
index 0000000..d3484a1
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params
+
+/** Serializable base trait of the configuration parameter classes. */
+trait Param extends Serializable {
+
+  /** Validation hook; this default implementation always returns true. */
+  def validate(): Boolean = true
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala
new file mode 100644
index 0000000..00302a7
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala
@@ -0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * Cleaner configuration.
+ *
+ * @param cleanInterval value of the JSON property "clean.interval", kept as
+ *                      a string (its format is not defined here)
+ */
+@JsonInclude(Include.NON_NULL)
+case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String
+                       ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala
new file mode 100644
index 0000000..ad87a5f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * Environment-level configuration.
+ *
+ * @param sparkParam      bound to the JSON property "spark"
+ * @param persistParams   bound to the JSON property "persist"
+ * @param infoCacheParams bound to the JSON property "info.cache"
+ * @param cleanerParam    bound to the JSON property "cleaner"
+ */
+@JsonInclude(Include.NON_NULL)
+case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam,
+                     @JsonProperty("persist") persistParams: 
List[PersistParam],
+                     @JsonProperty("info.cache") infoCacheParams: 
List[InfoCacheParam],
+                     @JsonProperty("cleaner") cleanerParam: CleanerParam
+                   ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala
new file mode 100644
index 0000000..be588f9
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * Configuration of one info cache.
+ *
+ * @param persistType bound to the JSON property "type"
+ * @param config      free-form settings, bound to the JSON property "config"
+ */
+@JsonInclude(Include.NON_NULL)
+case class InfoCacheParam( @JsonProperty("type") persistType: String,
+                           @JsonProperty("config") config: Map[String, Any]
+                         ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala
new file mode 100644
index 0000000..68b9bc8
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * Configuration of one persist target.
+ *
+ * @param persistType bound to the JSON property "type"
+ * @param config      free-form settings, bound to the JSON property "config"
+ */
+@JsonInclude(Include.NON_NULL)
+case class PersistParam( @JsonProperty("type") persistType: String,
+                         @JsonProperty("config") config: Map[String, Any]
+                       ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
new file mode 100644
index 0000000..6ec0955
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * Spark-related configuration; every value is kept as read from JSON.
+ *
+ * @param logLevel        bound to the JSON property "log.level"
+ * @param cpDir           bound to the JSON property "checkpoint.dir"
+ * @param batchInterval   bound to the JSON property "batch.interval"
+ * @param processInterval bound to the JSON property "process.interval"
+ * @param config          string key/value settings, bound to the JSON
+ *                        property "config"
+ */
+@JsonInclude(Include.NON_NULL)
+case class SparkParam( @JsonProperty("log.level") logLevel: String,
+                       @JsonProperty("checkpoint.dir") cpDir: String,
+                       @JsonProperty("batch.interval") batchInterval: String,
+                       @JsonProperty("process.interval") processInterval: 
String,
+                       @JsonProperty("config") config: Map[String, String]
+                     ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
new file mode 100644
index 0000000..9c60755
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * Configuration of a data cache.
+ *
+ * @param cacheType bound to the JSON property "type"
+ * @param config    free-form settings, bound to the JSON property "config"
+ * @param timeRange bound to the JSON property "time.range"; a list of
+ *                  strings whose meaning is not defined here
+ */
+@JsonInclude(Include.NON_NULL)
+case class DataCacheParam( @JsonProperty("type") cacheType: String,
+                           @JsonProperty("config") config: Map[String, Any],
+                           @JsonProperty("time.range") timeRange: List[String]
+                         ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
new file mode 100644
index 0000000..dbc2e0b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * Configuration of a data connector.
+ *
+ * @param conType   bound to the JSON property "type"
+ * @param version   bound to the JSON property "version"
+ * @param config    free-form settings, bound to the JSON property "config"
+ * @param cache     cache settings, bound to the JSON property "cache"
+ * @param matchOnce bound to the JSON property "match.once"
+ */
+@JsonInclude(Include.NON_NULL)
+case class DataConnectorParam( @JsonProperty("type") conType: String,
+                               @JsonProperty("version") version: String,
+                               @JsonProperty("config") config: Map[String, Any],
+                               @JsonProperty("cache") cache: DataCacheParam,
+                               @JsonProperty("match.once") matchOnce: Boolean
+                             ) extends Param {
+
+  /**
+   * Returns the `matchOnce` flag.
+   *
+   * `matchOnce` is a primitive `Boolean` and can never be null, so the
+   * former `matchOnce == null` guard was always false (and triggered a
+   * compiler warning); the flag is returned directly instead.
+   */
+  def getMatchOnce(): Boolean = matchOnce
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
new file mode 100644
index 0000000..6ee9783
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * Rule evaluation configuration.
+ *
+ * @param sampleRatio bound to the JSON property "sampleRatio"
+ * @param rules       rule text, bound to the JSON property "rules"
+ */
+@JsonInclude(Include.NON_NULL)
+case class EvaluateRuleParam( @JsonProperty("sampleRatio") sampleRatio: Double,
+                              @JsonProperty("rules") rules: String
+                            ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
new file mode 100644
index 0000000..df0647c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+/**
+ * User-level configuration of a measure job.
+ *
+ * @param name              bound to the JSON property "name"
+ * @param dqType            bound to the JSON property "type"
+ * @param procType          bound to the JSON property "process.type"
+ * @param sourceParam       source connector, bound to the JSON property "source"
+ * @param targetParam       target connector, bound to the JSON property "target"
+ * @param evaluateRuleParam bound to the JSON property "evaluateRule"
+ */
+@JsonInclude(Include.NON_NULL)
+case class UserParam(@JsonProperty("name") name: String,
+                     @JsonProperty("type") dqType: String,
+                     @JsonProperty("process.type") procType: String,
+                     @JsonProperty("source") sourceParam: DataConnectorParam,
+                     @JsonProperty("target") targetParam: DataConnectorParam,
+                     @JsonProperty("evaluateRule") evaluateRuleParam: 
EvaluateRuleParam
+                    ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala
new file mode 100644
index 0000000..ede68f4
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.griffin.measure.config.params.Param
+import org.apache.griffin.measure.utils.JsonUtil
+
+import scala.util.Try
+
+/**
+  * Reads a parameter object from a JSON file opened with `scala.io.Source`.
+  *
+  * @param file path of the JSON file to read
+  */
+case class ParamFileReader(file: String) extends ParamReader {
+
+  /**
+    * Loads the whole file and deserializes it with `JsonUtil.fromJson`.
+    *
+    * @tparam T parameter type to deserialize into
+    * @return the parsed parameter, or a `Failure` carrying whatever was thrown
+    *         while opening, reading or parsing the file
+    */
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
+    Try {
+      val source = scala.io.Source.fromFile(file)
+      // Release the file handle even when reading or parsing throws.
+      try {
+        JsonUtil.fromJson[T](source.mkString)
+      } finally {
+        source.close()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala
new file mode 100644
index 0000000..8b51b11
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.griffin.measure.config.params.Param
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.griffin.measure.utils.HdfsUtil
+
+import scala.util.Try
+
+/**
+  * Reads a parameter object from a JSON file opened through `HdfsUtil`.
+  *
+  * @param filePath path handed to `HdfsUtil.openFile`
+  */
+case class ParamHdfsFileReader(filePath: String) extends ParamReader {
+
+  /**
+    * Opens the file and deserializes its content with `JsonUtil.fromJson`.
+    *
+    * @tparam T parameter type to deserialize into
+    * @return the parsed parameter, or a `Failure` carrying whatever was thrown
+    *         while opening or parsing the file
+    */
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
+    Try {
+      val source = HdfsUtil.openFile(filePath)
+      // Close the stream even when parsing throws.
+      try {
+        JsonUtil.fromJson[T](source)
+      } finally {
+        source.close()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala
new file mode 100644
index 0000000..87e1953
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.griffin.measure.config.params.Param
+import org.apache.griffin.measure.utils.JsonUtil
+
+import scala.util.Try
+
+/**
+  * Parameter reader whose input is the JSON text itself rather than a path.
+  *
+  * @param rawString JSON text to deserialize
+  */
+case class ParamRawStringReader(rawString: String) extends ParamReader {
+
+  /**
+    * Deserializes `rawString` with `JsonUtil.fromJson`.
+    *
+    * @tparam T parameter type to deserialize into
+    * @return the parsed parameter, or a `Failure` carrying the parsing error
+    */
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] =
+    Try(JsonUtil.fromJson[T](rawString))
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
new file mode 100644
index 0000000..3508223
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.config.params.Param
+
+import scala.util.Try
+
+trait ParamReader extends Loggable with Serializable {
+  // Produces a parameter object of type T, wrapped in a Try; each implementation chooses where the input comes from.
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
new file mode 100644
index 0000000..9299247
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+
+/**
+  * Chooses a [[ParamReader]] implementation from a file-system type name.
+  * Type names are matched case-insensitively.
+  */
+object ParamReaderFactory {
+
+  val RawStringRegex = """^(?i)raw$""".r
+  val LocalFsRegex = """^(?i)local$""".r
+  val HdfsFsRegex = """^(?i)hdfs$""".r
+
+  /**
+    * @param filePath path of the config file, or the raw JSON text when
+    *                 `fsType` is "raw"
+    * @param fsType   "raw", "local" or "hdfs"
+    * @return the reader for `fsType`; the HDFS reader also serves every
+    *         unrecognized type
+    */
+  def getParamReader(filePath: String, fsType: String): ParamReader = fsType match {
+    case RawStringRegex() => ParamRawStringReader(filePath)
+    case LocalFsRegex() => ParamFileReader(filePath)
+    // "hdfs" (HdfsFsRegex) and anything unrecognized share the HDFS reader.
+    case _ => ParamHdfsFileReader(filePath)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
new file mode 100644
index 0000000..66e140b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.validator
+
+import org.apache.griffin.measure.config.params.Param
+
+import scala.util.Try
+
+// need to validate params
+/** Validator that delegates entirely to the parameter's own `validate`. */
+case class AllParamValidator() extends ParamValidator {
+
+  /**
+    * @param param parameter object to check
+    * @return the result of `param.validate`, or a `Failure` if it throws
+    */
+  def validate[T <: Param](param: Param): Try[Boolean] = Try(param.validate)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
new file mode 100644
index 0000000..1a3e050
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.validator
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.config.params.Param
+
+import scala.util.Try
+
+trait ParamValidator extends Loggable with Serializable {
+  // Checks a parameter object; the Boolean verdict is wrapped in a Try. NOTE(review): T is not used by the signature - confirm whether `param: T` was intended.
+  def validate[T <: Param](param: Param): Try[Boolean]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
new file mode 100644
index 0000000..1fb1868
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+
+trait DataConnector extends Loggable with Serializable {
+  // Presumably reports whether the underlying data can currently be used - confirm against the implementations.
+  def available(): Boolean
+  // Presumably performs setup before the connector is used - confirm against the implementations.
+  def init(): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..670175d
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
@@ -0,0 +1,139 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.connector.cache._
+import org.apache.griffin.measure.connector.direct._
+import org.apache.griffin.measure.connector.streaming._
+import org.apache.griffin.measure.rule.RuleExprs
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+/**
+  * Builds data connectors from their `DataConnectorParam` configuration.
+  * Connector and cache type names are matched case-insensitively.
+  */
+object DataConnectorFactory {
+
+  val HiveRegex = """^(?i)hive$""".r
+  val AvroRegex = """^(?i)avro$""".r
+
+  val KafkaRegex = """^(?i)kafka$""".r
+
+  val TextRegex = """^(?i)text$""".r
+
+  /**
+    * Creates the direct data connector selected by `dataConnectorParam.conType`.
+    *
+    * Hive and Avro connectors are built straight from the connector config; a
+    * Kafka connector is assembled from a streaming connector and a cache
+    * connector.
+    *
+    * @param sqlContext          SQL context handed to the connectors
+    * @param ssc                 streaming context, used only for Kafka
+    * @param dataConnectorParam  connector configuration
+    * @param ruleExprs           rule expressions passed through to the connector
+    * @param globalFinalCacheMap passed through to the connector unchanged
+    * @return the connector, or a `Failure` when the type is not recognized or
+    *         construction throws
+    */
+  def getDirectDataConnector(sqlContext: SQLContext,
+                             ssc: StreamingContext,
+                             dataConnectorParam: DataConnectorParam,
+                             ruleExprs: RuleExprs,
+                             globalFinalCacheMap: Map[String, Any]
+                            ): Try[DirectDataConnector] = {
+    val conType = dataConnectorParam.conType
+    val config = dataConnectorParam.config
+    Try {
+      conType match {
+        case HiveRegex() =>
+          HiveDirectDataConnector(sqlContext, config, ruleExprs, globalFinalCacheMap)
+        case AvroRegex() =>
+          AvroDirectDataConnector(sqlContext, config, ruleExprs, globalFinalCacheMap)
+        case KafkaRegex() => {
+          val ksdcTry = getStreamingDataConnector(ssc, dataConnectorParam)
+          val cdcTry = getCacheDataConnector(sqlContext, dataConnectorParam.cache)
+          KafkaCacheDirectDataConnector(ksdcTry, cdcTry, dataConnectorParam,
+            ruleExprs, globalFinalCacheMap)
+        }
+        case _ => throw new Exception("connector creation error!")
+      }
+    }
+  }
+
+  /**
+    * Creates the streaming connector for `dataConnectorParam.conType`;
+    * only Kafka is handled, any other type yields a `Failure`.
+    */
+  private def getStreamingDataConnector(ssc: StreamingContext,
+                                        dataConnectorParam: DataConnectorParam
+                                       ): Try[StreamingDataConnector] = {
+    val conType = dataConnectorParam.conType
+    val config = dataConnectorParam.config
+    Try {
+      conType match {
+        case KafkaRegex() => {
+          genKafkaDataConnector(ssc, config)
+        }
+        case _ => throw new Exception("streaming connector creation error!")
+      }
+    }
+  }
+
+  /**
+    * Creates the cache connector named by `dataCacheParam.cacheType`
+    * (hive or text); any other type yields a `Failure`.
+    *
+    * A null `dataCacheParam` is rejected by throwing directly instead of
+    * returning a `Failure`, because that check runs before the `Try`.
+    */
+  private def getCacheDataConnector(sqlContext: SQLContext,
+                                    dataCacheParam: DataCacheParam
+                                   ): Try[CacheDataConnector] = {
+    if (dataCacheParam == null) {
+      throw new Exception("invalid data cache param!")
+    }
+    val cacheType = dataCacheParam.cacheType
+    Try {
+      cacheType match {
+        case HiveRegex() => HiveCacheDataConnector(sqlContext, dataCacheParam)
+        case TextRegex() => TextCacheDataConnector(sqlContext, dataCacheParam)
+        case _ => throw new Exception("cache connector creation error!")
+      }
+    }
+  }
+
+  /**
+    * Builds a Kafka streaming connector that decodes keys and values as strings.
+    *
+    * The key and value class names come from the "key.type" and "value.type"
+    * config entries and default to `java.lang.String`. Only the String/String
+    * combination is supported; any other pair of loadable classes is rejected.
+    *
+    * Throws when a class name cannot be loaded, when the types are not both
+    * String, or when `ssc` is null.
+    */
+  private def genKafkaDataConnector(ssc: StreamingContext, config: Map[String, Any]) = {
+    val KeyType = "key.type"
+    val ValueType = "value.type"
+    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
+    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
+    // Compare the loaded classes themselves: a `Class[String]` type pattern is
+    // erased at runtime and would accept every class.
+    (getClassTag(keyType).runtimeClass, getClassTag(valueType).runtimeClass) match {
+      case (k, v) if k == classOf[String] && v == classOf[String] => {
+        if (ssc == null) throw new Exception("streaming context is null!  ")
+        new KafkaStreamingDataConnector(ssc, config) {
+          type K = String
+          type KD = StringDecoder
+          type V = String
+          type VD = StringDecoder
+          // NOTE(review): kafkaConfig is not defined in this object; presumably
+          // it is a member of KafkaStreamingDataConnector - confirm.
+          def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
+            KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
+          }
+        }
+      }
+      case _ => {
+        throw new Exception("not supported type kafka data connector")
+      }
+    }
+  }
+
+  /**
+    * Loads the class named `tp` and wraps it in a `ClassTag`.
+    * Any error raised by `Class.forName` propagates to the caller.
+    */
+  private def getClassTag(tp: String): ClassTag[_] = {
+    ClassTag(Class.forName(tp))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
new file mode 100644
index 0000000..1dfe8e2
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.cache
+
+import org.apache.griffin.measure.connector.DataConnector
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+trait CacheDataConnector extends DataConnector with DataCacheable with 
DataUpdatable {
+  // Stores one RDD of rows (field name -> value); `ms` is presumably the timestamp of the batch in milliseconds - confirm.
+  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit
+  // Reads rows back as an RDD wrapped in a Try.
+  def readData(): Try[RDD[Map[String, Any]]]
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
new file mode 100644
index 0000000..2be87a6
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.cache
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+
+/**
+  * Mix-in that records a connector's cache-related timestamps through
+  * `InfoCacheInstance`, under keys derived from `cacheInfoPath`.
+  */
+trait DataCacheable {
+
+  // Generated path segment obtained from PathCounter.
+  protected val defCacheInfoPath = PathCounter.genPath
+
+  val cacheInfoPath: String
+  val readyTimeInterval: Long
+  val readyTimeDelay: Long
+
+  // Info path of this instance: the TimeInfoCache root joined with cacheInfoPath.
+  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
+
+  // Keys, derived from selfCacheInfoPath, under which each timestamp is stored.
+  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
+  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
+
+  /** Stores `ms` under the cache-time key. */
+  protected def submitCacheTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfCacheTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  /**
+    * Stores `ms - readyTimeDelay` under the ready-time key, but only when that
+    * value is an exact multiple of `readyTimeInterval`.
+    *
+    * NOTE(review): a `readyTimeInterval` of 0 makes the modulo throw
+    * ArithmeticException - confirm that configuration rules this out.
+    */
+  protected def submitReadyTime(ms: Long): Unit = {
+    val curReadyTime = ms - readyTimeDelay
+    if (curReadyTime % readyTimeInterval == 0) {
+      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  /** Stores `ms` under the last-processed-time key. */
+  protected def submitLastProcTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfLastProcTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  /** Stores `genCleanTime(ms)` under the clean-time key. */
+  protected def submitCleanTime(ms: Long): Unit = {
+    val cleanTime = genCleanTime(ms)
+    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  /** Maps a timestamp to the clean time to record; the identity by default. */
+  protected def genCleanTime(ms: Long): Long = ms
+
+  /**
+    * Reads the recorded clean time back.
+    *
+    * @return the stored value as a Long, or None when the key is missing or
+    *         its value cannot be parsed
+    */
+  protected def readCleanTime(): Option[Long] = {
+    val key = selfCleanTime
+    val keys = key :: Nil
+    InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
+      // Try traps only non-fatal errors, so an unparsable value becomes None
+      // while fatal VM errors still propagate.
+      scala.util.Try(v.toLong).toOption
+    }
+  }
+
+}
+
+/** Hands out path names of the form `path_<n>` from one shared counter. */
+object PathCounter {
+  // Starts at 0, so the first generated path is "path_1".
+  private val sequence: AtomicLong = new AtomicLong(0L)
+
+  /** Returns the next path name; each call advances the counter by one. */
+  def genPath(): String = {
+    val next = sequence.incrementAndGet()
+    "path_" + next
+  }
+}
\ No newline at end of file


Reply via email to