integrate chi2 and SNR into hivemall.spark
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/a1f8f958
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/a1f8f958
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/a1f8f958

Branch: refs/heads/JIRA-22/pr-385
Commit: a1f8f958c99f3cde9e48b6d80d364004f6d98cc2
Parents: 22a608e
Author: amaya <g...@sapphire.in.net>
Authored: Tue Sep 27 15:58:33 2016 +0900
Committer: amaya <g...@sapphire.in.net>
Committed: Tue Sep 27 15:58:33 2016 +0900

----------------------------------------------------------------------
 .../apache/spark/sql/hive/GroupedDataEx.scala   | 24 ++++++++
 .../org/apache/spark/sql/hive/HivemallOps.scala | 19 ++++++
 .../spark/sql/hive/HivemallOpsSuite.scala       | 63 ++++++++++++++++++-
 .../org/apache/spark/sql/hive/HivemallOps.scala | 20 ++++++
 .../sql/hive/RelationalGroupedDatasetEx.scala   | 26 ++++++++
 .../spark/sql/hive/HivemallOpsSuite.scala       | 65 +++++++++++++++++++-
 6 files changed, 212 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
index 37d5423..2482c62 100644
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
+++ b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/GroupedDataEx.scala
@@ -264,4 +264,28 @@ final class GroupedDataEx protected[sql](
       .toAggregateExpression()
     toDF((Alias(udaf, udaf.prettyString)() :: Nil).toSeq)
   }
+
+  /**
+   * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF
+   */
+  def snr(X: String, Y: String): DataFrame = {
+    val udaf = HiveUDAFFunction(
+        new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"),
+        Seq(X, Y).map(df.col(_).expr),
+        isUDAFBridgeRequired = false)
+      .toAggregateExpression()
+    toDF(Seq(Alias(udaf, udaf.prettyString)()))
+  }
+
+  /**
+   * @see hivemall.tools.matrix.TransposeAndDotUDAF
+   */
+  def transpose_and_dot(X: String, Y: String): DataFrame = {
+    val udaf = HiveUDAFFunction(
+        new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"),
+        Seq(X, Y).map(df.col(_).expr),
+        isUDAFBridgeRequired = false)
+      .toAggregateExpression()
+    toDF(Seq(Alias(udaf, udaf.prettyString)()))
+  }
 }
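For reference, a minimal sketch of how the two new grouped aggregates can be
invoked on Spark 1.6. The DataFrame and its column names are illustrative;
only groupby, snr, and transpose_and_dot themselves come from this patch.

    import org.apache.spark.sql.hive.HivemallOps._  // enables df.groupby(...)
    import hiveContext.implicits._

    // "label" holds one-hot class indicators, "features" the feature vector
    val df = Seq(
        (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)),
        (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)))
      .toDF("c0", "features", "label")

    // per-feature signal-to-noise ratio of "features" against the classes
    df.groupby($"c0").snr("features", "label").show()

    // features^T * label, accumulated over all rows in each group
    df.groupby($"c0").transpose_and_dot("features", "label").show()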
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
index 133f1d5..5970b83 100644
--- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -1006,6 +1006,15 @@ object HivemallOps {
   }
 
   /**
+   * @see hivemall.ftvec.selection.ChiSquareUDF
+   * @group ftvec.selection
+   */
+  def chi2(exprs: Column*): Column = {
+    HiveGenericUDF(new HiveFunctionWrapper(
+      "hivemall.ftvec.selection.ChiSquareUDF"), exprs.map(_.expr))
+  }
+
+  /**
    * @see hivemall.ftvec.conv.ToDenseFeaturesUDF
    * @group ftvec.conv
    */
@@ -1078,6 +1087,16 @@ object HivemallOps {
   }
 
   /**
+   * @see hivemall.tools.array.SelectKBestUDF
+   * @group tools.array
+   */
+  @scala.annotation.varargs
+  def select_k_best(exprs: Column*): Column = {
+    HiveGenericUDF(new HiveFunctionWrapper(
+      "hivemall.tools.array.SelectKBestUDF"), exprs.map(_.expr))
+  }
+
+  /**
    * @see hivemall.tools.math.SigmoidUDF
    * @group misc
    */
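A rough sketch of calling the two new UDFs. Argument shapes mirror the tests
further down, but the column names and the wiring between the two calls are
assumptions, not something this commit prescribes.

    import org.apache.spark.sql.hive.HivemallOps._
    import org.apache.spark.sql.hive.HivemallUtils._  // implicit literal conversions

    // chi2 takes per-class observed and expected feature sums and returns a
    // struct of per-feature chi-square statistics and p-values
    val scored = df.select(chi2(df("observed"), df("expected")))

    // select_k_best keeps the k elements of "features" with the highest
    // importance scores; the Seq and Int arguments are lifted to Columns by
    // the HivemallUtils implicits, as in the tests below
    val importance = Seq(3, 1, 2)
    val reduced = df2.select(select_k_best(df2("features"), importance, 2))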
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
index 4be1e5e..148e5a2 100644
--- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import scala.collection.mutable.Seq
-
 import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.hive.HivemallOps._
 import org.apache.spark.sql.hive.HivemallUtils._
@@ -188,6 +186,22 @@ final class HivemallOpsSuite extends HivemallQueryTest {
       Row(Seq("1:1.0"))))
   }
 
+  test("ftvec.selection - chi2") {
+    import hiveContext.implicits._
+
+    val df = Seq(Seq(
+      Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996),
+      Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3),
+      Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998)) -> Seq(
+      Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
+      Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
+      Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))).toDF("arg0", "arg1")
+
+    assert(df.select(chi2(df("arg0"), df("arg1"))).collect.toSet ===
+      Set(Row(Row(Seq(10.817820878493995, 3.5944990176817315, 116.16984746363957, 67.24482558215503),
+        Seq(0.004476514990225833, 0.16575416718561453, 0d, 2.55351295663786e-15)))))
+  }
+
   test("ftvec.conv - quantify") {
     import hiveContext.implicits._
     val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", false)).toDF
@@ -340,6 +354,18 @@ final class HivemallOpsSuite extends HivemallQueryTest {
     checkAnswer(predicted, Seq(Row(0), Row(1)))
   }
 
+  test("tools.array - select_k_best") {
+    import hiveContext.implicits._
+
+    val data = Seq(Tuple1(Seq(0, 1, 3)), Tuple1(Seq(2, 4, 1)), Tuple1(Seq(5, 4, 9)))
+    val importance = Seq(3, 1, 2)
+    val k = 2
+    val df = data.toDF("features")
+
+    assert(df.select(select_k_best(df("features"), importance, k)).collect.toSeq ===
+      data.map(s => Row(Seq(s._1(0).toDouble, s._1(2).toDouble))))
+  }
+
   test("misc - sigmoid") {
     import hiveContext.implicits._
     /**
@@ -536,4 +562,37 @@ final class HivemallOpsSuite extends HivemallQueryTest {
     val row4 = df4.groupby($"c0").f1score("c1", "c2").collect
     assert(row4(0).getDouble(1) ~== 0.25)
   }
+
+  test("user-defined aggregators for ftvec.selection") {
+    import hiveContext.implicits._
+
+    // +-----------------+-------+
+    // |     features    | class |
+    // +-----------------+-------+
+    // | 5.1,3.5,1.4,0.2 |     0 |
+    // | 4.9,3.0,1.4,0.2 |     0 |
+    // | 7.0,3.2,4.7,1.4 |     1 |
+    // | 6.4,3.2,4.5,1.5 |     1 |
+    // | 6.3,3.3,6.0,2.5 |     2 |
+    // | 5.8,2.7,5.1,1.9 |     2 |
+    // +-----------------+-------+
+    val df0 = Seq(
+      (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)),
+      (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)),
+      (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1)))
+      .toDF("c0", "arg0", "arg1")
+    val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect
+    assert(row0(0).getAs[Seq[Double]](1) ===
+      Seq(8.431818181818192, 1.3212121212121217, 42.94949494949499, 33.80952380952378))
+  }
+
+  test("user-defined aggregators for tools.matrix") {
+    import hiveContext.implicits._
+
+    // | 1 2 3 |T   | 5 6 7 |
+    // | 3 4 5 |  * | 7 8 9 |
+    val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9)))
+      .toDF("c0", "arg0", "arg1")
+    val row0 = df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect
+    assert(row0(0).getAs[Seq[Seq[Double]]](1) ===
+      Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))
+  }
 }
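As a sanity check, the transpose_and_dot fixture asserted above can be
re-derived in plain Scala, since (X^T * Y)(i)(j) is the sum over rows r of
X(r)(i) * Y(r)(j). This snippet is only for this note, not part of the patch.

    val X = Seq(Seq(1, 2, 3), Seq(3, 4, 5))
    val Y = Seq(Seq(5, 6, 7), Seq(7, 8, 9))
    val expected = (0 until 3).map { i =>
      (0 until 3).map { j =>
        (X, Y).zipped.map((x, y) => x(i) * y(j)).sum.toDouble
      }
    }
    // expected == Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0))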
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
index 4a583db..e9a1aeb 100644
--- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -1228,6 +1228,16 @@ object HivemallOps {
   }
 
   /**
+   * @see hivemall.ftvec.selection.ChiSquareUDF
+   * @group ftvec.selection
+   */
+  def chi2(exprs: Column*): Column = withExpr {
+    HiveGenericUDF("chi2",
+      new HiveFunctionWrapper("hivemall.ftvec.selection.ChiSquareUDF"),
+      exprs.map(_.expr))
+  }
+
+  /**
    * @see hivemall.ftvec.conv.ToDenseFeaturesUDF
    * @group ftvec.conv
    */
@@ -1307,6 +1317,16 @@ object HivemallOps {
   }
 
   /**
+   * @see hivemall.tools.array.SelectKBestUDF
+   * @group tools.array
+   */
+  def select_k_best(exprs: Column*): Column = withExpr {
+    HiveGenericUDF("select_k_best",
+      new HiveFunctionWrapper("hivemall.tools.array.SelectKBestUDF"),
+      exprs.map(_.expr))
+  }
+
+  /**
    * @see hivemall.tools.math.SigmoidUDF
    * @group misc
    */

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala
index e365197..be0673f 100644
--- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/RelationalGroupedDatasetEx.scala
@@ -274,4 +274,30 @@ final class RelationalGroupedDatasetEx protected[sql](
       .toAggregateExpression()
     toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq)
   }
+
+  /**
+   * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF
+   */
+  def snr(X: String, Y: String): DataFrame = {
+    val udaf = HiveUDAFFunction(
+        "snr",
+        new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"),
+        Seq(X, Y).map(df.col(_).expr),
+        isUDAFBridgeRequired = false)
+      .toAggregateExpression()
+    toDF(Seq(Alias(udaf, udaf.prettyName)()))
+  }
+
+  /**
+   * @see hivemall.tools.matrix.TransposeAndDotUDAF
+   */
+  def transpose_and_dot(X: String, Y: String): DataFrame = {
+    val udaf = HiveUDAFFunction(
+        "transpose_and_dot",
+        new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"),
+        Seq(X, Y).map(df.col(_).expr),
+        isUDAFBridgeRequired = false)
+      .toAggregateExpression()
+    toDF(Seq(Alias(udaf, udaf.prettyName)()))
+  }
 }
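Caller-side usage on Spark 2.0 is unchanged from 1.6; the differences are
internal to the 2.0 APIs: an explicit function name passed to
HiveGenericUDF/HiveUDAFFunction, prettyName instead of prettyString, and the
withExpr wrapper. A minimal sketch with illustrative column names:

    import org.apache.spark.sql.hive.HivemallOps._

    df.select(chi2($"observed", $"expected"))
    df.groupby($"c0").snr("features", "label")
    df.groupby($"c0").transpose_and_dot("features", "label")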
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/a1f8f958/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
index 99cb1a7..039a492 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.sql.hive
 
-import scala.collection.mutable.Seq
-
-import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.{AnalysisException, Column, Row}
 import org.apache.spark.sql.functions
 import org.apache.spark.sql.hive.HivemallOps._
 import org.apache.spark.sql.hive.HivemallUtils._
@@ -189,6 +187,22 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
       Row(Seq("1:1.0"))))
   }
 
+  test("ftvec.selection - chi2") {
+    import hiveContext.implicits._
+
+    val df = Seq(Seq(
+      Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996),
+      Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3),
+      Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998)) -> Seq(
+      Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
+      Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
+      Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))).toDF("arg0", "arg1")
+
+    assert(df.select(chi2(df("arg0"), df("arg1"))).collect.toSet ===
+      Set(Row(Row(Seq(10.817820878493995, 3.5944990176817315, 116.16984746363957, 67.24482558215503),
+        Seq(0.004476514990225833, 0.16575416718561453, 0d, 2.55351295663786e-15)))))
+  }
+
   test("ftvec.conv - quantify") {
     import hiveContext.implicits._
     val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", false)).toDF
@@ -342,6 +356,18 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
     checkAnswer(predicted, Seq(Row(0), Row(1)))
   }
 
+  test("tools.array - select_k_best") {
+    import hiveContext.implicits._
+
+    val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9))
+    val importance = Seq(3, 1, 2)
+    val k = 2
+    val df = data.toDF("features")
+
+    assert(df.select(select_k_best(df("features"), importance, k)).collect.toSeq ===
+      data.map(s => Row(Seq(s(0).toDouble, s(2).toDouble))))
+  }
+
   test("misc - sigmoid") {
     import hiveContext.implicits._
     /**
@@ -631,6 +657,39 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
     val row4 = df4.groupby($"c0").f1score("c1", "c2").collect
     assert(row4(0).getDouble(1) ~== 0.25)
   }
+
+  test("user-defined aggregators for ftvec.selection") {
+    import hiveContext.implicits._
+
+    // +-----------------+-------+
+    // |     features    | class |
+    // +-----------------+-------+
+    // | 5.1,3.5,1.4,0.2 |     0 |
+    // | 4.9,3.0,1.4,0.2 |     0 |
+    // | 7.0,3.2,4.7,1.4 |     1 |
+    // | 6.4,3.2,4.5,1.5 |     1 |
+    // | 6.3,3.3,6.0,2.5 |     2 |
+    // | 5.8,2.7,5.1,1.9 |     2 |
+    // +-----------------+-------+
+    val df0 = Seq(
+      (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)),
+      (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)),
+      (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1)))
+      .toDF("c0", "arg0", "arg1")
+    val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect
+    assert(row0(0).getAs[Seq[Double]](1) ===
+      Seq(8.431818181818192, 1.3212121212121217, 42.94949494949499, 33.80952380952378))
+  }
+
+  test("user-defined aggregators for tools.matrix") {
+    import hiveContext.implicits._
+
+    // | 1 2 3 |T   | 5 6 7 |
+    // | 3 4 5 |  * | 7 8 9 |
+    val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9)))
+      .toDF("c0", "arg0", "arg1")
+    val row0 = df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect
+    assert(row0(0).getAs[Seq[Seq[Double]]](1) ===
+      Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))
+  }
 }
 
 final class HivemallOpsWithVectorSuite extends VectorQueryTest {
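Lastly, the select_k_best expectation in both suites can be restated locally:
with importance scores Seq(3, 1, 2) and k = 2, positions 0 and 2 score
highest, so each feature vector keeps its elements at positions 0 and 2, cast
to double. Plain Scala, written for this note only:

    val importance = Seq(3, 1, 2)
    val k = 2
    // indices of the k highest importance scores, in ascending index order
    val kept = importance.zipWithIndex.sortBy(-_._1).take(k).map(_._2).sorted
    // kept == Seq(0, 2)
    val selected = Seq(0, 1, 3).zipWithIndex
      .collect { case (v, i) if kept.contains(i) => v.toDouble }
    // selected == Seq(0.0, 3.0)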