http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LogicalExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LogicalExpr.scala new file mode 100644 index 0000000..dd061d7 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/LogicalExpr.scala @@ -0,0 +1,178 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.expr + +import org.apache.griffin.measure.rule.CalculationUtil._ +import org.apache.spark.sql.types.{BooleanType, DataType} + +trait LogicalExpr extends Expr with AnalyzableExpr { + override def cacheUnit: Boolean = true +} + +case class LogicalSimpleExpr(expr: MathExpr) extends LogicalExpr { + def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values) + val desc: String = expr.desc + val dataSources: Set[String] = expr.dataSources + override def cacheUnit: Boolean = false + override def getSubCacheExprs(ds: String): Iterable[Expr] = expr.getCacheExprs(ds) + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = expr.getFinalCacheExprs(ds) + override def getSubPersistExprs(ds: String): Iterable[Expr] = expr.getPersistExprs(ds) +} + +case class LogicalCompareExpr(left: MathExpr, compare: String, right: MathExpr) extends LogicalExpr { + private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=") + def calculateOnly(values: Map[String, Any]): Option[Boolean] = { + val (lv, rv) = (left.calculate(values), right.calculate(values)) + compare match { + case this.eqOpr() => lv === rv + case this.neqOpr() => lv =!= rv + case this.btOpr => lv > rv + case this.bteOpr => lv >= rv + case this.ltOpr => lv < rv + case this.lteOpr => lv <= rv + case _ => None + } + } + val desc: String = s"${left.desc} ${compare} ${right.desc}" + val dataSources: Set[String] = left.dataSources ++ right.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + left.getCacheExprs(ds) ++ right.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + left.getFinalCacheExprs(ds) ++ right.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + left.getPersistExprs(ds) ++ right.getPersistExprs(ds) + } + + override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = { + if (compare == "=" || compare == "==") { + (left.dataSourceOpt, right.dataSourceOpt) match { + case (Some(dsPair._1), Some(dsPair._2)) => (left, right) :: Nil + case (Some(dsPair._2), Some(dsPair._1)) => (right, left) :: Nil + case 
_ => Nil + } + } else Nil + } +} + +case class LogicalRangeExpr(left: MathExpr, rangeOpr: String, range: RangeDesc) extends LogicalExpr { + private val (inOpr, ninOpr, btwnOpr, nbtwnOpr) = ("""(?i)in""".r, """(?i)not\s+in""".r, """(?i)between""".r, """(?i)not\s+between""".r) + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val (lv, rvs) = (left.calculate(values), range.elements.map(_.calculate(values))) + rangeOpr match { + case this.inOpr() => lv in rvs + case this.ninOpr() => lv not_in rvs + case this.btwnOpr() => lv between rvs + case this.nbtwnOpr() => lv not_between rvs + case _ => None + } + } + val desc: String = s"${left.desc} ${rangeOpr} ${range.desc}" + val dataSources: Set[String] = left.dataSources ++ range.elements.flatMap(_.dataSources).toSet + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + left.getCacheExprs(ds) ++ range.elements.flatMap(_.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + left.getFinalCacheExprs(ds) ++ range.elements.flatMap(_.getFinalCacheExprs(ds)) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + left.getPersistExprs(ds) ++ range.elements.flatMap(_.getPersistExprs(ds)) + } +} + +// -- logical statement -- +//case class LogicalFactorExpr(self: LogicalExpr) extends LogicalExpr { +// def calculate(values: Map[String, Any]): Option[Any] = self.calculate(values) +// val desc: String = self.desc +//} + +case class UnaryLogicalExpr(oprList: Iterable[String], factor: LogicalExpr) extends LogicalExpr { + private val notOpr = """(?i)not|!""".r + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = factor.calculate(values) + oprList.foldRight(fv) { (opr, v) => + opr match { + case this.notOpr() => !v + case _ => None + } + } + } + val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev} ${ex}" } + val dataSources: Set[String] = factor.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + factor.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + factor.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + factor.getPersistExprs(ds) + } + + override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = { + val notOprList = oprList.filter { opr => + opr match { + case this.notOpr() => true + case _ => false + } + } + if (notOprList.size % 2 == 0) factor.getGroupbyExprPairs(dsPair) else Nil + } +} + +case class BinaryLogicalExpr(first: LogicalExpr, others: Iterable[(String, LogicalExpr)]) extends LogicalExpr { + private val (andOpr, orOpr) = ("""(?i)and|&&""".r, """(?i)or|\|\|""".r) + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = first.calculate(values) + others.foldLeft(fv) { (v, pair) => + val (opr, next) = pair + val nv = next.calculate(values) + opr match { + case this.andOpr() => v && nv + case this.orOpr() => v || nv + case _ => None + } + } + } + val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" } + val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds)) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = 
{ + first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds)) + } + + override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = { + if (others.isEmpty) first.getGroupbyExprPairs(dsPair) + else { + val isAnd = others.exists(_._1 match { + case this.andOpr() => true + case _ => false + }) + if (isAnd) { + first.getGroupbyExprPairs(dsPair) ++ others.flatMap(_._2.getGroupbyExprPairs(dsPair)) + } else Nil + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala new file mode 100644 index 0000000..661e8f4 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala @@ -0,0 +1,99 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.expr + +import org.apache.griffin.measure.rule.CalculationUtil._ +import org.apache.griffin.measure.rule.DataTypeCalculationUtil._ +import org.apache.spark.sql.types.DataType + +trait MathExpr extends Expr { + +} + +case class MathFactorExpr(self: Expr) extends MathExpr { + def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values) + val desc: String = self.desc + val dataSources: Set[String] = self.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + self.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + self.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + self.getPersistExprs(ds) + } +} + +case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr { + private val (posOpr, negOpr) = ("+", "-") + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = factor.calculate(values) + oprList.foldRight(fv) { (opr, v) => + opr match { + case this.posOpr => v + case this.negOpr => -v + case _ => None + } + } + } + val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" } + val dataSources: Set[String] = factor.dataSources + override def cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + factor.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + factor.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + factor.getPersistExprs(ds) + } +} + +case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr { + private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%") + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = first.calculate(values) + others.foldLeft(fv) { (v, pair) => + val (opr, next) = pair + val nv = next.calculate(values) + opr match { + case this.addOpr => v + nv + case this.subOpr => v - nv + case this.mulOpr => v * nv + case this.divOpr => v / nv + case this.modOpr => v % nv + case _ => None + } + } + } + val desc: String = others.foldLeft(first.desc) { (ex, 
next) => s"${ex} ${next._1} ${next._2.desc}" } + val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet + override def cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds)) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala new file mode 100644 index 0000000..5b7f1b0 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala @@ -0,0 +1,88 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.expr + +import org.apache.spark.sql.types.DataType +import org.apache.griffin.measure.rule.CalculationUtil._ + +trait SelectExpr extends Expr { + def calculateOnly(values: Map[String, Any]): Option[Any] = None +} + +case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr { + val desc: String = s"[${fields.map(_.desc).mkString(", ")}]" + val dataSources: Set[String] = Set.empty[String] +} + +case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr { + val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})" + val dataSources: Set[String] = args.flatMap(_.dataSources).toSet + override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds)) + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds)) + override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds)) +} + +case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr { + val desc: String = s"[${field.desc} ${compare} ${value.desc}]" + val dataSources: Set[String] = value.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds) + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds) + override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds) + private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=") + override def calculateOnly(values: Map[String, Any]): Option[Any] = { + val (lv, rv) = (values.get(fieldKey), value.calculate(values)) + compare match { + case this.eqOpr() => lv === rv + case this.neqOpr() => lv =!= rv + case this.btOpr => lv > rv + case this.bteOpr => lv >= rv + case this.ltOpr => lv < rv + case this.lteOpr => lv <= rv + case _ => None + } + } + def fieldKey: String = s"__${field.field}" +} + +// -- selection -- +case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr { + def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id) + + val desc: String = { + val argsString = selectors.map(_.desc).mkString("") + s"${head.desc}${argsString}" + } + val dataSources: Set[String] = { + val selectorDataSources = selectors.flatMap(_.dataSources).toSet + selectorDataSources + head.head + } + + override def cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getFinalCacheExprs(ds)) + } + + override def persistUnit: Boolean = true + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getPersistExprs(ds)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala new file mode 100644 index 0000000..15161c3 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala @@ -0,0 +1,36 @@ +/* +Licensed to the Apache 
Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.func + +import org.apache.griffin.measure.utils.JsonUtil + +class DefaultFunctionDefine extends FunctionDefine { + + def json(strOpt: Option[_]): Map[String, Any] = { + try { + strOpt match { + case Some(str: String) => JsonUtil.toAnyMap(str) + case _ => throw new Exception("json function param should be string") + } + } catch { + case e: Throwable => throw e + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala new file mode 100644 index 0000000..d23fc7a --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala @@ -0,0 +1,25 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.func + +trait FunctionDefine extends Serializable { + +} + +class UnKnown {} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala new file mode 100644 index 0000000..57e934d --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala @@ -0,0 +1,75 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.func + +import java.lang.reflect.Method + +import org.apache.griffin.measure.log.Loggable + +import scala.collection.mutable.{Map => MutableMap} + +object FunctionUtil extends Loggable { + + val functionDefines: MutableMap[String, FunctionDefine] = MutableMap[String, FunctionDefine]() + + registerFunctionDefine(Array(classOf[DefaultFunctionDefine].getCanonicalName)) + + def registerFunctionDefine(classes: Iterable[String]): Unit = { + for (cls <- classes) { + try { + val clz: Class[_] = Class.forName(cls) + if (classOf[FunctionDefine].isAssignableFrom(clz)) { + functionDefines += (cls -> clz.newInstance.asInstanceOf[FunctionDefine]) + } else { + warn(s"${cls} register fails: ${cls} is not sub class of ${classOf[FunctionDefine].getCanonicalName}") + } + } catch { + case e: Throwable => warn(s"${cls} register fails: ${e.getMessage}") + } + } + } + + def invoke(methodName: String, params: Array[Option[Any]]): Seq[Option[Any]] = { +// val paramTypes = params.map { param => +// try { +// param match { +// case Some(v) => v.getClass +// case _ => classOf[UnKnown] +// } +// } catch { +// case e: Throwable => classOf[UnKnown] +// } +// } + val paramTypes = params.map(a => classOf[Option[_]]) + + functionDefines.values.foldLeft(Nil: Seq[Option[Any]]) { (res, funcDef) => + if (res.isEmpty) { + val clz = funcDef.getClass + try { + val method = clz.getMethod(methodName, paramTypes: _*) + Seq(Some(method.invoke(funcDef, params: _*))) + } catch { + case e: Throwable => res + } + } else res + } + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala new file mode 100644 index 0000000..8a608ff --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala @@ -0,0 +1,83 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.utils + +import org.apache.spark.rdd.RDD + +object HdfsFileDumpUtil { + + val sepCount = 5000 + + private def suffix(i: Long): String = { + if (i == 0) "" else s".${i}" + } + private def samePattern(fileName: String, patternFileName: String): Boolean = { + fileName.startsWith(patternFileName) + } + + def splitRdd[T](rdd: RDD[T])(implicit m: Manifest[T]): RDD[(Long, Iterable[T])] = { + val indexRdd = rdd.zipWithIndex + indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey() + } + def splitIterable[T](datas: Iterable[T])(implicit m: Manifest[T]): Iterator[(Int, Iterable[T])] = { + val groupedData = datas.grouped(sepCount).zipWithIndex + groupedData.map(v => (v._2, v._1)) + } + + private def directDump(path: String, list: Iterable[String], lineSep: String): Unit = { + // collect and save + val strRecords = list.mkString(lineSep) + // save into hdfs + HdfsUtil.writeContent(path, strRecords) + } + + def dump(path: String, recordsRdd: RDD[String], lineSep: String): Boolean = { + val groupedRdd = splitRdd(recordsRdd) + groupedRdd.aggregate(true)({ (res, pair) => + val (idx, list) = pair + val filePath = path + suffix(idx) + directDump(filePath, list, lineSep) + true + }, _ && _) + } + def dump(path: String, records: Iterable[String], lineSep: String): Boolean = { + val groupedRecords = splitIterable(records) + groupedRecords.aggregate(true)({ (res, pair) => + val (idx, list) = pair + val filePath = path + suffix(idx) + directDump(filePath, list, lineSep) + true + }, _ && _) + } + + def remove(path: String, filename: String, withSuffix: Boolean): Unit = { + if (withSuffix) { + val files = HdfsUtil.listSubPaths(path, "file") + val patternFiles = files.filter(samePattern(_, filename)) + patternFiles.foreach { f => + val rmPath = HdfsUtil.getHdfsFilePath(path, f) + HdfsUtil.deleteHdfsPath(rmPath) + } + } else { + val rmPath = HdfsUtil.getHdfsFilePath(path, filename) + HdfsUtil.deleteHdfsPath(rmPath) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala new file mode 100644 index 0000000..6dd54b7 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala @@ -0,0 +1,120 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.utils + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} + +object HdfsUtil { + + private val seprator = "/" + + private val conf = new Configuration() + conf.set("dfs.support.append", "true") +// conf.set("fs.defaultFS", "hdfs://localhost") // debug @localhost + + private val dfs = FileSystem.get(conf) + + def existPath(filePath: String): Boolean = { + val path = new Path(filePath) + dfs.exists(path) + } + + def createFile(filePath: String): FSDataOutputStream = { + val path = new Path(filePath) + if (dfs.exists(path)) dfs.delete(path, true) + return dfs.create(path) + } + + def appendOrCreateFile(filePath: String): FSDataOutputStream = { + val path = new Path(filePath) + if (dfs.exists(path)) dfs.append(path) else createFile(filePath) + } + + def openFile(filePath: String): FSDataInputStream = { + val path = new Path(filePath) + dfs.open(path) + } + + def writeContent(filePath: String, message: String): Unit = { + val out = createFile(filePath) + out.write(message.getBytes("utf-8")) + out.close + } + + def appendContent(filePath: String, message: String): Unit = { + val out = appendOrCreateFile(filePath) + out.write(message.getBytes("utf-8")) + out.close + } + + def createEmptyFile(filePath: String): Unit = { + val out = createFile(filePath) + out.close + } + + + def getHdfsFilePath(parentPath: String, fileName: String): String = { + if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName + } + + def deleteHdfsPath(dirPath: String): Unit = { + val path = new Path(dirPath) + if (dfs.exists(path)) dfs.delete(path, true) + } + +// def listPathFiles(dirPath: String): Iterable[String] = { +// val path = new Path(dirPath) +// try { +// val fileStatusArray = dfs.listStatus(path) +// fileStatusArray.flatMap { fileStatus => +// if (fileStatus.isFile) { +// Some(fileStatus.getPath.getName) +// } else None +// } +// } catch { +// case e: Throwable => { +// println(s"list path files error: ${e.getMessage}") +// Nil +// } +// } +// } + + def listSubPaths(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = { + val path = new Path(dirPath) + try { + val fileStatusArray = dfs.listStatus(path) + fileStatusArray.filter { fileStatus => + subType match { + case "dir" => fileStatus.isDirectory + case "file" => fileStatus.isFile + case _ => true + } + }.map { fileStatus => + val fname = fileStatus.getPath.getName + if (fullPath) getHdfsFilePath(dirPath, fname) else fname + } + } catch { + case e: Throwable => { + println(s"list path files error: ${e.getMessage}") + Nil + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala new file mode 100644 index 0000000..e016b60 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala @@ -0,0 +1,54 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.utils + +import scalaj.http._ + +object HttpUtil { + + val GET_REGEX = """^(?i)get$""".r + val POST_REGEX = """^(?i)post$""".r + val PUT_REGEX = """^(?i)put$""".r + val DELETE_REGEX = """^(?i)delete$""".r + + def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): Boolean = { + val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString + response.isSuccess + } + + def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): Boolean = { + val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)) + method match { + case POST_REGEX() => { + val res = httpReq.postData(data).asString + res.isSuccess + } + case PUT_REGEX() => { + val res = httpReq.put(data).asString + res.isSuccess + } + case _ => false + } + } + + private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = { + map.map(pair => pair._1 -> pair._2.toString) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala new file mode 100644 index 0000000..1418375 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala @@ -0,0 +1,54 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.utils + +import java.io.InputStream + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import scala.reflect._ + +object JsonUtil { + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + + def toJson(value: Map[Symbol, Any]): String = { + toJson(value map { case (k,v) => k.name -> v}) + } + + def toJson(value: Any): String = { + mapper.writeValueAsString(value) + } + + def toMap[V](json:String)(implicit m: Manifest[V]) = fromJson[Map[String,V]](json) + + def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = { + mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + } + + def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = { + mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + } + + def toAnyMap(json: String): Map[String, Any] = { + mapper.readValue(json, classOf[Map[String, Any]]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala new file mode 100644 index 0000000..0079d10 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala @@ -0,0 +1,79 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.utils + +import scala.util.{Failure, Success, Try} + +object TimeUtil { + + final val TimeRegex = """([+\-]?\d+)(d|h|m|s|ms)""".r + final val PureTimeRegex = """([+\-]?\d+)""".r + + def milliseconds(timeString: String): Option[Long] = { + val value: Option[Long] = { + Try { + timeString match { + case TimeRegex(time, unit) => { + val t = time.toLong + unit match { + case "d" => t * 24 * 60 * 60 * 1000 + case "h" => t * 60 * 60 * 1000 + case "m" => t * 60 * 1000 + case "s" => t * 1000 + case "ms" => t + case _ => throw new Exception(s"${timeString} is invalid time format") + } + } + case PureTimeRegex(time) => { + val t = time.toLong + t + } + case _ => throw new Exception(s"${timeString} is invalid time format") + } + } match { + case Success(v) => Some(v) + case Failure(ex) => throw ex + } + } + value + } + + def timeToUnit(ms: Long, unit: String): Long = { + unit match { + case "ms" => ms + case "sec" => ms / 1000 + case "min" => ms / (60 * 1000) + case "hour" => ms / (60 * 60 * 1000) + case "day" => ms / (24 * 60 * 60 * 1000) + case _ => ms / (60 * 1000) + } + } + + def timeFromUnit(t: Long, unit: String): Long = { + unit match { + case "ms" => t + case "sec" => t * 1000 + case "min" => t * 60 * 1000 + case "hour" => t * 60 * 60 * 1000 + case "day" => t * 24 * 60 * 60 * 1000 + case _ => t * 60 * 1000 + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-profile.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-profile.json b/measure/src/test/resources/config-profile.json index 6b82d7f..2529def 100644 --- a/measure/src/test/resources/config-profile.json +++ b/measure/src/test/resources/config-profile.json @@ -2,6 +2,8 @@ "name": "prof1", "type": "profile", + "process.type": "batch", + "source": { "type": "avro", "version": "1.7", http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-streaming.json b/measure/src/test/resources/config-streaming.json new file mode 100644 index 0000000..697fb7b --- /dev/null +++ b/measure/src/test/resources/config-streaming.json @@ -0,0 +1,69 @@ +{ + "name": "accu1", + "type": "accuracy", + + "process.type": "streaming", + + "source": { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "cache": { + "type": "hive", + "config": { + "database": "griffin_cache", + "table.name": "src", + "parent.path": "hdfs://localhost/griffin/streaming/dump", + "info.path": "src", + "ready.time.interval": "10s", + "ready.time.delay": "0" + }, + "time.range": ["-2m", "0"] + }, + "match.once": true + }, + + "target": { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "cache": { + "type": "hive", + "config": { + "database": "griffin_cache", + "table.name": "tgt", + "parent.path": 
"hdfs://localhost/griffin/streaming/dump", + "info.path": "tgt", + "ready.time.interval": "10s", + "ready.time.delay": "0" + }, + "time.range": ["-2m", "0"] + }, + "match.once": false + }, + + "evaluateRule": { + "sampleRatio": 0.2, + "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-streaming1.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-streaming1.json b/measure/src/test/resources/config-streaming1.json new file mode 100644 index 0000000..5465201 --- /dev/null +++ b/measure/src/test/resources/config-streaming1.json @@ -0,0 +1,65 @@ +{ + "name": "accu2", + "type": "accuracy", + + "process.type": "streaming", + + "source": { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "cache": { + "type": "df", + "config": { + "cache.level": "MEMORY_AND_DISK", + "info.path": "src", + "ready.time.interval": "10s", + "ready.time.delay": "0" + }, + "time.range": ["-2m", "0"] + }, + "match.once": true + }, + + "target": { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "cache": { + "type": "df", + "config": { + "cache.level": "MEMORY_AND_DISK", + "info.path": "tgt", + "ready.time.interval": "10s", + "ready.time.delay": "0" + }, + "time.range": ["-2m", "0"] + }, + "match.once": false + }, + + "evaluateRule": { + "sampleRatio": 0.2, + "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-streaming2.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-streaming2.json b/measure/src/test/resources/config-streaming2.json new file mode 100644 index 0000000..061382b --- /dev/null +++ b/measure/src/test/resources/config-streaming2.json @@ -0,0 +1,65 @@ +{ + "name": "accu1", + "type": "accuracy", + + "process.type": "streaming", + + "source": { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "cache": { + "type": "parquet", + "config": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0" + }, + "time.range": ["-2m", "0"] + }, + "match.once": true + }, + + "target": { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + 
"value.type": "java.lang.String" + }, + "cache": { + "type": "parquet", + "config": { + "file.path": "hdfs://localhost/griffin/streaming/dump/target", + "info.path": "target", + "ready.time.interval": "10s", + "ready.time.delay": "0" + }, + "time.range": ["-2m", "0"] + }, + "match.once": false + }, + + "evaluateRule": { + "sampleRatio": 0.2, + "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config-streaming3.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-streaming3.json b/measure/src/test/resources/config-streaming3.json new file mode 100644 index 0000000..fe3e56f --- /dev/null +++ b/measure/src/test/resources/config-streaming3.json @@ -0,0 +1,65 @@ +{ + "name": "accu1", + "type": "accuracy", + + "process.type": "streaming", + + "source": { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "cache": { + "type": "text", + "config": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0" + }, + "time.range": ["-2m", "0"] + }, + "match.once": true + }, + + "target": { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "cache": { + "type": "text", + "config": { + "file.path": "hdfs://localhost/griffin/streaming/dump/target", + "info.path": "target", + "ready.time.interval": "10s", + "ready.time.delay": "0" + }, + "time.range": ["-2m", "0"] + }, + "match.once": false + }, + + "evaluateRule": { + "sampleRatio": 0.2, + "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config.json b/measure/src/test/resources/config.json index 65e0ed9..08a6021 100644 --- a/measure/src/test/resources/config.json +++ b/measure/src/test/resources/config.json @@ -2,6 +2,8 @@ "name": "accu1", "type": "accuracy", + "process.type": "batch", + "source": { "type": "avro", "version": "1.7", @@ -20,6 +22,6 @@ "evaluateRule": { "sampleRatio": 1, - "rules": "$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code" + "rules": "$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = 
$target.post_code WHEN $source.user_id > 10015" } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/config1.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config1.json b/measure/src/test/resources/config1.json index d7290ba..16c265d 100644 --- a/measure/src/test/resources/config1.json +++ b/measure/src/test/resources/config1.json @@ -2,6 +2,8 @@ "name": "accu-test", "type": "accuracy", + "process.type": "batch", + "source": { "type": "hive", "version": "1.2", http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/env-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json new file mode 100644 index 0000000..42b4aa9 --- /dev/null +++ b/measure/src/test/resources/env-streaming.json @@ -0,0 +1,45 @@ +{ + "spark": { + "log.level": "WARN", + "checkpoint.dir": "hdfs://localhost/test/griffin/cp", + "batch.interval": "2s", + "process.interval": "10s", + "config": { + "spark.task.maxFailures": 5, + "spark.streaming.kafkaMaxRatePerPartition": 1000, + "spark.streaming.concurrentJobs": 4, + "spark.yarn.maxAppAttempts": 5, + "spark.yarn.am.attemptFailuresValidityInterval": "1h", + "spark.yarn.max.executor.failures": 120, + "spark.yarn.executor.failuresValidityInterval": "1h", + "spark.hadoop.fs.hdfs.impl.disable.cache": true + } + }, + + "persist": [ + { + "type": "log", + "config": { + "max.log.lines": 100 + } + } + ], + + "info.cache": [ + { + "type": "zk", + "config": { + "hosts": "localhost:2181", + "namespace": "griffin/infocache", + "lock.path": "lock", + "mode": "persist", + "init.clear": true, + "close.clear": false + } + } + ], + + "cleaner": { + "clean.interval": "2m" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/resources/env.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env.json b/measure/src/test/resources/env.json index 3a9e38c..14e3b75 100644 --- a/measure/src/test/resources/env.json +++ b/measure/src/test/resources/env.json @@ -1,7 +1,9 @@ { "spark": { - "log.level": "ERROR", + "log.level": "INFO", "checkpoint.dir": "hdfs:///griffin/batch/cp", + "batch.interval": "10s", + "process.interval": "10m", "config": {} }, @@ -9,14 +11,30 @@ { "type": "hdfs", "config": { - "path": "hdfs:///griffin/streaming/persist" + "path": "hdfs:///griffin/streaming/persist", + "max.persist.lines": 10000, + "max.lines.per.file": 10000 } }, { "type": "http", "config": { "method": "post", - "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/" + "api": "http://HOSTNAME:9200/griffin/accuracy" + } + } + ], + + "info.cache": [ + { + "type": "zk", + "config": { + "hosts": "localhost:2181", + "namespace": "griffin/infocache", + "lock.path": "lock", + "mode": "persist", + "init.clear": true, + "close.clear": false } } ], http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala new file 
mode 100644 index 0000000..6a60326 --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala @@ -0,0 +1,198 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. +//*/ +//package org.apache.griffin.measure.algo +// +//import java.util.Date +// +//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo +//import org.apache.griffin.measure.config.params._ +//import org.apache.griffin.measure.config.params.env._ +//import org.apache.griffin.measure.config.params.user._ +//import org.apache.griffin.measure.config.reader._ +//import org.apache.griffin.measure.config.validator._ +//import org.apache.griffin.measure.connector.direct.DirectDataConnector +//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory} +//import org.apache.griffin.measure.log.Loggable +//import org.apache.griffin.measure.rule.expr._ +//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory} +//import org.apache.spark.rdd.RDD +//import org.apache.spark.sql.SQLContext +//import org.apache.spark.{SparkConf, SparkContext} +//import org.junit.runner.RunWith +//import org.scalatest.junit.JUnitRunner +//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +// +//import scala.util.{Failure, Success, Try} +// +// +//@RunWith(classOf[JUnitRunner]) +//class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { +// +// val envFile = "src/test/resources/env.json" +// val confFile = "src/test/resources/config.json" +//// val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}" +// val envFsType = "local" +// val userFsType = "local" +// +// val args = Array(envFile, confFile) +// +// var sc: SparkContext = _ +// var sqlContext: SQLContext = _ +// +// var allParam: AllParam = _ +// +// before { +// // read param files +// val envParam = readParamFile[EnvParam](envFile, envFsType) match { +// case Success(p) => p +// case Failure(ex) => { +// error(ex.getMessage) +// sys.exit(-2) +// } +// } +// val userParam = readParamFile[UserParam](confFile, userFsType) match { +// case Success(p) => p +// case Failure(ex) => 
{ +// error(ex.getMessage) +// sys.exit(-2) +// } +// } +// allParam = AllParam(envParam, userParam) +// +// // validate param files +// validateParams(allParam) match { +// case Failure(ex) => { +// error(ex.getMessage) +// sys.exit(-3) +// } +// case _ => { +// info("params validation pass") +// } +// } +// +// val metricName = userParam.name +// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName) +// sc = new SparkContext(conf) +// sqlContext = new SQLContext(sc) +// } +// +// test("algorithm") { +// Try { +// val envParam = allParam.envParam +// val userParam = allParam.userParam +// +// // start time +// val startTime = new Date().getTime() +// +// // get spark application id +// val applicationId = sc.applicationId +// +// // rules +// val ruleFactory = RuleFactory(userParam.evaluateRuleParam) +// val rule: StatementExpr = ruleFactory.generateRule() +// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) +// +// ruleAnalyzer.constCacheExprs.foreach(println) +// ruleAnalyzer.constFinalCacheExprs.foreach(println) +// +// // global cache data +// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]()) +// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap) +// val finalConstMap = finalConstExprValueMap.headOption match { +// case Some(m) => m +// case _ => Map[String, Any]() +// } +// +// // data connector +// val sourceDataConnector: DirectDataConnector = +// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam, +// ruleAnalyzer.sourceRuleExprs, finalConstMap +// ) match { +// case Success(cntr) => { +// if (cntr.available) cntr +// else throw new Exception("source data not available!") +// } +// case Failure(ex) => throw ex +// } +// val targetDataConnector: DirectDataConnector = +// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.targetParam, +// ruleAnalyzer.targetRuleExprs, finalConstMap +// ) match { +// case Success(cntr) => { +// if (cntr.available) cntr +// else throw new Exception("target data not available!") +// } +// case Failure(ex) => throw ex +// } +// +// // get metadata +//// val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match { +//// case Success(md) => md +//// case Failure(ex) => throw ex +//// } +//// val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match { +//// case Success(md) => md +//// case Failure(ex) => throw ex +//// } +// +// // get data +// val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match { +// case Success(dt) => dt +// case Failure(ex) => throw ex +// } +// val targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataConnector.data() match { +// case Success(dt) => dt +// case Failure(ex) => throw ex +// } +// +// // my algo +// val algo = BatchAccuracyAlgo(allParam) +// +// // accuracy algorithm +// val (accuResult, missingRdd, matchedRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer) +// +// println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}") +// +// missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)).foreach(println) +// +// // end time +// val endTime = new Date().getTime +// println(s"using time: ${endTime - startTime} ms") +// } match { +// case Failure(ex) => { +// error(ex.getMessage) 
+// sys.exit(-4) +// } +// case _ => { +// info("calculation finished") +// } +// } +// } +// +// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { +// val paramReader = ParamReaderFactory.getParamReader(file, fsType) +// paramReader.readConfig[T] +// } +// +// private def validateParams(allParam: AllParam): Try[Boolean] = { +// val allParamValidator = AllParamValidator() +// allParamValidator.validate(allParam) +// } +// +//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala new file mode 100644 index 0000000..e0f500a --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala @@ -0,0 +1,173 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. 
+//*/
+//package org.apache.griffin.measure.algo
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.algo.batch.BatchProfileAlgo
+//import org.apache.griffin.measure.config.params._
+//import org.apache.griffin.measure.config.params.env._
+//import org.apache.griffin.measure.config.params.user._
+//import org.apache.griffin.measure.config.reader._
+//import org.apache.griffin.measure.config.validator._
+//import org.apache.griffin.measure.connector.direct.DirectDataConnector
+//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
+//import org.apache.griffin.measure.log.Loggable
+//import org.apache.griffin.measure.rule.expr._
+//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env.json"
+//  val confFile = "src/test/resources/config-profile.json"
+//  val envFsType = "local"
+//  val userFsType = "local"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    sc = new SparkContext(conf)
+//    sqlContext = new SQLContext(sc)
+//  }
+//
+//  test("algorithm") {
+//    Try {
+//      val envParam = allParam.envParam
+//      val userParam = allParam.userParam
+//
+//      // start time
+//      val startTime = new Date().getTime()
+//
+//      // get spark application id
+//      val applicationId = sc.applicationId
+//
+//      // rules
+//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//      val rule: StatementExpr = ruleFactory.generateRule()
+//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//      ruleAnalyzer.constCacheExprs.foreach(println)
+//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+//      // global cache data
+//      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//      val finalConstMap = finalConstExprValueMap.headOption match {
+//        case Some(m) => m
+//        case _ => Map[String, Any]()
+//      }
+//
+//      // data connector
+//      val sourceDataConnector: DirectDataConnector =
+//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
+//          ruleAnalyzer.sourceRuleExprs, finalConstMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("source data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//
+//      // get data
+//      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//
+//      // my algo
+//      val algo = BatchProfileAlgo(allParam)
+//
+//      // profile algorithm
+//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
+//
+//      println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
+//
+//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
+//
+//      // end time
+//      val endTime = new Date().getTime
+//      println(s"using time: ${endTime - startTime} ms")
+//    } match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-4)
+//      }
+//      case _ => {
+//        info("calculation finished")
+//      }
+//    }
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
new file mode 100644
index 0000000..a76712f
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
@@ -0,0 +1,172 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements. See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership. The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License. You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied. See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.algo.batch
+//
+//import java.util.Date
+//
+//import org.apache.griffin.measure.config.params._
+//import org.apache.griffin.measure.config.params.env._
+//import org.apache.griffin.measure.config.params.user._
+//import org.apache.griffin.measure.config.reader._
+//import org.apache.griffin.measure.config.validator._
+//import org.apache.griffin.measure.connector.DataConnectorFactory
+//import org.apache.griffin.measure.connector.direct.DirectDataConnector
+//import org.apache.griffin.measure.log.Loggable
+//import org.apache.griffin.measure.rule.expr._
+//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.{SparkConf, SparkContext}
+//import org.junit.runner.RunWith
+//import org.scalatest.junit.JUnitRunner
+//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+//
+//import scala.util.{Failure, Success, Try}
+//
+//
+//@RunWith(classOf[JUnitRunner])
+//class DataFrameSaveTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+//
+//  val envFile = "src/test/resources/env.json"
+//  val confFile = "src/test/resources/config-profile.json"
+//  val envFsType = "local"
+//  val userFsType = "local"
+//
+//  val args = Array(envFile, confFile)
+//
+//  var sc: SparkContext = _
+//  var sqlContext: SQLContext = _
+//
+//  var allParam: AllParam = _
+//
+//  before {
+//    // read param files
+//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+//      case Success(p) => p
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-2)
+//      }
+//    }
+//    allParam = AllParam(envParam, userParam)
+//
+//    // validate param files
+//    validateParams(allParam) match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-3)
+//      }
+//      case _ => {
+//        info("params validation pass")
+//      }
+//    }
+//
+//    val metricName = userParam.name
+//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
+//    sc = new SparkContext(conf)
+//    sqlContext = new SQLContext(sc)
+//  }
+//
+//  test("algorithm") {
+//    Try {
+//      val envParam = allParam.envParam
+//      val userParam = allParam.userParam
+//
+//      // start time
+//      val startTime = new Date().getTime()
+//
+//      // get spark application id
+//      val applicationId = sc.applicationId
+//
+//      // rules
+//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+//      val rule: StatementExpr = ruleFactory.generateRule()
+//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+//
+//      ruleAnalyzer.constCacheExprs.foreach(println)
+//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
+//
+//      // global cache data
+//      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+//      val finalConstMap = finalConstExprValueMap.headOption match {
+//        case Some(m) => m
+//        case _ => Map[String, Any]()
+//      }
+//
+//      // data connector
+//      val sourceDataConnector: DirectDataConnector =
+//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
+//          ruleAnalyzer.sourceRuleExprs, finalConstMap
+//        ) match {
+//          case Success(cntr) => {
+//            if (cntr.available) cntr
+//            else throw new Exception("source data not available!")
+//          }
+//          case Failure(ex) => throw ex
+//        }
+//
+//      // get data
+//      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
+//        case Success(dt) => dt
+//        case Failure(ex) => throw ex
+//      }
+//
+//      // my algo
+//      val algo = BatchProfileAlgo(allParam)
+//
+//      // profile algorithm
+//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
+//
+//      println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
+//
+//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
+//
+//      // end time
+//      val endTime = new Date().getTime
+//      println(s"using time: ${endTime - startTime} ms")
+//    } match {
+//      case Failure(ex) => {
+//        error(ex.getMessage)
+//        sys.exit(-4)
+//      }
+//      case _ => {
+//        info("calculation finished")
+//      }
+//    }
+//  }
+//
+//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+//    paramReader.readConfig[T]
+//  }
+//
+//  private def validateParams(allParam: AllParam): Try[Boolean] = {
+//    val allParamValidator = AllParamValidator()
+//    allParamValidator.validate(allParam)
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
new file mode 100644
index 0000000..2179fba
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
@@ -0,0 +1,89 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.core
+
+import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
+import org.apache.griffin.measure.rule.expr._
+import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.PrivateMethodTester
+
+@RunWith(classOf[JUnitRunner])
+class AccuracyCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
+
+  def findExprId(exprs: Iterable[Expr], desc: String): String = {
+    exprs.find(_.desc == desc) match {
+      case Some(expr) => expr._id
+      case _ => ""
+    }
+  }
+
+  test ("match data success") {
+    val rule = "$source.name = $target.name AND $source.age < $target.age"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
+    ), Map[String, Any]())
+    val target = (Map[String, Any](
+      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
+      (findExprId(targetPersistExprs, "$target['age']") -> 27)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
+    result._1 should be (true)
+    result._2.size should be (0)
+  }
+
+  test ("match data fail") {
+    val rule = "$source.name = $target.name AND $source.age = $target.age"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
+    ), Map[String, Any]())
+    val target = (Map[String, Any](
+      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
+      (findExprId(targetPersistExprs, "$target['age']") -> 27)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
+    result._1 should be (false)
+    result._2.size shouldNot be (0)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
new file mode 100644
index 0000000..087e8e5
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.core
+
+import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
+import org.apache.griffin.measure.rule.expr._
+import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.PrivateMethodTester
+
+@RunWith(classOf[JUnitRunner])
+class ProfileCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
+
+  def findExprId(exprs: Iterable[Expr], desc: String): String = {
+    exprs.find(_.desc == desc) match {
+      case Some(expr) => expr._id
+      case _ => ""
+    }
+  }
+
+  test ("match data success") {
+    val rule = "$source.name = 'jack' AND $source.age = null"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> null)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
+    result._1 should be (true)
+    result._2.size should be (0)
+  }
+
+  test ("match data fail") {
+    val rule = "$source.name = 'jack' AND $source.age != null"
+    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
+    val ruleFactory = RuleFactory(evaluateRuleParam)
+    val statement = ruleFactory.generateRule
+    val ruleAnalyzer = RuleAnalyzer(statement)
+
+    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
+
+    val source = (Map[String, Any](
+      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
+      (findExprId(sourcePersistExprs, "$source['age']") -> null)
+    ), Map[String, Any]())
+
+    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
+    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
+    result._1 should be (false)
+    result._2.size shouldNot be (0)
+  }
+
+}
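The AccuracyCoreTest and ProfileCoreTest diffs above reach the package-private matchData methods through ScalaTest's PrivateMethodTester. For readers unfamiliar with that pattern, here is a minimal, self-contained sketch of the same technique outside the commit; the Scorer class and its matchData method are hypothetical stand-ins for AccuracyCore/ProfileCore, not part of the Griffin codebase.

import org.scalatest.{FunSuite, Matchers, PrivateMethodTester}

// Hypothetical stand-in for AccuracyCore/ProfileCore: a class exposing
// nothing publicly, with a private matchData returning (matched, mismatches).
class Scorer {
  private def matchData(source: Map[String, Any],
                        target: Map[String, Any]): (Boolean, Map[String, Any]) = {
    // Collect every key whose value differs between source and target.
    val mismatches = source.filter { case (k, v) => !target.get(k).contains(v) }
    (mismatches.isEmpty, mismatches)
  }
}

class ScorerTest extends FunSuite with Matchers with PrivateMethodTester {
  test("invoke a private method by symbol") {
    // PrivateMethod is parameterized on the method's return type and looked
    // up by name, mirroring the 'matchData lookups in the tests above.
    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
    val result = new Scorer invokePrivate matchData(
      Map[String, Any]("name" -> "jack", "age" -> 26),
      Map[String, Any]("name" -> "jack", "age" -> 26))
    result._1 should be (true)
    result._2.size should be (0)
  }
}

Looking the method up by symbol keeps matchData private in production code while still letting the suite assert directly on the (matched, mismatches) pair, which is why these core tests can avoid widening the API purely for testing.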