http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala new file mode 100644 index 0000000..07c8187 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala @@ -0,0 +1,30 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.connector.cache + +import org.apache.spark.rdd.RDD + +trait DataUpdatable { + + def cleanOldData(): Unit = {} + + def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {} + def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {} + +}
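A minimal usage sketch of the DataUpdatable hooks above (not part of this commit; MyCacheConnector and its in-memory map are hypothetical names invented for illustration):

import org.apache.spark.rdd.RDD
import org.apache.griffin.measure.connector.cache.DataUpdatable

// Hypothetical connector that keeps updated records in memory, keyed by time group.
class MyCacheConnector extends DataUpdatable {
  private var cache: Map[Long, Iterable[Map[String, Any]]] = Map.empty

  // drop all previously cached records
  override def cleanOldData(): Unit = {
    cache = Map.empty
  }

  // replace the records of a single time group
  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
    cache += (t -> oldData)
  }

  // replace everything at once; collect is tolerable only because this
  // sketch assumes a small RDD
  override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {
    cache = Map(0L -> oldRdd.collect.toIterable)
  }
}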
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala new file mode 100644 index 0000000..e241188 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala @@ -0,0 +1,351 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.connector.cache + +import java.util.concurrent.TimeUnit + +import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} +import org.apache.griffin.measure.config.params.user.DataCacheParam +import org.apache.griffin.measure.result.TimeStampInfo +import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.HiveContext + +import scala.util.{Success, Try} + +case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam + ) extends CacheDataConnector { + + if (!sqlContext.isInstanceOf[HiveContext]) { + throw new Exception("hive context not prepared!") + } + + val config = dataCacheParam.config + val InfoPath = "info.path" + val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString + + val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") + val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") + + val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil + val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match { + case s :: e :: _ => { + val ns = TimeUtil.milliseconds(s) match { + case Some(n) if (n < 0) => n + case _ => 0 + } + val ne = TimeUtil.milliseconds(e) match { + case Some(n) if (n < 0) => n + case _ => 0 + } + (ns, ne) + } + case _ => (0, 0) + } + + val Database = "database" + val database: String = config.getOrElse(Database, "").toString + val TableName = "table.name" + val tableName: String = config.get(TableName) match { + case Some(s: String) if (s.nonEmpty) => s + case _ => throw new Exception("invalid table.name!") + } + val ParentPath = "parent.path" + val parentPath: String = config.get(ParentPath) match { + case Some(s: String) => s + case _ => throw new Exception("invalid parent.path!") + } + val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName) + + val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName + + val ReadyTimeInterval = 
"ready.time.interval" + val ReadyTimeDelay = "ready.time.delay" + val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L) + val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L) + + val TimeStampColumn: String = TimeStampInfo.key + val PayloadColumn: String = "payload" + +// type Schema = (Long, String) + val schema: List[(String, String)] = List( + (TimeStampColumn, "bigint"), + (PayloadColumn, "string") + ) + val schemaName = schema.map(_._1) + +// type Partition = (Long, Long) + val partition: List[(String, String, String)] = List( + ("hr", "bigint", "hour"), + ("min", "bigint", "min") + ) + val partitionName = partition.map(_._1) + + private val fieldSep = """|""" + private val rowSep = """\n""" + private val rowSepLiteral = "\n" + + private def dbPrefix(): Boolean = { + database.nonEmpty && !database.equals("default") + } + + private def tableExists(): Boolean = { + Try { + if (dbPrefix) { + sqlContext.tables(database).filter(tableExistsSql).collect.size + } else { + sqlContext.tables().filter(tableExistsSql).collect.size + } + } match { + case Success(s) => s > 0 + case _ => false + } + } + + override def init(): Unit = { + try { + if (tableExists) { + // drop exist table + val dropSql = s"""DROP TABLE ${concreteTableName}""" + sqlContext.sql(dropSql) + } + + val colsSql = schema.map { field => + s"`${field._1}` ${field._2}" + }.mkString(", ") + val partitionsSql = partition.map { partition => + s"`${partition._1}` ${partition._2}" + }.mkString(", ") + val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName} + |(${colsSql}) PARTITIONED BY (${partitionsSql}) + |ROW FORMAT DELIMITED + |FIELDS TERMINATED BY '${fieldSep}' + |LINES TERMINATED BY '${rowSep}' + |STORED AS TEXTFILE + |LOCATION '${tablePath}'""".stripMargin + sqlContext.sql(sql) + } catch { + case e: Throwable => throw e + } + } + + def available(): Boolean = { + true + } + + private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = { + try { + Some(schema.map { field => + val (name, _) = field + name match { + case TimeStampColumn => ms + case PayloadColumn => JsonUtil.toJson(data) + case _ => null + } + }) + } catch { + case _ => None + } + } + + private def decode(data: List[Any], updateTimeStamp: Boolean): Option[Map[String, Any]] = { + val dataMap = schemaName.zip(data).toMap + dataMap.get(PayloadColumn) match { + case Some(v: String) => { + try { + val map = JsonUtil.toAnyMap(v) + val resMap = if (updateTimeStamp) { + dataMap.get(TimeStampColumn) match { + case Some(t) => map + (TimeStampColumn -> t) + case _ => map + } + } else map + Some(resMap) + } catch { + case _ => None + } + } + case _ => None + } + } + + def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = { + val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) + if (newCacheLocked) { + try { + val ptns = getPartition(ms) + val ptnsPath = genPartitionHdfsPath(ptns) + val dirPath = s"${tablePath}/${ptnsPath}" + val fileName = s"${ms}" + val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName) + + // encode data + val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms)) + + // save data + val recordRdd: RDD[String] = dataRdd.map { dt => + dt.map(_.toString).mkString(fieldSep) + } + + val dumped = if (!recordRdd.isEmpty) { + HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral) + } else false + + // add partition + if (dumped) { + val sql = addPartitionSql(concreteTableName, ptns) + 
sqlContext.sql(sql) + } + + // submit ms + submitCacheTime(ms) + submitReadyTime(ms) + } catch { + case e: Throwable => error(s"save data error: ${e.getMessage}") + } finally { + newCacheLock.unlock() + } + } + } + + def readData(): Try[RDD[Map[String, Any]]] = Try { + val timeRange = TimeInfoCache.getTimeRange + submitLastProcTime(timeRange._2) + + val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) + submitCleanTime(reviseTimeRange._1) + + // read directly through partition info + val partitionRange = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2) + val sql = selectSql(concreteTableName, partitionRange) + val df = sqlContext.sql(sql) + + // decode data + df.flatMap { row => + val dt = schemaName.map { sn => + row.getAs[Any](sn) + } + decode(dt, true) + } + } + + override def cleanOldData(): Unit = { + val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) + if (oldCacheLocked) { + try { + val cleanTime = readCleanTime() + cleanTime match { + case Some(ct) => { + // drop partition + val bound = getPartition(ct) + val sql = dropPartitionSql(concreteTableName, bound) + sqlContext.sql(sql) + } + case _ => { + // do nothing + } + } + } catch { + case e: Throwable => error(s"clean old data error: ${e.getMessage}") + } finally { + oldCacheLock.unlock() + } + } + } + + override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { + // parallel process different time groups, lock is unnecessary + val ptns = getPartition(t) + val ptnsPath = genPartitionHdfsPath(ptns) + val dirPath = s"${tablePath}/${ptnsPath}" + val fileName = s"${t}" + val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName) + + try { + // remove out time old data + HdfsFileDumpUtil.remove(dirPath, fileName, true) + + // save updated old data + if (oldData.size > 0) { + val recordDatas = oldData.flatMap { dt => + encode(dt, t) + } + val records: Iterable[String] = recordDatas.map { dt => + dt.map(_.toString).mkString(fieldSep) + } + val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral) + } + } catch { + case e: Throwable => error(s"update old data error: ${e.getMessage}") + } + } + + override protected def genCleanTime(ms: Long): Long = { + val minPartition = partition.last + val t1 = TimeUtil.timeToUnit(ms, minPartition._3) + val t2 = TimeUtil.timeFromUnit(t1, minPartition._3) + t2 + } + + private def getPartition(ms: Long): List[(String, Any)] = { + partition.map { p => + val (name, _, unit) = p + val t = TimeUtil.timeToUnit(ms, unit) + (name, t) + } + } + private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, Any))] = { + partition.map { p => + val (name, _, unit) = p + val t1 = TimeUtil.timeToUnit(ms1, unit) + val t2 = TimeUtil.timeToUnit(ms2, unit) + (name, (t1, t2)) + } + } + + private def genPartitionHdfsPath(partition: List[(String, Any)]): String = { + partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/") + } + private def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = { + val partitionSql = partition.map(ptn => (s"`${ptn._1}` = ${ptn._2}")).mkString(", ") + val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})""" + sql + } + private def selectSql(tbn: String, partitionRange: List[(String, (Any, Any))]): String = { + val clause = partitionRange.map { pr => + val (name, (r1, r2)) = pr + s"""`${name}` BETWEEN '${r1}' and '${r2}'""" + }.mkString(" AND ") + val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else "" + val sql = s"""SELECT * FROM ${tbn} 
${whereClause}""" + sql + } + private def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = { + val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ")).mkString(", ") + val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}""" + println(sql) + sql + } + + private def tableExistsSql(): String = { + s"tableName LIKE '${tableName}'" + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala new file mode 100644 index 0000000..62b6086 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala @@ -0,0 +1,311 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.connector.cache + +import java.util.concurrent.TimeUnit + +import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} +import org.apache.griffin.measure.config.params.user.DataCacheParam +import org.apache.griffin.measure.result.TimeStampInfo +import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext + +import scala.util.Try + +case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam + ) extends CacheDataConnector { + + val config = dataCacheParam.config + val InfoPath = "info.path" + val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString + + val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") + val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") + + val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil + val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match { + case s :: e :: _ => { + val ns = TimeUtil.milliseconds(s) match { + case Some(n) if (n < 0) => n + case _ => 0 + } + val ne = TimeUtil.milliseconds(e) match { + case Some(n) if (n < 0) => n + case _ => 0 + } + (ns, ne) + } + case _ => (0, 0) + } + + val FilePath = "file.path" + val filePath: String = config.get(FilePath) match { + case Some(s: String) => s + case _ => throw new Exception("invalid file.path!") + } + + val ReadyTimeInterval = "ready.time.interval" + val ReadyTimeDelay = "ready.time.delay" + val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L) + val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L) + +// val TimeStampColumn: String = TimeStampInfo.key +// val PayloadColumn: String = "payload" + + // cache schema: Long, String +// val fields = List[StructField]( +// StructField(TimeStampColumn, LongType), +// StructField(PayloadColumn, StringType) +// ) +// val schema = StructType(fields) + + // case class CacheData(time: Long, payload: String) { + // def getTime(): Long = time + // def getPayload(): String = payload + // } + + private val rowSepLiteral = "\n" + + val partitionUnits: List[String] = List("hour", "min") + + override def init(): Unit = { + // do nothing + } + + def available(): Boolean = { + true + } + + private def encode(data: Map[String, Any], ms: Long): Option[String] = { + try { + val map = data + (TimeStampInfo.key -> ms) + Some(JsonUtil.toJson(map)) + } catch { + case _: Throwable => None + } + } + + private def decode(data: String): Option[Map[String, Any]] = { + try { + Some(JsonUtil.toAnyMap(data)) + } catch { + case _: Throwable => None + } + } + + def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = { + val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) + if (newCacheLocked) { + try { + val ptns = getPartition(ms) + val ptnsPath = genPartitionHdfsPath(ptns) + val dirPath = s"${filePath}/${ptnsPath}" + val dataFileName = s"${ms}" + val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) + + // encode data + val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms)) + + // save data + val dumped = if (!dataRdd.isEmpty) { + HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral) + } else false + + // submit ms + submitCacheTime(ms) + submitReadyTime(ms) + } catch { + case e: Throwable => error(s"save data error: 
${e.getMessage}") + } finally { + newCacheLock.unlock() + } + } + } + + def readData(): Try[RDD[Map[String, Any]]] = Try { + val timeRange = TimeInfoCache.getTimeRange + submitLastProcTime(timeRange._2) + + val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) + submitCleanTime(reviseTimeRange._1) + + // read directly through partition info + val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2) + println(s"read time ranges: ${reviseTimeRange}") + println(s"read partition ranges: ${partitionRanges}") + + // list partition paths + val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges) + + if (partitionPaths.isEmpty) { + sqlContext.sparkContext.emptyRDD[Map[String, Any]] + } else { + val filePaths = partitionPaths.mkString(",") + val rdd = sqlContext.sparkContext.textFile(filePaths) + + // decode data + rdd.flatMap { row => + decode(row) + } + } + } + + override def cleanOldData(): Unit = { + val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) + if (oldCacheLocked) { + try { + val cleanTime = readCleanTime() + cleanTime match { + case Some(ct) => { + // drop partitions + val bounds = getPartition(ct) + + // list partition paths + val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds) + + // delete out time data path + earlierPaths.foreach { path => + println(s"delete hdfs path: ${path}") + HdfsUtil.deleteHdfsPath(path) + } + } + case _ => { + // do nothing + } + } + } catch { + case e: Throwable => error(s"clean old data error: ${e.getMessage}") + } finally { + oldCacheLock.unlock() + } + } + } + + override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { + // parallel process different time groups, lock is unnecessary + val ptns = getPartition(t) + val ptnsPath = genPartitionHdfsPath(ptns) + val dirPath = s"${filePath}/${ptnsPath}" + val dataFileName = s"${t}" + val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) + + try { + // remove out time old data + HdfsFileDumpUtil.remove(dirPath, dataFileName, true) + + // save updated old data + if (oldData.size > 0) { + val recordDatas = oldData.flatMap { dt => + encode(dt, t) + } + val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, rowSepLiteral) + } + } catch { + case e: Throwable => error(s"update old data error: ${e.getMessage}") + } + } + + override protected def genCleanTime(ms: Long): Long = { + val minPartitionUnit = partitionUnits.last + val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit) + val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit) + t2 + } + + private def getPartition(ms: Long): List[Long] = { + partitionUnits.map { unit => + TimeUtil.timeToUnit(ms, unit) + } + } + private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = { + partitionUnits.map { unit => + val t1 = TimeUtil.timeToUnit(ms1, unit) + val t2 = TimeUtil.timeToUnit(ms2, unit) + (t1, t2) + } + } + + private def genPartitionHdfsPath(partition: List[Long]): String = { + partition.map(prtn => s"${prtn}").mkString("/") + } + + private def str2Long(str: String): Option[Long] = { + try { + Some(str.toLong) + } catch { + case e: Throwable => None + } + } + + // here the range means [min, max], but the best range should be (min, max] + private def listPathsBetweenRanges(paths: List[String], + partitionRanges: List[(Long, Long)] + ): List[String] = { + partitionRanges match { + case Nil => paths + case head :: tail => { + val (lb, ub) = head + val curPaths = paths.flatMap { path => + val names = 
HdfsUtil.listSubPaths(path, "dir").toList + names.filter { name => + str2Long(name) match { + case Some(t) => (t >= lb) && (t <= ub) + case _ => false + } + }.map(HdfsUtil.getHdfsFilePath(path, _)) + } + listPathsBetweenRanges(curPaths, tail) + } + } + } + + private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long] + ): List[String] = { + bounds match { + case Nil => paths + case head :: tail => { + val earlierPaths = paths.flatMap { path => + val names = HdfsUtil.listSubPaths(path, "dir").toList + names.filter { name => + str2Long(name) match { + case Some(t) => (t < head) + case _ => false + } + }.map(HdfsUtil.getHdfsFilePath(path, _)) + } + val equalPaths = paths.flatMap { path => + val names = HdfsUtil.listSubPaths(path, "dir").toList + names.filter { name => + str2Long(name) match { + case Some(t) => (t == head) + case _ => false + } + }.map(HdfsUtil.getHdfsFilePath(path, _)) + } + + tail match { + case Nil => earlierPaths + case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail) + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala new file mode 100644 index 0000000..b45e5a9 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala @@ -0,0 +1,132 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.connector.direct + +import org.apache.griffin.measure.result._ +import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs} +import org.apache.griffin.measure.utils.HdfsUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext + +import scala.util.Try + +// data connector for avro file +case class AvroDirectDataConnector(sqlContext: SQLContext, config: Map[String, Any], + ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any] + ) extends DirectDataConnector { + + val FilePath = "file.path" + val FileName = "file.name" + + val filePath = config.getOrElse(FilePath, "").toString + val fileName = config.getOrElse(FileName, "").toString + + val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName + + private def pathPrefix(): Boolean = { + filePath.nonEmpty + } + + private def fileExist(): Boolean = { + HdfsUtil.existPath(concreteFileFullPath) + } + + def available(): Boolean = { + (!concreteFileFullPath.isEmpty) && fileExist + } + + def init(): Unit = {} + + def metaData(): Try[Iterable[(String, String)]] = { + Try { + val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema + st.fields.map(f => (f.name, f.dataType.typeName)) + } + } + + def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = { + Try { + loadDataFile.flatMap { row => + // generate cache data + val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap) + val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) + + // data info + val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info => + try { + (info.key -> row.getAs[info.T](info.key)) + } catch { + case e: Throwable => info.defWrap + } + }.toMap + + finalExprValueMaps.flatMap { finalExprValueMap => + val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => + expr.calculate(finalExprValueMap) match { + case Some(v) => Some(v.asInstanceOf[AnyRef]) + case _ => None + } + } + val key = toTuple(groupbyData) + + Some((key, (finalExprValueMap, dataInfoMap))) + } + +// val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) => +// ExprValueUtil.genExprValueMaps(Some(row), expr, cachedMap) +// } +// val finalExprValueMap = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMap) + + // when clause filter data source +// val whenResult = ruleExprs.whenClauseExprOpt match { +// case Some(whenClause) => whenClause.calculate(finalExprValueMap) +// case _ => None +// } +// +// // get groupby data +// whenResult match { +// case Some(false) => None +// case _ => { +// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => +// expr.calculate(finalExprValueMap) match { +// case Some(v) => Some(v.asInstanceOf[AnyRef]) +// case _ => None +// } +// } +// val key = toTuple(groupbyData) +// +// Some((key, finalExprValueMap)) +// } +// } + } + } + } + + private def loadDataFile() = { + sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath) + } + + private def toTuple[A <: AnyRef](as: Seq[A]): Product = { + if (as.size > 0) { + val tupleClass = Class.forName("scala.Tuple" + as.size) + tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] + } else None + } + +} 
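The toTuple helper above turns a variable-length group-by key into a scala.TupleN via reflection, so a key of any width can serve as a pair-RDD key. A standalone sketch of the same trick (TupleDemo is a hypothetical name invented here; Scala's 22-arity tuple limit is assumed):

object TupleDemo {
  // pick the TupleN class whose arity matches the sequence length and
  // instantiate it through its single constructor; None (itself a Product)
  // stands in for an empty key
  def toTuple[A <: AnyRef](as: Seq[A]): Product = {
    if (as.size > 0) {
      val tupleClass = Class.forName("scala.Tuple" + as.size)
      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
    } else None
  }

  def main(args: Array[String]): Unit = {
    println(toTuple(Seq("a", "b")))      // prints (a,b)
    println(toTuple(Seq.empty[AnyRef]))  // prints None
    // sequences longer than 22 would fail: scala.Tuple23 does not exist
  }
}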
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala new file mode 100644 index 0000000..ac1a792 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala @@ -0,0 +1,34 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.connector.direct + +import org.apache.griffin.measure.connector.DataConnector +import org.apache.griffin.measure.connector.cache.DataUpdatable +import org.apache.spark.rdd.RDD + +import scala.util.Try + + +trait DirectDataConnector extends DataConnector with DataUpdatable { + + def metaData(): Try[Iterable[(String, String)]] + + def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala new file mode 100644 index 0000000..7de2b02 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala @@ -0,0 +1,158 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.connector.direct + +import org.apache.griffin.measure.result._ +import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext + +import scala.util.{Success, Try} + +// data connector for hive +case class HiveDirectDataConnector(sqlContext: SQLContext, config: Map[String, Any], + ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any] + ) extends DirectDataConnector { + + val Database = "database" + val TableName = "table.name" + val Partitions = "partitions" + + val database = config.getOrElse(Database, "").toString + val tableName = config.getOrElse(TableName, "").toString + val partitionsString = config.getOrElse(Partitions, "").toString + + val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName + val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim)) + + private def dbPrefix(): Boolean = { + database.nonEmpty && !database.equals("default") + } + + def available(): Boolean = { + (!tableName.isEmpty) && { + Try { + if (dbPrefix) { + sqlContext.tables(database).filter(tableExistsSql).collect.size + } else { + sqlContext.tables().filter(tableExistsSql).collect.size + } + } match { + case Success(s) => s > 0 + case _ => false + } + } + } + + def init(): Unit = {} + + def metaData(): Try[Iterable[(String, String)]] = { + Try { + val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect + val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# ")) + if (partitionPos < 0) originRows + else originRows.take(partitionPos) + } + } + + def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = { + Try { + sqlContext.sql(dataSql).flatMap { row => + // generate cache data + val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap) + val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) + + // data info + val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info => + try { + (info.key -> row.getAs[info.T](info.key)) + } catch { + case e: Throwable => info.defWrap + } + }.toMap + + finalExprValueMaps.flatMap { finalExprValueMap => + val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => + expr.calculate(finalExprValueMap) match { + case Some(v) => Some(v.asInstanceOf[AnyRef]) + case _ => None + } + } + val key = toTuple(groupbyData) + + Some((key, (finalExprValueMap, dataInfoMap))) + } + + // generate cache data +// val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) => +// ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap) +// } +// val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap) +// +// // when clause filter data source +// val whenResult = ruleExprs.whenClauseExprOpt match { +// case Some(whenClause) => whenClause.calculate(finalExprValueMap) +// case _ => None +// } +// +// // get groupby data +// whenResult match { +// case Some(false) => None +// case _ => { +// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => +// expr.calculate(finalExprValueMap) match { +// case Some(v) => Some(v.asInstanceOf[AnyRef]) +// case _ => None +// } +// } +// val key = toTuple(groupbyData) +// +// Some((key, finalExprValueMap)) +// } +// } + } + } + } + + private def tableExistsSql(): String = { 
+// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql + s"tableName LIKE '${tableName}'" + } + + private def metaDataSql(): String = { + s"DESCRIBE ${concreteTableName}" + } + + private def dataSql(): String = { + val clauses = partitions.map { prtn => + val cls = prtn.mkString(" AND ") + if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}" + else s"SELECT * FROM ${concreteTableName} WHERE ${cls}" + } + clauses.mkString(" UNION ALL ") + } + + private def toTuple[A <: AnyRef](as: Seq[A]): Product = { + if (as.size > 0) { + val tupleClass = Class.forName("scala.Tuple" + as.size) + tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] + } else None + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala new file mode 100644 index 0000000..d2534cc --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala @@ -0,0 +1,125 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.connector.direct + +import org.apache.griffin.measure.config.params.user.DataConnectorParam +import org.apache.griffin.measure.connector.DataConnectorFactory +import org.apache.griffin.measure.connector.cache.CacheDataConnector +import org.apache.griffin.measure.connector.streaming.StreamingDataConnector +import org.apache.griffin.measure.result._ +import org.apache.griffin.measure.rule._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext +import org.apache.spark.streaming.StreamingContext + +import scala.util.{Failure, Success, Try} + +case class KafkaCacheDirectDataConnector(@transient streamingDataConnectorTry: Try[StreamingDataConnector], + cacheDataConnectorTry: Try[CacheDataConnector], + dataConnectorParam: DataConnectorParam, + ruleExprs: RuleExprs, + constFinalExprValueMap: Map[String, Any] + ) extends StreamingCacheDirectDataConnector { + + val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match { + case Success(cntr) => cntr + case Failure(ex) => throw ex + } + @transient val streamingDataConnector: StreamingDataConnector = streamingDataConnectorTry match { + case Success(cntr) => cntr + case Failure(ex) => throw ex + } + + protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)], + ms: Long + ): RDD[Map[String, Any]] = { + val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + TimeStampInfo.wrap(ms) + + rdd.flatMap { kv => + val msg = kv._2 + + val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, constFinalExprValueMap) + val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) + + finalExprValueMaps.map { vm => + vm ++ dataInfoMap + } + } + } + + def metaData(): Try[Iterable[(String, String)]] = Try { + Map.empty[String, String] + } + + def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = Try { + cacheDataConnector.readData match { + case Success(rdd) => { + rdd.flatMap { row => + val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr => + row.get(expr._id).flatMap { d => + Some((expr._id, d)) + } + }.toMap + + val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info => + row.get(info.key) match { + case Some(d) => (info.key -> d) + case _ => info.defWrap + } + }.toMap + + val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr => + expr.calculate(finalExprValueMap) match { + case Some(v) => Some(v.asInstanceOf[AnyRef]) + case _ => None + } + } + val key = toTuple(groupbyData) + + Some((key, (finalExprValueMap, dataInfoMap))) + } + } + case Failure(ex) => throw ex + } + } + + override def cleanOldData(): Unit = { + cacheDataConnector.cleanOldData + } + + override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = { + if (dataConnectorParam.getMatchOnce) { + cacheDataConnector.updateOldData(t, oldData) + } + } + + override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = { + if (dataConnectorParam.getMatchOnce) { + cacheDataConnector.updateAllOldData(oldRdd) + } + } + + private def toTuple[A <: AnyRef](as: Seq[A]): Product = { + if (as.size > 0) { + val tupleClass = Class.forName("scala.Tuple" + as.size) + tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] + } else None + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala 
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala new file mode 100644 index 0000000..87139d6 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala @@ -0,0 +1,60 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.connector.direct + +import org.apache.griffin.measure.connector.cache.CacheDataConnector +import org.apache.griffin.measure.connector.streaming.StreamingDataConnector +import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo} +import org.apache.griffin.measure.rule.ExprValueUtil +import org.apache.spark.rdd.RDD + +import scala.util.{Failure, Success} + +trait StreamingCacheDirectDataConnector extends DirectDataConnector { + + val cacheDataConnector: CacheDataConnector + @transient val streamingDataConnector: StreamingDataConnector + + def available(): Boolean = { + cacheDataConnector.available && streamingDataConnector.available + } + + def init(): Unit = { + cacheDataConnector.init + + val ds = streamingDataConnector.stream match { + case Success(dstream) => dstream + case Failure(ex) => throw ex + } + + ds.foreachRDD((rdd, time) => { + val ms = time.milliseconds + + val valueMapRdd = transform(rdd, ms) + + // save data frame + cacheDataConnector.saveData(valueMapRdd, ms) + }) + } + + protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)], + ms: Long + ): RDD[Map[String, Any]] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala new file mode 100644 index 0000000..fdd511d --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala @@ -0,0 +1,58 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.connector.streaming + +import kafka.serializer.Decoder +import org.apache.griffin.measure.connector.cache.{CacheDataConnector, DataCacheable} +import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo} +import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.InputDStream + +import scala.util.{Failure, Success, Try} + +abstract class KafkaStreamingDataConnector(@transient ssc: StreamingContext, + config: Map[String, Any] + ) extends StreamingDataConnector { + type KD <: Decoder[K] + type VD <: Decoder[V] + + val KafkaConfig = "kafka.config" + val Topics = "topics" + + val kafkaConfig = config.get(KafkaConfig) match { + case Some(map: Map[String, Any]) => map.mapValues(_.toString).map(identity) + case _ => Map[String, String]() + } + val topics = config.getOrElse(Topics, "").toString + + def available(): Boolean = { + true + } + + def init(): Unit = {} + + def stream(): Try[InputDStream[(K, V)]] = Try { + val topicSet = topics.split(",").toSet + createDStream(topicSet) + } + + protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala new file mode 100644 index 0000000..c37caac --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala @@ -0,0 +1,34 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.connector.streaming + +import org.apache.griffin.measure.connector.DataConnector +import org.apache.spark.streaming.dstream.InputDStream + +import scala.util.Try + + +trait StreamingDataConnector extends DataConnector { + + type K + type V + + def stream(): Try[InputDStream[(K, V)]] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala b/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala new file mode 100644 index 0000000..265a8cd --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala @@ -0,0 +1,43 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.log + +import org.slf4j.LoggerFactory + +trait Loggable { + + @transient private lazy val logger = LoggerFactory.getLogger(getClass) + + protected def info(msg: String): Unit = { + logger.info(msg) + } + + protected def debug(msg: String): Unit = { + logger.debug(msg) + } + + protected def warn(msg: String): Unit = { + logger.warn(msg) + } + + protected def error(msg: String): Unit = { + logger.error(msg) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala new file mode 100644 index 0000000..97786c4 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala @@ -0,0 +1,206 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.persist + +import java.util.Date + +import org.apache.griffin.measure.result._ +import org.apache.griffin.measure.utils.HdfsUtil +import org.apache.spark.rdd.RDD + +import scala.util.Try + +// persist result and data to hdfs +case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { + + val Path = "path" + val MaxPersistLines = "max.persist.lines" + val MaxLinesPerFile = "max.lines.per.file" + + val path = config.getOrElse(Path, "").toString + val maxPersistLines = try { config.getOrElse(MaxPersistLines, -1).toString.toInt } catch { case _ => -1 } + val maxLinesPerFile = try { config.getOrElse(MaxLinesPerFile, 10000).toString.toLong } catch { case _ => 10000 } + + val separator = "/" + + val StartFile = filePath("_START") + val FinishFile = filePath("_FINISH") + val ResultFile = filePath("_RESULT") + + val MissRecFile = filePath("_MISSREC") // optional + val MatchRecFile = filePath("_MATCHREC") // optional + + val LogFile = filePath("_LOG") + + var _init = true + private def isInit = { + val i = _init + _init = false + i + } + + def available(): Boolean = { + (path.nonEmpty) && (maxPersistLines < Int.MaxValue) + } + + private def persistHead: String = { + val dt = new Date(timeStamp) + s"================ log of ${dt} ================\n" + } + + private def timeHead(rt: Long): String = { + val dt = new Date(rt) + s"--- ${dt} ---\n" + } + + protected def filePath(file: String): String = { + HdfsUtil.getHdfsFilePath(path, s"${metricName}/${timeStamp}/${file}") + } + + protected def withSuffix(path: String, suffix: String): String = { + s"${path}.${suffix}" + } + + def start(msg: String): Unit = { + try { + HdfsUtil.writeContent(StartFile, msg) + } catch { + case e: Throwable => error(e.getMessage) + } + } + def finish(): Unit = { + try { + HdfsUtil.createEmptyFile(FinishFile) + } catch { + case e: Throwable => error(e.getMessage) + } + } + + def result(rt: Long, result: Result): Unit = { + try { + val resStr = result match { + case ar: AccuracyResult => { + s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}" + } + case pr: ProfileResult => { + s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}" + } + case _ => { + s"result: ${result}" + } + } + HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr) + log(rt, resStr) + + info(resStr) + } catch { + case e: Throwable => error(e.getMessage) + } + } + + // need to avoid string too long + private def rddRecords(records: RDD[String], path: String): Unit = { + try { + val recordCount = records.count + val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) + if (count > 0) { + val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt + if (groupCount <= 1) { + val recs = records.take(count.toInt) + persistRecords(path, recs) + } else { + val groupedRecords: RDD[(Long, Iterable[String])] = + records.zipWithIndex.flatMap { r => + val gid = r._2 / maxLinesPerFile + if (gid < groupCount) Some((gid, r._1)) else None + }.groupByKey() + groupedRecords.foreach { group => + val (gid, recs) = group + val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString) + persistRecords(hdfsPath, recs) + } + } + } + } catch { + case e: Throwable => error(e.getMessage) + } + } + + private def iterableRecords(records: Iterable[String], path: String): Unit = { + try { + val recordCount 
= records.size + val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) + if (count > 0) { + val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt + if (groupCount <= 1) { + val recs = records.take(count.toInt) + persistRecords(path, recs) + } else { + // split the persisted records into chunks of maxLinesPerFile lines each + val groupedRecords = records.take(count.toInt).grouped(maxLinesPerFile.toInt).zipWithIndex + groupedRecords.foreach { group => + val (recs, gid) = group + val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString) + persistRecords(hdfsPath, recs) + } + } + } + } catch { + case e: Throwable => error(e.getMessage) + } + } + + def records(recs: RDD[String], tp: String): Unit = { + tp match { + case PersistType.MISS => rddRecords(recs, MissRecFile) + case PersistType.MATCH => rddRecords(recs, MatchRecFile) + case _ => {} + } + } + + def records(recs: Iterable[String], tp: String): Unit = { + tp match { + case PersistType.MISS => iterableRecords(recs, MissRecFile) + case PersistType.MATCH => iterableRecords(recs, MatchRecFile) + case _ => {} + } + } + +// def missRecords(records: RDD[String]): Unit = { +// rddRecords(records, MissRecFile) +// } +// +// def matchRecords(records: RDD[String]): Unit = { +// rddRecords(records, MatchRecFile) +// } + + private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = { + val recStr = records.mkString("\n") + HdfsUtil.writeContent(hdfsPath, recStr) + } + + def log(rt: Long, msg: String): Unit = { + try { + val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n" + HdfsUtil.appendContent(LogFile, logStr) + } catch { + case e: Throwable => error(e.getMessage) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala new file mode 100644 index 0000000..6d5bac3 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala @@ -0,0 +1,88 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.persist + +import org.apache.griffin.measure.result._ +import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil} +import org.apache.spark.rdd.RDD + +import scala.util.Try + +// persist result by http way +case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { + + val Api = "api" + val Method = "method" + + val api = config.getOrElse(Api, "").toString + val method = config.getOrElse(Method, "post").toString + + def available(): Boolean = { + api.nonEmpty + } + + def start(msg: String): Unit = {} + def finish(): Unit = {} + + def result(rt: Long, result: Result): Unit = { + result match { + case ar: AccuracyResult => { + val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch)) + httpResult(dataMap) + } + case pr: ProfileResult => { + val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch)) + httpResult(dataMap) + } + case _ => { + info(s"result: ${result}") + } + } + } + + private def httpResult(dataMap: Map[String, Any]) = { + try { + val data = JsonUtil.toJson(dataMap) + // post + val params = Map[String, Object]() + val header = Map[String, Object]() + + def func(): Boolean = { + HttpUtil.httpRequest(api, method, params, header, data) + } + + PersistThreadPool.addTask(func _, 10) + +// val status = HttpUtil.httpRequest(api, method, params, header, data) +// info(s"${method} to ${api} response status: ${status}") + } catch { + case e: Throwable => error(e.getMessage) + } + + } + + def records(recs: RDD[String], tp: String): Unit = {} + def records(recs: Iterable[String], tp: String): Unit = {} + +// def missRecords(records: RDD[String]): Unit = {} +// def matchRecords(records: RDD[String]): Unit = {} + + def log(rt: Long, msg: String): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala new file mode 100644 index 0000000..00d41ea --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala @@ -0,0 +1,118 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.result._
+import org.apache.spark.rdd.RDD
+
+// persist result and record samples to the console log
+case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val MaxLogLines = "max.log.lines"
+
+  val maxLogLines = try { config.getOrElse(MaxLogLines, 100).toString.toInt } catch { case _: Throwable => 100 }
+
+  def available(): Boolean = true
+
+  def start(msg: String): Unit = {
+    println(s"[${timeStamp}] ${metricName} start")
+  }
+  def finish(): Unit = {
+    println(s"[${timeStamp}] ${metricName} finish")
+  }
+
+  def result(rt: Long, result: Result): Unit = {
+    try {
+      val resStr = result match {
+        case ar: AccuracyResult => {
+          s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
+        }
+        case pr: ProfileResult => {
+          s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
+        }
+        case _ => {
+          s"result: ${result}"
+        }
+      }
+      println(s"[${timeStamp}] ${metricName} result: \n${resStr}")
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  // cap the sample at maxLogLines to avoid logging overly long strings
+  private def rddRecords(records: RDD[String]): Unit = {
+    try {
+      val recordCount = records.count.toInt
+      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
+      if (count > 0) {
+        val recordsArray = records.take(count)
+//        recordsArray.foreach(println)
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  private def iterableRecords(records: Iterable[String]): Unit = {
+    try {
+      val recordCount = records.size
+      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
+      if (count > 0) {
+        val recordsArray = records.take(count)
+//        recordsArray.foreach(println)
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  def records(recs: RDD[String], tp: String): Unit = {
+    tp match {
+      case PersistType.MISS => rddRecords(recs)
+      case PersistType.MATCH => rddRecords(recs)
+      case _ => {}
+    }
+  }
+
+  def records(recs: Iterable[String], tp: String): Unit = {
+    tp match {
+      case PersistType.MISS => iterableRecords(recs)
+      case PersistType.MATCH => iterableRecords(recs)
+      case _ => {}
+    }
+  }
+
+//  def missRecords(records: RDD[String]): Unit = {
+//    warn(s"[${timeStamp}] ${metricName} miss records: ")
+//    rddRecords(records)
+//  }
+//  def matchRecords(records: RDD[String]): Unit = {
+//    warn(s"[${timeStamp}] ${metricName} match records: ")
+//    rddRecords(records)
+//  }
+
+  def log(rt: Long, msg: String): Unit = {
+    println(s"[${timeStamp}] ${rt}: ${msg}")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
new file mode 100644
index 0000000..25c8b0b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -0,0 +1,52 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.result._
+import org.apache.spark.rdd.RDD
+
+// persist result and data through a group of persists
+case class MultiPersists(persists: Iterable[Persist]) extends Persist {
+
+  // use the timestamp of the first delegate, or 0 if there is none
+  val timeStamp: Long = persists.headOption match {
+    case Some(persist) => persist.timeStamp
+    case _ => 0
+  }
+
+  val config: Map[String, Any] = Map[String, Any]()
+
+  def available(): Boolean = { persists.exists(_.available()) }
+
+  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
+  def finish(): Unit = { persists.foreach(_.finish()) }
+
+  def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }
+
+  def records(recs: RDD[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) }
+  def records(recs: Iterable[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) }
+
+//  def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }
+//  def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) }
+
+  def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
new file mode 100644
index 0000000..357d6e1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
+// persist result via the legacy http api -- a temporary solution
+case class OldHttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val Api = "api"
+  val Method = "method"
+
+  val api = config.getOrElse(Api, "").toString
+  val method = config.getOrElse(Method, "post").toString
+
+  def available(): Boolean = {
+    api.nonEmpty
+  }
+
+  def start(msg: String): Unit = {}
+  def finish(): Unit = {}
+
+  def result(rt: Long, result: Result): Unit = {
+    result match {
+      case ar: AccuracyResult => {
+        val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> ar.matchPercentage), ("count" -> ar.getTotal))
+        httpResult(dataMap)
+      }
+      case pr: ProfileResult => {
+        val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal))
+        httpResult(dataMap)
+      }
+      case _ => {
+        info(s"result: ${result}")
+      }
+    }
+  }
+
+  private def httpResult(dataMap: Map[String, Any]) = {
+    try {
+      val data = JsonUtil.toJson(dataMap)
+      // post asynchronously, retrying up to 10 times
+      val params = Map[String, Object]()
+      val header = Map[String, Object](("content-type" -> "application/json"))
+
+      def func(): Boolean = {
+        HttpUtil.httpRequest(api, method, params, header, data)
+      }
+
+      PersistThreadPool.addTask(func _, 10)
+
+//      val status = HttpUtil.httpRequest(api, method, params, header, data)
+//      info(s"${method} to ${api} response status: ${status}")
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+
+  }
+
+  def records(recs: RDD[String], tp: String): Unit = {}
+  def records(recs: Iterable[String], tp: String): Unit = {}
+
+//  def missRecords(records: RDD[String]): Unit = {}
+//  def matchRecords(records: RDD[String]): Unit = {}
+
+  def log(rt: Long, msg: String): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
new file mode 100644
index 0000000..bc16599
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
@@ -0,0 +1,52 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.result._
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+
+trait Persist extends Loggable with Serializable {
+  val timeStamp: Long
+
+  val config: Map[String, Any]
+
+  def available(): Boolean
+
+  def start(msg: String): Unit
+  def finish(): Unit
+
+  def result(rt: Long, result: Result): Unit
+
+  def records(recs: RDD[String], tp: String): Unit
+  def records(recs: Iterable[String], tp: String): Unit
+
+//  def missRecords(records: RDD[String]): Unit
+//  def matchRecords(records: RDD[String]): Unit
+
+  def log(rt: Long, msg: String): Unit
+}
+
+object PersistType {
+  final val MISS = "miss"
+  final val MATCH = "match"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
new file mode 100644
index 0000000..4330160
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.config.params.env._
+
+import scala.util.{Failure, Success, Try}
+
+
+case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
+
+  val HDFS_REGEX = """^(?i)hdfs$""".r
+  val HTTP_REGEX = """^(?i)http$""".r
+  val OLDHTTP_REGEX = """^(?i)oldhttp$""".r
+  val LOG_REGEX = """^(?i)log$""".r
+
+  def getPersists(timeStamp: Long): MultiPersists = {
+    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
+  }
+
+  // instantiate the configured persists, skipping unavailable or unsupported ones
+  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
+    val config = persistParam.config
+    val persistTry = persistParam.persistType match {
+      case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
+      case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp))
+      case OLDHTTP_REGEX() => Try(OldHttpPersist(config, metricName, timeStamp))
+      case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
+      case _ => Failure(new Exception("not supported persist type"))
+    }
+    persistTry match {
+      case Success(persist) if (persist.available) => Some(persist)
+      case _ => None
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
new file mode 100644
index 0000000..7993aab
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+
+object PersistThreadPool {
+
+  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(10).asInstanceOf[ThreadPoolExecutor]
+  val MAX_RETRY = 100
+
+  def shutdown(): Unit = {
+    pool.shutdown()
+    pool.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  def addTask(func: () => Boolean, retry: Int): Unit = {
+    val r = if (retry < 0) MAX_RETRY else retry
+    println(s"add task, current task num: ${pool.getQueue.size}")
+    pool.submit(Task(func, r))
+  }
+
+  case class Task(func: () => Boolean, retry: Int) extends Runnable {
+
+    override def run(): Unit = {
+      try {
+        var i = retry
+        var suc = false
+        while (!suc && i > 0) {
+          if (func()) {
+            println("task success")
+            suc = true
+          } else i = i - 1
+        }
+        if (!suc) fail(s"retried for ${retry} times")
+      } catch {
+        case e: Throwable => fail(s"${e.getMessage}")
+      }
+    }
+
+    def fail(msg: String): Unit = {
+      println(s"task fails: ${msg}")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala b/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
new file mode 100644
index 0000000..16bb772
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result

+// result for accuracy: miss count, total count
+case class AccuracyResult(miss: Long, total: Long) extends Result {
+
+  type T = AccuracyResult
+
+  // take the latest miss count from the delta; the total stays fixed
+  def update(delta: T): T = {
+    AccuracyResult(delta.miss, total)
+  }
+
+  def eventual(): Boolean = {
+    this.miss <= 0
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.miss != other.miss) || (this.total != other.total)
+  }
+
+  def getMiss = miss
+  def getTotal = total
+  def getMatch = total - miss
+
+  def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/result/DataInfo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/DataInfo.scala b/measure/src/main/scala/org/apache/griffin/measure/result/DataInfo.scala
new file mode 100644
index 0000000..7ec0783
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/DataInfo.scala
@@ -0,0 +1,50 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result
+
+
+sealed trait DataInfo {
+  type T
+  val key: String
+  def wrap(value: T) = (key -> value)
+  def defWrap() = wrap(dfv)
+  val dfv: T
+}
+
+final case object TimeStampInfo extends DataInfo {
+  type T = Long
+  val key = "_tmst_"
+  val dfv = 0L
+}
+
+final case object MismatchInfo extends DataInfo {
+  type T = String
+  val key = "_mismatch_"
+  val dfv = ""
+}
+
+final case object ErrorInfo extends DataInfo {
+  type T = String
+  val key = "_error_"
+  val dfv = ""
+}
+
+object DataInfo {
+  val cacheInfoList = List(TimeStampInfo, MismatchInfo, ErrorInfo)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala b/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
new file mode 100644
index 0000000..803416e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result
+
+// result for profile: match count, total count
+case class ProfileResult(matchCount: Long, totalCount: Long) extends Result {
+
+  type T = ProfileResult
+
+  // accumulate the matched count from the delta; the total stays fixed
+  def update(delta: T): T = {
+    ProfileResult(matchCount + delta.matchCount, totalCount)
+  }
+
+  def eventual(): Boolean = {
+    this.matchCount >= totalCount
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.matchCount != other.matchCount) || (this.totalCount != other.totalCount)
+  }
+
+  def getMiss = totalCount - matchCount
+  def getTotal = totalCount
+  def getMatch = matchCount
+
+  def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala b/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
new file mode 100644
index 0000000..6dcd9a1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result
+
+
+trait Result extends Serializable {
+
+  type T <: Result
+
+  def update(delta: T): T
+
+  def eventual(): Boolean
+
+  def differsFrom(other: T): Boolean
+
+}
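
To see how the pieces of this patch compose, here is a minimal usage sketch. It is not taken from the commit itself: the PersistParam constructor shape (persistType, config) is assumed from the two fields PersistFactory reads, and the config keys and endpoint are illustrative only.

import org.apache.griffin.measure.config.params.env.PersistParam
import org.apache.griffin.measure.persist.{PersistFactory, PersistType}
import org.apache.griffin.measure.result.AccuracyResult

// hypothetical persist configs: one console logger, one http sink
val persistParams = Seq(
  PersistParam("log", Map[String, Any]("max.log.lines" -> 50)),                   // -> LoggerPersist
  PersistParam("http", Map[String, Any]("api" -> "http://example.com/metrics"))   // -> HttpPersist (endpoint is a placeholder)
)
val factory = PersistFactory(persistParams, "demo_accuracy")
val persist = factory.getPersists(System.currentTimeMillis)  // MultiPersists fanning out to both

persist.start("demo run")
persist.result(System.currentTimeMillis, AccuracyResult(miss = 2, total = 100))
persist.records(Seq("""{"id": 1}"""), PersistType.MISS)
persist.finish()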
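
The two Result implementations update differently, which is easy to miss when reading the diff: AccuracyResult.update replaces the miss count with the delta's (the records still unmatched), while ProfileResult.update accumulates match counts. A worked example against the code above:

val first = AccuracyResult(miss = 10, total = 100)
val later = first.update(AccuracyResult(miss = 3, total = 100))  // AccuracyResult(3, 100)
later.matchPercentage     // 97.0
later.eventual()          // false: 3 records remain unmatched
later.differsFrom(first)  // true

val profile = ProfileResult(matchCount = 40, totalCount = 100)
profile.update(ProfileResult(matchCount = 30, totalCount = 100))  // ProfileResult(70, 100)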
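
For HdfsPersist's record splitting (the iterableRecords fragment at the top of this section), the arithmetic works out as follows: with 25000 records, maxPersistLines = -1 (no cap) and maxLinesPerFile = 10000,

val groupCount = (25000 - 1) / 10000 + 1  // 3

so the records are written as three files of at most 10000 lines each: path, path.1 and path.2 (the suffixed paths come from withSuffix).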
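
Finally, PersistThreadPool gives the two http persists fire-and-forget semantics with bounded retry: a submitted task is re-run until func() returns true or the retry budget is spent, and a negative budget falls back to MAX_RETRY (100). A hedged sketch of the contract (flakySend is a stand-in, not part of the commit):

def flakySend(): Boolean = scala.util.Random.nextInt(4) == 0  // hypothetical flaky probe
PersistThreadPool.addTask(flakySend _, 10)  // give up after 10 failed attempts
PersistThreadPool.shutdown()                // waits up to 10 seconds for queued tasks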