http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala deleted file mode 100644 index 1d5c273..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.batch.rule.expr - -trait SelectExpr extends Expr { - def calculateOnly(values: Map[String, Any]): Option[Any] = None -} - -case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr { - val desc: String = s"[${fields.map(_.desc).mkString(", ")}]" - val dataSources: Set[String] = Set.empty[String] -} - -case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr { - val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})" - val dataSources: Set[String] = args.flatMap(_.dataSources).toSet - override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds)) - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds)) - override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds)) -} - -case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr { - val desc: String = s"[${field.desc} ${compare} ${value.desc}]" - val dataSources: Set[String] = value.dataSources - override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds) - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds) - override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds) -} - -// -- selection -- -case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr { - def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id) - - val desc: String = { - val argsString = selectors.map(_.desc).mkString("") - s"${head.desc}${argsString}" - } - val dataSources: Set[String] = { - val selectorDataSources = selectors.flatMap(_.dataSources).toSet - selectorDataSources + head.head - } - - override def cacheUnit: Boolean = true - override def getSubCacheExprs(ds: String): Iterable[Expr] = { - selectors.flatMap(_.getCacheExprs(ds)) 
- } - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { - selectors.flatMap(_.getFinalCacheExprs(ds)) - } - - override def persistUnit: Boolean = true - override def getSubPersistExprs(ds: String): Iterable[Expr] = { - selectors.flatMap(_.getPersistExprs(ds)) - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala deleted file mode 100644 index 63c8755..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.batch.rule.expr - - -trait StatementExpr extends Expr with AnalyzableExpr { - def valid(values: Map[String, Any]): Boolean = true - override def cacheUnit: Boolean = true -} - -case class SimpleStatementExpr(expr: LogicalExpr) extends StatementExpr { - def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values) - val desc: String = expr.desc - val dataSources: Set[String] = expr.dataSources - override def getSubCacheExprs(ds: String): Iterable[Expr] = { - expr.getCacheExprs(ds) - } - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { - expr.getFinalCacheExprs(ds) - } - override def getSubPersistExprs(ds: String): Iterable[Expr] = { - expr.getPersistExprs(ds) - } - - override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = expr.getGroupbyExprPairs(dsPair) -} - -case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) extends StatementExpr { - def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values) - val desc: String = s"${expr.desc} when ${whenExpr.desc}" - - override def valid(values: Map[String, Any]): Boolean = { - whenExpr.calculate(values) match { - case Some(r: Boolean) => r - case _ => false - } - } - - val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources - override def getSubCacheExprs(ds: String): Iterable[Expr] = { - expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds) - } - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { - expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds) - } - override def getSubPersistExprs(ds: String): Iterable[Expr] = { - expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds) - } - - override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = { - expr.getGroupbyExprPairs(dsPair) ++ whenExpr.getGroupbyExprPairs(dsPair) - } - override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr) -} \ No newline at end of 
file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala deleted file mode 100644 index 9b8f6a3..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.batch.utils - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} - -object HdfsUtil { - - private val seprator = "/" - - private val conf = new Configuration() - conf.set("dfs.support.append", "true") - - private val dfs = FileSystem.get(conf) - - def existPath(filePath: String): Boolean = { - val path = new Path(filePath) - dfs.exists(path) - } - - def createFile(filePath: String): FSDataOutputStream = { - val path = new Path(filePath) - if (dfs.exists(path)) dfs.delete(path, true) - return dfs.create(path) - } - - def appendOrCreateFile(filePath: String): FSDataOutputStream = { - val path = new Path(filePath) - if (dfs.exists(path)) dfs.append(path) else createFile(filePath) - } - - def openFile(filePath: String): FSDataInputStream = { - val path = new Path(filePath) - dfs.open(path) - } - - def writeContent(filePath: String, message: String): Unit = { - val out = createFile(filePath) - out.write(message.getBytes("utf-8")) - out.close - } - - def appendContent(filePath: String, message: String): Unit = { - val out = appendOrCreateFile(filePath) - out.write(message.getBytes("utf-8")) - out.close - } - - def createEmptyFile(filePath: String): Unit = { - val out = createFile(filePath) - out.close - } - - - def getHdfsFilePath(parentPath: String, fileName: String): String = { - if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName - } - - def deleteHdfsPath(dirPath: String): Unit = { - val path = new Path(dirPath) - if (dfs.exists(path)) dfs.delete(path, true) - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala deleted file mode 100644 index 46d1b86..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.batch.utils - -import scalaj.http._ - -object HttpUtil { - - val GET_REGEX = """^(?i)get$""".r - val POST_REGEX = """^(?i)post$""".r - val PUT_REGEX = """^(?i)put$""".r - val DELETE_REGEX = """^(?i)delete$""".r - - def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = { - val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString - response.code.toString - } - - def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = { - val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)) - method match { - case POST_REGEX() => httpReq.postData(data).asString.code.toString - case PUT_REGEX() => httpReq.put(data).asString.code.toString - case _ => "wrong method" - } - } - - private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = { - map.map(pair => pair._1 -> pair._2.toString) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala deleted file mode 100644 index 716e422..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.batch.utils - -import java.io.InputStream - -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} -import com.fasterxml.jackson.module.scala.DefaultScalaModule - -import scala.reflect._ - -object JsonUtil { - val mapper = new ObjectMapper() - mapper.registerModule(DefaultScalaModule) - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - - def toJson(value: Map[Symbol, Any]): String = { - toJson(value map { case (k,v) => k.name -> v}) - } - - def toJson(value: Any): String = { - mapper.writeValueAsString(value) - } - - def toMap[V](json:String)(implicit m: Manifest[V]) = fromJson[Map[String,V]](json) - - def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = { - mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) - } - - def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = { - mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala new file mode 100644 index 
0000000..8e4b25d --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCache.scala @@ -0,0 +1,39 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.cache.info + +import org.apache.griffin.measure.cache.lock.CacheLock +import org.apache.griffin.measure.log.Loggable + +trait InfoCache extends Loggable with Serializable { + + def init(): Unit + def available(): Boolean + def close(): Unit + + def cacheInfo(info: Map[String, String]): Boolean + def readInfo(keys: Iterable[String]): Map[String, String] + def deleteInfo(keys: Iterable[String]): Unit + def clearInfo(): Unit + + def listKeys(path: String): List[String] + + def genLock(s: String): CacheLock + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala new file mode 100644 index 0000000..3c9d70a --- /dev/null +++ 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheFactory.scala @@ -0,0 +1,41 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.cache.info + +import org.apache.griffin.measure.config.params.env.InfoCacheParam + +import scala.util.{Success, Try} + +case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], metricName: String) extends Serializable { + + val ZK_REGEX = """^(?i)zk|zookeeper$""".r + + def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = { + val config = infoCacheParam.config + val infoCacheTry = infoCacheParam.persistType match { + case ZK_REGEX() => Try(ZKInfoCache(config, metricName)) + case _ => throw new Exception("not supported info cache type") + } + infoCacheTry match { + case Success(infoCache) => Some(infoCache) + case _ => None + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala new file mode 100644 index 0000000..bcde266 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/InfoCacheInstance.scala @@ -0,0 +1,53 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.cache.info + +import org.apache.griffin.measure.cache.lock.{CacheLock, MultiCacheLock} +import org.apache.griffin.measure.config.params.env.InfoCacheParam + +object InfoCacheInstance extends InfoCache { + var infoCaches: List[InfoCache] = Nil + + def initInstance(infoCacheParams: Iterable[InfoCacheParam], metricName: String) = { + val fac = InfoCacheFactory(infoCacheParams, metricName) + infoCaches = infoCacheParams.flatMap(param => fac.getInfoCache(param)).toList + } + + def init(): Unit = infoCaches.foreach(_.init) + def available(): Boolean = infoCaches.foldLeft(false)(_ || _.available) + def close(): Unit = infoCaches.foreach(_.close) + + def cacheInfo(info: Map[String, String]): Boolean = { + infoCaches.foldLeft(false) { (res, infoCache) => res || infoCache.cacheInfo(info) } + } + def readInfo(keys: Iterable[String]): Map[String, String] = { + val maps = infoCaches.map(_.readInfo(keys)).reverse + maps.fold(Map[String, String]())(_ ++ _) + } + def deleteInfo(keys: Iterable[String]): Unit = infoCaches.foreach(_.deleteInfo(keys)) + def clearInfo(): Unit = infoCaches.foreach(_.clearInfo) + + def listKeys(path: String): List[String] = { + infoCaches.foldLeft(Nil: List[String]) { (res, infoCache) => + if (res.size > 0) res else infoCache.listKeys(path) + } + } + + def genLock(s: String): CacheLock = MultiCacheLock(infoCaches.map(_.genLock(s))) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala new file mode 100644 index 0000000..ac0acff --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala @@ -0,0 +1,116 @@ 
+/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.cache.info + +import org.apache.griffin.measure.log.Loggable + +object TimeInfoCache extends Loggable with Serializable { + + private val CacheTime = "cache.time" + private val LastProcTime = "last.proc.time" + private val ReadyTime = "ready.time" + private val CleanTime = "clean.time" + + def cacheTime(path: String): String = s"${path}/${CacheTime}" + def lastProcTime(path: String): String = s"${path}/${LastProcTime}" + def readyTime(path: String): String = s"${path}/${ReadyTime}" + def cleanTime(path: String): String = s"${path}/${CleanTime}" + + val infoPath = "info" + + val finalCacheInfoPath = "info.final" + val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}" + val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}" + val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}" + + def startTimeInfoCache(): Unit = { + genFinalReadyTime + } + + def getTimeRange(): (Long, Long) = { + readTimeRange + } + + def getCleanTime(): Long = { + readCleanTime + } + + def endTimeInfoCache: Unit = { + genFinalLastProcTime + genFinalCleanTime + } + + private def genFinalReadyTime(): Unit = { + val subPath = InfoCacheInstance.listKeys(infoPath) + val keys = 
subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" } + val result = InfoCacheInstance.readInfo(keys) + val time = keys.map { k => + getLong(result, k) + }.min + val map = Map[String, String]((finalReadyTime -> time.toString)) + InfoCacheInstance.cacheInfo(map) + } + + private def genFinalLastProcTime(): Unit = { + val subPath = InfoCacheInstance.listKeys(infoPath) + val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" } + val result = InfoCacheInstance.readInfo(keys) + val time = keys.map { k => + getLong(result, k) + }.min + val map = Map[String, String]((finalLastProcTime -> time.toString)) + InfoCacheInstance.cacheInfo(map) + } + + private def genFinalCleanTime(): Unit = { + val subPath = InfoCacheInstance.listKeys(infoPath) + val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" } + val result = InfoCacheInstance.readInfo(keys) + val time = keys.map { k => + getLong(result, k) + }.min + val map = Map[String, String]((finalCleanTime -> time.toString)) + InfoCacheInstance.cacheInfo(map) + } + + private def readTimeRange(): (Long, Long) = { + val map = InfoCacheInstance.readInfo(List(finalLastProcTime, finalReadyTime)) + val lastProcTime = getLong(map, finalLastProcTime) + val curReadyTime = getLong(map, finalReadyTime) + (lastProcTime + 1, curReadyTime) + } + + private def readCleanTime(): Long = { + val map = InfoCacheInstance.readInfo(List(finalCleanTime)) + val cleanTime = getLong(map, finalCleanTime) + cleanTime + } + + private def getLong(map: Map[String, String], key: String): Long = { + try { + map.get(key) match { + case Some(v) => v.toLong + case _ => -1 + } + } catch { + case _ => -1 + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala new file mode 100644 index 0000000..8b62fa4 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala @@ -0,0 +1,210 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.cache.info + +import org.apache.curator.framework.imps.CuratorFrameworkState +import org.apache.curator.framework.recipes.locks.InterProcessMutex +import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.retry.ExponentialBackoffRetry +import org.apache.curator.utils.ZKPaths +import org.apache.griffin.measure.cache.lock.ZKCacheLock +import org.apache.zookeeper.CreateMode + +import scala.collection.JavaConverters._ + +case class ZKInfoCache(config: Map[String, Any], metricName: String) extends InfoCache { + + val Hosts = "hosts" + val Namespace = "namespace" + val Mode = "mode" + val InitClear = "init.clear" + val CloseClear = "close.clear" + val LockPath = "lock.path" + + val PersistRegex = """^(?i)persist$""".r + val EphemeralRegex = """^(?i)ephemeral$""".r + + final val separator = ZKPaths.PATH_SEPARATOR + + val hosts = config.getOrElse(Hosts, "").toString + val namespace = config.getOrElse(Namespace, "").toString + val mode: CreateMode = config.get(Mode) match { + case Some(s: String) => s match { + case PersistRegex() => CreateMode.PERSISTENT + case EphemeralRegex() => CreateMode.EPHEMERAL + case _ => CreateMode.PERSISTENT + } + case _ => CreateMode.PERSISTENT + } + val initClear = config.get(InitClear) match { + case Some(b: Boolean) => b + case _ => true + } + val closeClear = config.get(CloseClear) match { + case Some(b: Boolean) => b + case _ => false + } + val lockPath = config.getOrElse(LockPath, "lock").toString + + private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName + private val builder = CuratorFrameworkFactory.builder() + .connectString(hosts) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .namespace(cacheNamespace) + private val client: CuratorFramework = builder.build + + def init(): Unit = { + client.start() + info("start zk info cache") + client.usingNamespace(cacheNamespace) + info(s"init 
with namespace: ${cacheNamespace}") + deleteInfo(lockPath :: Nil) + if (initClear) { + clearInfo + } + } + + def available(): Boolean = { + client.getState match { + case CuratorFrameworkState.STARTED => true + case _ => false + } + } + + def close(): Unit = { + if (closeClear) { + clearInfo + } + info("close zk info cache") + client.close() + } + + def cacheInfo(info: Map[String, String]): Boolean = { + info.foldLeft(true) { (rs, pair) => + val (k, v) = pair + createOrUpdate(path(k), v) && rs + } + } + + def readInfo(keys: Iterable[String]): Map[String, String] = { + keys.flatMap { key => + read(path(key)) match { + case Some(v) => Some((key, v)) + case _ => None + } + }.toMap + } + + def deleteInfo(keys: Iterable[String]): Unit = { + keys.foreach { key => delete(path(key)) } + } + + def clearInfo(): Unit = { +// delete("/") + info("clear info") + } + + def listKeys(p: String): List[String] = { + children(path(p)) + } + + def genLock(s: String): ZKCacheLock = { + val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s + ZKCacheLock(new InterProcessMutex(client, lpt)) + } + + private def path(k: String): String = { + if (k.startsWith(separator)) k else separator + k + } + + private def children(path: String): List[String] = { + try { + client.getChildren().forPath(path).asScala.toList + } catch { + case e: Throwable => { + error(s"list ${path} error: ${e.getMessage}") + Nil + } + } + } + + private def createOrUpdate(path: String, content: String): Boolean = { + if (checkExists(path)) { + update(path, content) + } else { + create(path, content) + } + } + + private def create(path: String, content: String): Boolean = { + try { + client.create().creatingParentsIfNeeded().withMode(mode) + .forPath(path, content.getBytes("utf-8")) + true + } catch { + case e: Throwable => { + error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}") + false + } + } + } + + private def update(path: String, content: String): Boolean = { + try { + 
client.setData().forPath(path, content.getBytes("utf-8")) + true + } catch { + case e: Throwable => { + error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}") + false + } + } + } + + private def read(path: String): Option[String] = { + try { + Some(new String(client.getData().forPath(path), "utf-8")) + } catch { + case e: Throwable => { + error(s"read ${path} error: ${e.getMessage}") + None + } + } + } + + private def delete(path: String): Unit = { + try { + client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path) + } catch { + case e: Throwable => error(s"delete ${path} error: ${e.getMessage}") + } + } + + private def checkExists(path: String): Boolean = { + try { + client.checkExists().forPath(path) != null + } catch { + case e: Throwable => { + error(s"check exists ${path} error: ${e.getMessage}") + false + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala new file mode 100644 index 0000000..24f142c --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/CacheLock.scala @@ -0,0 +1,31 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.cache.lock + +import java.util.concurrent.TimeUnit + +import org.apache.griffin.measure.log.Loggable + +trait CacheLock extends Loggable with Serializable { + + def lock(outtime: Long, unit: TimeUnit): Boolean + + def unlock(): Unit + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala new file mode 100644 index 0000000..7b835f4 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/MultiCacheLock.scala @@ -0,0 +1,39 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.cache.lock + +import java.util.concurrent.TimeUnit + +case class MultiCacheLock(cacheLocks: List[CacheLock]) extends CacheLock { + + def lock(outtime: Long, unit: TimeUnit): Boolean = { + cacheLocks.headOption match { + case Some(cl) => cl.lock(outtime, unit) + case None => true + } + } + + def unlock(): Unit = { + cacheLocks.headOption match { + case Some(cl) => cl.unlock + case None => {} + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala new file mode 100644 index 0000000..77ee83b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/lock/ZKCacheLock.scala @@ -0,0 +1,53 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.cache.lock + +import java.util.concurrent.TimeUnit + +import org.apache.curator.framework.recipes.locks.InterProcessMutex + +case class ZKCacheLock(@transient mutex: InterProcessMutex) extends CacheLock { + + def lock(outtime: Long, unit: TimeUnit): Boolean = { + try { + if (outtime >= 0) { + mutex.acquire(outtime, unit) + } else { + mutex.acquire(-1, null) + } + } catch { + case e: Throwable => { + error(s"lock error: ${e.getMessage}") + false + } + } + + } + + def unlock(): Unit = { + try { + if (mutex.isAcquiredInThisProcess) mutex.release + } catch { + case e: Throwable => { + error(s"unlock error: ${e.getMessage}") + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala new file mode 100644 index 0000000..92787be --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResult.scala @@ -0,0 +1,29 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.cache.result + +import org.apache.griffin.measure.result.Result + +case class CacheResult(timeGroup: Long, updateTime: Long, result: Result) { + + def olderThan(ut: Long): Boolean = { + updateTime < ut + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala new file mode 100644 index 0000000..50d3ada --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala @@ -0,0 +1,71 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.cache.result + +import org.apache.griffin.measure.log.Loggable +import org.apache.griffin.measure.result._ + +import scala.collection.mutable.{Map => MutableMap} + +case class CacheResultProcesser() extends Loggable { + + val cacheGroup: MutableMap[Long, CacheResult] = MutableMap() + + def genUpdateCacheResult(timeGroup: Long, updateTime: Long, result: Result): Option[CacheResult] = { + cacheGroup.get(timeGroup) match { + case Some(cr) => { + if (cr.olderThan(updateTime)) { + val existResult = cr.result + val newResult = existResult.update(result.asInstanceOf[existResult.T]) + if (existResult.differsFrom(newResult)) { + Some(CacheResult(timeGroup, updateTime, newResult)) + } else None + } else None + } + case _ => { + Some(CacheResult(timeGroup, updateTime, result)) + } + } + } + + def update(cr: CacheResult): Unit = { + val t = cr.timeGroup + cacheGroup.get(t) match { + case Some(c) => { + if (c.olderThan(cr.updateTime)) cacheGroup += (t -> cr) + } + case _ => cacheGroup += (t -> cr) + } + } + + def getCacheResult(timeGroup: Long): Option[CacheResult] = { + cacheGroup.get(timeGroup) + } + + def refresh(overtime: Long): Unit = { + val curCacheGroup = cacheGroup.toMap + val deadCache = curCacheGroup.filter { pr => + val (_, cr) = pr + cr.timeGroup < overtime || cr.result.eventual() + } + info(s"=== dead cache group count: ${deadCache.size} ===") + deadCache.keySet.foreach(cacheGroup -= _) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala new file mode 100644 index 0000000..8990564 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/AllParam.scala @@ 
-0,0 +1,32 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.env._ +import org.apache.griffin.measure.config.params.user._ + +// simply composite of env and user params, for convenient usage +@JsonInclude(Include.NON_NULL) +case class AllParam( @JsonProperty("env") envParam: EnvParam, + @JsonProperty("user") userParam: UserParam + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala new file mode 100644 index 0000000..d3484a1 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/Param.scala @@ -0,0 +1,25 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params + +trait Param extends Serializable { + + def validate(): Boolean = true + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala new file mode 100644 index 0000000..00302a7 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala @@ -0,0 +1,29 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala new file mode 100644 index 0000000..ad87a5f --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala @@ -0,0 +1,32 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam, + @JsonProperty("persist") persistParams: List[PersistParam], + @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam], + @JsonProperty("cleaner") cleanerParam: CleanerParam + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala new file mode 100644 index 0000000..be588f9 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala @@ -0,0 +1,30 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class InfoCacheParam( @JsonProperty("type") persistType: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala new file mode 100644 index 0000000..68b9bc8 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala @@ -0,0 +1,30 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class PersistParam( @JsonProperty("type") persistType: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala new file mode 100644 index 0000000..6ec0955 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala @@ -0,0 +1,33 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class SparkParam( @JsonProperty("log.level") logLevel: String, + @JsonProperty("checkpoint.dir") cpDir: String, + @JsonProperty("batch.interval") batchInterval: String, + @JsonProperty("process.interval") processInterval: String, + @JsonProperty("config") config: Map[String, String] + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala new file mode 100644 index 0000000..9c60755 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala @@ -0,0 +1,31 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.user + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class DataCacheParam( @JsonProperty("type") cacheType: String, + @JsonProperty("config") config: Map[String, Any], + @JsonProperty("time.range") timeRange: List[String] + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala new file mode 100644 index 0000000..dbc2e0b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala @@ -0,0 +1,37 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.user + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class DataConnectorParam( @JsonProperty("type") conType: String, + @JsonProperty("version") version: String, + @JsonProperty("config") config: Map[String, Any], + @JsonProperty("cache") cache: DataCacheParam, + @JsonProperty("match.once") matchOnce: Boolean + ) extends Param { + + def getMatchOnce(): Boolean = { + if (matchOnce == null) false else matchOnce + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala new file mode 100644 index 0000000..6ee9783 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala @@ -0,0 +1,30 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.user + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class EvaluateRuleParam( @JsonProperty("sampleRatio") sampleRatio: Double, + @JsonProperty("rules") rules: String + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala new file mode 100644 index 0000000..df0647c --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala @@ -0,0 +1,34 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.params.user + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class UserParam(@JsonProperty("name") name: String, + @JsonProperty("type") dqType: String, + @JsonProperty("process.type") procType: String, + @JsonProperty("source") sourceParam: DataConnectorParam, + @JsonProperty("target") targetParam: DataConnectorParam, + @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala new file mode 100644 index 0000000..ede68f4 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.reader + +import org.apache.griffin.measure.config.params.Param +import org.apache.griffin.measure.utils.JsonUtil + +import scala.util.Try + +case class ParamFileReader(file: String) extends ParamReader { + + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { + Try { + val source = scala.io.Source.fromFile(file) + val lines = source.mkString + val param = JsonUtil.fromJson[T](lines) + source.close + param + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala new file mode 100644 index 0000000..8b51b11 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.config.reader + +import org.apache.griffin.measure.config.params.Param +import org.apache.griffin.measure.utils.JsonUtil +import org.apache.griffin.measure.utils.HdfsUtil + +import scala.util.Try + +case class ParamHdfsFileReader(filePath: String) extends ParamReader { + + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { + Try { + val source = HdfsUtil.openFile(filePath) + val param = JsonUtil.fromJson[T](source) + source.close + param + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala new file mode 100644 index 0000000..87e1953 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala @@ -0,0 +1,35 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.griffin.measure.config.params.Param
+import org.apache.griffin.measure.utils.JsonUtil
+
+import scala.util.Try
+
+case class ParamRawStringReader(rawString: String) extends ParamReader {
+
+  // The JSON text is carried inline in rawString, so parsing is the only step;
+  // a non-fatal exception thrown by JsonUtil.fromJson is captured as a Failure.
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] =
+    Try {
+      JsonUtil.fromJson[T](rawString)
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
new file mode 100644
index 0000000..3508223
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.config.params.Param
+
+import scala.util.Try
+// Common interface of the configuration readers (raw string, local file, HDFS file).
+trait ParamReader extends Loggable with Serializable {
+  // Reads the configuration and deserializes it into T; the implicit Manifest carries T's runtime type.
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
new file mode 100644
index 0000000..9299247
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.
See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.reader
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+
+object ParamReaderFactory {
+
+  val RawStringRegex = """^(?i)raw$""".r
+  val LocalFsRegex = """^(?i)local$""".r
+  val HdfsFsRegex = """^(?i)hdfs$""".r
+
+  // Chooses a ParamReader by fsType, matched case-insensitively:
+  // "raw" -> ParamRawStringReader, "local" -> ParamFileReader, anything else -> ParamHdfsFileReader.
+  def getParamReader(filePath: String, fsType: String): ParamReader = fsType match {
+    case RawStringRegex() => ParamRawStringReader(filePath)
+    case LocalFsRegex() => ParamFileReader(filePath)
+    // "hdfs" needs no case of its own: it shares the fallback with unrecognized types
+    case _ => ParamHdfsFileReader(filePath)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
new file mode 100644
index 0000000..66e140b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.
See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.validator
+
+import org.apache.griffin.measure.config.params.Param
+
+import scala.util.Try
+
+// Validator that defers to each Param's own validate method.
+case class AllParamValidator() extends ParamValidator {
+
+  // Runs param.validate, turning any non-fatal exception it throws into a Failure.
+  // The type parameter T is not used by this implementation.
+  def validate[T <: Param](param: Param): Try[Boolean] = Try {
+    param.validate
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
new file mode 100644
index 0000000..1a3e050
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.validator
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.config.params.Param
+
+import scala.util.Try
+// Interface for checking whether a configuration Param is valid.
+trait ParamValidator extends Loggable with Serializable {
+  // Returns the verdict for param wrapped in a Try. NOTE(review): T does not appear in the signature.
+  def validate[T <: Param](param: Param): Try[Boolean]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
new file mode 100644
index 0000000..1fb1868
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+// Base trait for data connectors. NOTE(review): the RDD and DataFrame imports are unused in this file.
+trait DataConnector extends Loggable with Serializable {
+  // Presumably reports whether the connector's data can currently be read — TODO confirm with implementations.
+  def available(): Boolean
+  // Initialization hook. NOTE(review): looks like it must run before the connector is used — confirm with callers.
+  def init(): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..670175d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
@@ -0,0 +1,139 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.connector.cache._
+import org.apache.griffin.measure.connector.direct._
+import org.apache.griffin.measure.connector.streaming._
+import org.apache.griffin.measure.rule.RuleExprs
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+// Builds data connectors from their params; connector and cache type names match case-insensitively.
+object DataConnectorFactory {
+
+  val HiveRegex = """^(?i)hive$""".r
+  val AvroRegex = """^(?i)avro$""".r
+
+  val KafkaRegex = """^(?i)kafka$""".r
+
+  val TextRegex = """^(?i)text$""".r
+
+  // Direct connector for conType hive, avro or kafka (streaming + cache); any other type is a Failure.
+  def getDirectDataConnector(sqlContext: SQLContext,
+                             ssc: StreamingContext,
+                             dataConnectorParam: DataConnectorParam,
+                             ruleExprs: RuleExprs,
+                             globalFinalCacheMap: Map[String, Any]
+                            ): Try[DirectDataConnector] = {
+    val conType = dataConnectorParam.conType
+    val version = dataConnectorParam.version
+    val config = dataConnectorParam.config
+    Try {
+      conType match {
+        case HiveRegex() => HiveDirectDataConnector(sqlContext, config, ruleExprs, globalFinalCacheMap)
+        case AvroRegex() => AvroDirectDataConnector(sqlContext, config, ruleExprs, globalFinalCacheMap)
+        case KafkaRegex() => {
+          val ksdcTry = getStreamingDataConnector(ssc, dataConnectorParam)
+          val cdcTry = getCacheDataConnector(sqlContext, dataConnectorParam.cache)
+          KafkaCacheDirectDataConnector(ksdcTry, cdcTry, dataConnectorParam, ruleExprs, globalFinalCacheMap)
+        }
+        case _ => throw new Exception("connector creation error!")
+      }
+    }
+  }
+
+  // Streaming connector for conType; only kafka is supported, anything else is a Failure.
+  private def getStreamingDataConnector(ssc: StreamingContext,
+                                        dataConnectorParam: DataConnectorParam
+                                       ): Try[StreamingDataConnector] = {
+    val conType = dataConnectorParam.conType
+    val version = dataConnectorParam.version
+    val config = dataConnectorParam.config
+    Try {
+      conType match {
+        case KafkaRegex() => {
+          genKafkaDataConnector(ssc, config)
+        }
+        case _ => throw new Exception("streaming connector creation error!")
+      }
+    }
+  }
+
+  // Cache connector (hive or text) for dataCacheParam. The null check runs inside the Try so that,
+  // like every other error of this method, a missing cache param is returned as a Failure, not thrown.
+  private def getCacheDataConnector(sqlContext: SQLContext,
+                                    dataCacheParam: DataCacheParam
+                                   ): Try[CacheDataConnector] = {
+    Try {
+      if (dataCacheParam == null) {
+        throw new Exception("invalid data cache param!")
+      }
+      val cacheType = dataCacheParam.cacheType
+      cacheType match {
+        case HiveRegex() => HiveCacheDataConnector(sqlContext, dataCacheParam)
+        case TextRegex() => TextCacheDataConnector(sqlContext, dataCacheParam)
+        case _ => throw new Exception("cache connector creation error!")
+      }
+    }
+  }
+
+  // Kafka connector; key/value classes are named by config "key.type"/"value.type" (default java.lang.String).
+  private def genKafkaDataConnector(ssc: StreamingContext, config: Map[String, Any]) = {
+    val KeyType = "key.type"
+    val ValueType = "value.type"
+    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
+    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
+    // Compare runtimeClass explicitly: a type pattern like `k: Class[String]` is erased and matches any class.
+    (getClassTag(keyType), getClassTag(valueType)) match {
+      case (k, v) if k.runtimeClass == classOf[String] && v.runtimeClass == classOf[String] => {
+        if (ssc == null) throw new Exception("streaming context is null!")
+        new KafkaStreamingDataConnector(ssc, config) {
+          type K = String
+          type KD = StringDecoder
+          type V = String
+          type VD = StringDecoder
+          def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
+            KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
+          }
+        }
+      }
+      case _ => {
+        throw new Exception("not supported type kafka data connector")
+      }
+    }
+  }
+
+  // Resolves a fully-qualified class name (e.g. "java.lang.String") to its ClassTag.
+  // An unknown name makes Class.forName throw ClassNotFoundException, which propagates to
+  // the Try in getStreamingDataConnector and becomes a Failure there; a LinkageError is
+  // not NonFatal, so it escapes that Try.
+  private def getClassTag(tp: String): ClassTag[_] = {
+    val clazz = Class.forName(tp)
+    ClassTag(clazz)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
new file mode 100644
index 0000000..1dfe8e2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.cache
+
+import org.apache.griffin.measure.connector.DataConnector
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+// Data connector that stores rows (as Map[String, Any]) in a cache and reads them back.
+trait CacheDataConnector extends DataConnector with DataCacheable with DataUpdatable {
+  // Writes rdd to the cache. NOTE(review): ms appears to be the batch timestamp in milliseconds — confirm.
+  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit
+  // Returns the cached rows wrapped in a Try.
+  def readData(): Try[RDD[Map[String, Any]]]
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
new file mode 100644
index 0000000..2be87a6
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.cache
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+
+// Mixin that records and reads back this cache's cache/ready/last-proc/clean times via InfoCacheInstance.
+trait DataCacheable {
+
+  protected val defCacheInfoPath = PathCounter.genPath
+
+  val cacheInfoPath: String
+  val readyTimeInterval: Long
+  val readyTimeDelay: Long
+
+  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
+
+  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
+  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
+
+  protected def submitCacheTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfCacheTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def submitReadyTime(ms: Long): Unit = {
+    // only delay-shifted times that fall exactly on a readyTimeInterval boundary are published
+    val curReadyTime = ms - readyTimeDelay
+    if (curReadyTime % readyTimeInterval == 0) {
+      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  protected def submitLastProcTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfLastProcTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def submitCleanTime(ms: Long): Unit = {
+    val cleanTime = genCleanTime(ms)
+    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def genCleanTime(ms: Long): Long = ms
+
+  protected def readCleanTime(): Option[Long] = {
+    val key = selfCleanTime
+    val keys = key :: Nil
+    InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
+      // a stored value that does not parse as Long reads as "no clean time"; fatal errors still propagate
+      scala.util.Try(v.toLong).toOption
+    }
+  }
+
+}
+
+// Hands out distinct default cache info path names (path_1, path_2, ...) from an atomic counter.
+object PathCounter {
+  private val counter: AtomicLong = new AtomicLong(0L)
+  def genPath(): String = s"path_${increment}"
+  private def increment(): Long = {
+    counter.incrementAndGet()
+  }
+}
\ No newline at end of file