incubator-hivemall git commit: Close #61: [HIVEMALL-88][SPARK] Support a function to flatten nested schemas
Repository: incubator-hivemall Updated Branches: refs/heads/master 210b7765b -> f7fc3041f Close #61: [HIVEMALL-88][SPARK] Support a function to flatten nested schemas Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/f7fc3041 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/f7fc3041 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/f7fc3041 Branch: refs/heads/master Commit: f7fc3041fba258a578bf0bf4bd78d5422718777c Parents: 210b776 Author: Takeshi Yamamuro Authored: Thu Mar 9 17:00:42 2017 +0900 Committer: Takeshi Yamamuro Committed: Thu Mar 9 17:00:42 2017 +0900 -- docs/gitbook/SUMMARY.md | 1 + docs/gitbook/spark/misc/functions.md | 47 +++ .../org/apache/spark/sql/hive/HivemallOps.scala | 60 .../spark/sql/hive/HivemallOpsSuite.scala | 26 + 4 files changed, 134 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/f7fc3041/docs/gitbook/SUMMARY.md -- diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 6840cac..4c6ed1b 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -163,6 +163,7 @@ * [Generic features](spark/misc/misc.md) * [Top-k Join processing](spark/misc/topk_join.md) +* [Other utility functions](spark/misc/functions.md) ## Part X - External References http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/f7fc3041/docs/gitbook/spark/misc/functions.md -- diff --git a/docs/gitbook/spark/misc/functions.md b/docs/gitbook/spark/misc/functions.md new file mode 100644 index 000..23763dd --- /dev/null +++ b/docs/gitbook/spark/misc/functions.md @@ -0,0 +1,47 @@ + + +`df.flatten()` flattens a nested schema of `df` into a flat one. 
+ +# Usage + +```scala +scala> val df = Seq((0, (1, (3.0, "a")), (5, 0.9))).toDF() +scala> df.printSchema +root + |-- _1: integer (nullable = false) + |-- _2: struct (nullable = true) + ||-- _1: integer (nullable = false) + ||-- _2: struct (nullable = true) + |||-- _1: double (nullable = false) + |||-- _2: string (nullable = true) + |-- _3: struct (nullable = true) + ||-- _1: integer (nullable = false) + ||-- _2: double (nullable = false) + +scala> df.flatten(separator = "$").printSchema +root + |-- _1: integer (nullable = false) + |-- _2$_1: integer (nullable = true) + |-- _2$_2$_1: double (nullable = true) + |-- _2$_2$_2: string (nullable = true) + |-- _3$_1: integer (nullable = true) + |-- _3$_2: double (nullable = true) +``` + http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/f7fc3041/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala -- diff --git a/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala index 6883ac1..d7fa202 100644 --- a/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala +++ b/spark/spark-2.1/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala @@ -805,6 +805,66 @@ final class HivemallOps(df: DataFrame) extends Logging { JoinTopK(kInt, df.logicalPlan, right.logicalPlan, Inner, Option(joinExprs.expr))(score.named) } + private def doFlatten(schema: StructType, separator: Char, prefixParts: Seq[String] = Seq.empty) +: Seq[Column] = { +schema.fields.flatMap { f => + val colNameParts = prefixParts :+ f.name + f.dataType match { +case st: StructType => + doFlatten(st, separator, colNameParts) +case _ => + col(colNameParts.mkString(".")).as(colNameParts.mkString(separator.toString)) :: Nil + } +} + } + + // Converts string representation of a character to actual character + @throws[IllegalArgumentException] + private def toChar(str: String): Char = { +if (str.length == 1) { + 
str.charAt(0) match { +case '$' | '_' | '.' => str.charAt(0) +case _ => throw new IllegalArgumentException( + "Must use '$', '_', or '.' for separator, but got " + str) + } +} else { + throw new IllegalArgumentException( +s"Separator cannot be more than one character: $str") +} + } + + /** + * Flattens a nested schema into a flat one. + * @group misc + * + * For example: + * {{{ + * scala> val df = Seq((0, (1, (3.0, "a")), (5, 0.9))).toDF() + * scala> df.printSchema + * root + * |-- _1: integer (nullable = false) + * |-- _2: struct
incubator-hivemall git commit: Close #61: [HIVEMALL-88][SPARK] Support a function to flatten nested schemas
Repository: incubator-hivemall Updated Branches: refs/heads/master 210b7765b -> 33baa2408 Close #61: [HIVEMALL-88][SPARK] Support a function to flatten nested schemas Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/33baa240 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/33baa240 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/33baa240 Branch: refs/heads/master Commit: 33baa2408b77895a4feaaa0f60953055657275d0 Parents: 210b776 Author: Takeshi Yamamuro Authored: Thu Mar 9 16:53:00 2017 +0900 Committer: Takeshi Yamamuro Committed: Thu Mar 9 16:53:00 2017 +0900 -- .../datasources/csv/csvExpressions.scala | 153 +++ .../org/apache/spark/sql/hive/HivemallOps.scala | 48 ++ .../spark/sql/hive/HivemallOpsSuite.scala | 19 +++ 3 files changed, 220 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/33baa240/spark/spark-2.1/src/main/scala/org/apache/spark/sql/execution/datasources/csv/csvExpressions.scala -- diff --git a/spark/spark-2.1/src/main/scala/org/apache/spark/sql/execution/datasources/csv/csvExpressions.scala b/spark/spark-2.1/src/main/scala/org/apache/spark/sql/execution/datasources/csv/csvExpressions.scala new file mode 100644 index 000..363d432 --- /dev/null +++ b/spark/spark-2.1/src/main/scala/org/apache/spark/sql/execution/datasources/csv/csvExpressions.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.io.CharArrayWriter + +import jodd.util.CsvUtil + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a csv input string to a [[StructType]] with the specified schema. + * + * TODO: Move this class into org.apache.spark.sql.catalyst.expressions in Spark-v2.2+ + */ +case class CsvToStruct(schema: StructType, options: Map[String, String], child: Expression) + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { + override def nullable: Boolean = true + + @transient private val csvOptions = new CSVOptions(options) + @transient private val csvReader = new CsvReader(csvOptions) + @transient private val csvParser = CSVRelation.csvParser(schema, schema.fieldNames, csvOptions) + + private def parse(s: String): InternalRow = { +csvParser(csvReader.parseLine(s), 0).orNull + } + + override def dataType: DataType = schema + + override def nullSafeEval(csv: Any): Any = { +try parse(csv.toString) catch { case _: RuntimeException => null } + } + + override def inputTypes: Seq[AbstractDataType] = StringType :: Nil +} + +/** + * Converts a [[StructType]] to a csv output string. 
+ */ +case class StructToCsv( +options: Map[String, String], +child: Expression) + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { + override def nullable: Boolean = true + + @transient + lazy val params = new CSVOptions(options) + + @transient + lazy val dataSchema = child.dataType.asInstanceOf[StructType] + + @transient + lazy val writer = new LineCsvWriter(params, dataSchema.fieldNames.toSeq) + + override def dataType: DataType = StringType + + // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`. + // When the value is null, this converter should not be called. + private type ValueConverter = (InternalRow, Int) => String + + // `ValueConverter`s for all values in the fields of the schema + private lazy val valueConverters: