incubator-hivemall git commit: Close #61: [HIVEMALL-88][SPARK] Support a function to flatten nested schemas

2017-03-09 Thread yamamuro
Repository: incubator-hivemall
Updated Branches:
  refs/heads/master 210b7765b -> f7fc3041f


Close #61: [HIVEMALL-88][SPARK] Support a function to flatten nested schemas


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/f7fc3041
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/f7fc3041
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/f7fc3041

Branch: refs/heads/master
Commit: f7fc3041fba258a578bf0bf4bd78d5422718777c
Parents: 210b776
Author: Takeshi Yamamuro 
Authored: Thu Mar 9 17:00:42 2017 +0900
Committer: Takeshi Yamamuro 
Committed: Thu Mar 9 17:00:42 2017 +0900

----------------------------------------------------------------------
 docs/gitbook/SUMMARY.md                          |  1 +
 docs/gitbook/spark/misc/functions.md             | 47 ++++++++++++++
 .../org/apache/spark/sql/hive/HivemallOps.scala  | 60 ++++++++++++++++
 .../spark/sql/hive/HivemallOpsSuite.scala        | 26 +++++++
 4 files changed, 134 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/f7fc3041/docs/gitbook/SUMMARY.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md
index 6840cac..4c6ed1b 100644
--- a/docs/gitbook/SUMMARY.md
+++ b/docs/gitbook/SUMMARY.md
@@ -163,6 +163,7 @@
 
 * [Generic features](spark/misc/misc.md)
 * [Top-k Join processing](spark/misc/topk_join.md)
+* [Other utility functions](spark/misc/functions.md)
 
 ## Part X - External References
 

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/f7fc3041/docs/gitbook/spark/misc/functions.md
----------------------------------------------------------------------
diff --git a/docs/gitbook/spark/misc/functions.md b/docs/gitbook/spark/misc/functions.md
new file mode 100644
index 0000000..23763dd
--- /dev/null
+++ b/docs/gitbook/spark/misc/functions.md
@@ -0,0 +1,47 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+`df.flatten()` flattens a nested schema of `df` into a flat one.
+
+# Usage
+
+```scala
+scala> val df = Seq((0, (1, (3.0, "a")), (5, 0.9))).toDF()
+scala> df.printSchema
+root
+ |-- _1: integer (nullable = false)
+ |-- _2: struct (nullable = true)
+ |    |-- _1: integer (nullable = false)
+ |    |-- _2: struct (nullable = true)
+ |    |    |-- _1: double (nullable = false)
+ |    |    |-- _2: string (nullable = true)
+ |-- _3: struct (nullable = true)
+ |    |-- _1: integer (nullable = false)
+ |    |-- _2: double (nullable = false)
+
+scala> df.flatten(separator = "$").printSchema
+root
+ |-- _1: integer (nullable = false)
+ |-- _2$_1: integer (nullable = true)
+ |-- _2$_2$_1: double (nullable = true)
+ |-- _2$_2$_2: string (nullable = true)
+ |-- _3$_1: integer (nullable = true)
+ |-- _3$_2: double (nullable = true)
+```
+

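A small inferred usage note (not part of the committed docs): after `flatten`, each `$`-joined name is an ordinary top-level column, so it can be selected directly by name:

```scala
// Assumes the `df` from the snippet above; "_2$_2$_2" is the flattened
// path to the innermost string field.
df.flatten(separator = "$").select("_2$_2$_2").show()
```
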
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/f7fc3041/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
index 6883ac1..d7fa202 100644
--- a/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ b/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -805,6 +805,66 @@ final class HivemallOps(df: DataFrame) extends Logging {
     JoinTopK(kInt, df.logicalPlan, right.logicalPlan, Inner, Option(joinExprs.expr))(score.named)
   }
 
+  private def doFlatten(schema: StructType, separator: Char, prefixParts: Seq[String] = Seq.empty)
+    : Seq[Column] = {
+    schema.fields.flatMap { f =>
+      val colNameParts = prefixParts :+ f.name
+      f.dataType match {
+        case st: StructType =>
+          doFlatten(st, separator, colNameParts)
+        case _ =>
+          col(colNameParts.mkString(".")).as(colNameParts.mkString(separator.toString)) :: Nil
+      }
+    }
+  }
+
+  // Converts the string representation of a character into the character itself
+  @throws[IllegalArgumentException]
+  private def toChar(str: String): Char = {
+    if (str.length == 1) {
+      str.charAt(0) match {
+        case '$' | '_' | '.' => str.charAt(0)
+        case _ => throw new IllegalArgumentException(
+          "Must use '$', '_', or '.' for separator, but got " + str)
+      }
+    } else {
+      throw new IllegalArgumentException(
+        s"Separator cannot be more than one character: $str")
+    }
+  }
+
+  /**
+   * Flattens a nested schema into a flat one.
+   * @group misc
+   *
+   * For example:
+   * {{{
+   *  scala> val df = Seq((0, (1, (3.0, "a")), (5, 0.9))).toDF()
+   *  scala> df.printSchema
+   *  root
+   *   |-- _1: integer (nullable = false)
+   *   |-- _2: struct (nullable = true)

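For readers without the full diff at hand, here is a minimal standalone sketch of the same recursive flattening idea using only stock spark-sql APIs (the names `flattenSchema` and `loop` are illustrative, not from the commit):

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Walk the schema depth-first, emitting one Column per leaf field whose
// alias is the field path joined by `separator`, then select them all.
def flattenSchema(df: DataFrame, separator: String = "$"): DataFrame = {
  def loop(schema: StructType, prefix: Seq[String]): Seq[Column] =
    schema.fields.flatMap { f =>
      val path = prefix :+ f.name
      f.dataType match {
        case st: StructType => loop(st, path)
        case _ => Seq(col(path.mkString(".")).as(path.mkString(separator)))
      }
    }
  df.select(loop(df.schema, Seq.empty): _*)
}
```

Applied to the `df` from the example above, `flattenSchema(df).printSchema` yields the same `_2$_2$_1`-style column names as `df.flatten(separator = "$")`.
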
incubator-hivemall git commit: Close #61: [HIVEMALL-88][SPARK] Support a function to flatten nested schemas

2017-03-08 Thread yamamuro
Repository: incubator-hivemall
Updated Branches:
  refs/heads/master 210b7765b -> 33baa2408


Close #61: [HIVEMALL-88][SPARK] Support a function to flatten nested schemas


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/33baa240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/33baa240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/33baa240

Branch: refs/heads/master
Commit: 33baa2408b77895a4feaaa0f60953055657275d0
Parents: 210b776
Author: Takeshi Yamamuro 
Authored: Thu Mar 9 16:53:00 2017 +0900
Committer: Takeshi Yamamuro 
Committed: Thu Mar 9 16:53:00 2017 +0900

----------------------------------------------------------------------
 .../datasources/csv/csvExpressions.scala        | 153 +++++++++++++++
 .../org/apache/spark/sql/hive/HivemallOps.scala |  48 ++++++
 .../spark/sql/hive/HivemallOpsSuite.scala       |  19 +++
 3 files changed, 220 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/33baa240/spark/spark-2.1/src/main/scala/org/apache/spark/sql/execution/datasources/csv/csvExpressions.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.1/src/main/scala/org/apache/spark/sql/execution/datasources/csv/csvExpressions.scala b/spark/spark-2.1/src/main/scala/org/apache/spark/sql/execution/datasources/csv/csvExpressions.scala
new file mode 100644
index 0000000..363d432
--- /dev/null
+++ b/spark/spark-2.1/src/main/scala/org/apache/spark/sql/execution/datasources/csv/csvExpressions.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import java.io.CharArrayWriter
+
+import jodd.util.CsvUtil
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a csv input string to a [[StructType]] with the specified schema.
+ *
+ * TODO: Move this class into org.apache.spark.sql.catalyst.expressions in Spark-v2.2+
+ */
+case class CsvToStruct(schema: StructType, options: Map[String, String], child: Expression)
+  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  @transient private val csvOptions = new CSVOptions(options)
+  @transient private val csvReader = new CsvReader(csvOptions)
+  @transient private val csvParser = CSVRelation.csvParser(schema, schema.fieldNames, csvOptions)
+
+  private def parse(s: String): InternalRow = {
+    csvParser(csvReader.parseLine(s), 0).orNull
+  }
+
+  override def dataType: DataType = schema
+
+  override def nullSafeEval(csv: Any): Any = {
+    try parse(csv.toString) catch { case _: RuntimeException => null }
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+}
+
+/**
+ * Converts a [[StructType]] to a csv output string.
+ */
+case class StructToCsv(
+    options: Map[String, String],
+    child: Expression)
+  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  @transient
+  lazy val params = new CSVOptions(options)
+
+  @transient
+  lazy val dataSchema = child.dataType.asInstanceOf[StructType]
+
+  @transient
+  lazy val writer = new LineCsvWriter(params, dataSchema.fieldNames.toSeq)
+
+  override def dataType: DataType = StringType
+
+  // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
+  // When the value is null, this converter should not be called.
+  private type ValueConverter = (InternalRow, Int) => String
+
+  // `ValueConverter`s for all values in the fields of the schema
+  private lazy val valueConverters:
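As a hedged sketch only: since `CsvToStruct` is a plain Catalyst expression, it can in principle be invoked by wrapping it in a `Column` (the local `spark`, `df`, and `value` names below are assumptions for illustration; the wrapper functions this commit adds to `HivemallOps` are not shown in the excerpt):

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.execution.datasources.csv.CsvToStruct
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// One CSV-formatted string per row (assumed input).
val df = Seq("1,abc", "2,def").toDF("value")

// Target layout of each parsed line.
val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType)))

// Wrap the Catalyst expression in a Column; per nullSafeEval above,
// malformed rows come back as null rather than failing the query.
val parsed = df.select(new Column(CsvToStruct(schema, Map.empty, col("value").expr)).as("csv"))
parsed.select(col("csv.a"), col("csv.b")).show()
```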