spark git commit: [SPARK-16286][SQL] Implement stack table generating function

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e32c29d86 -> 565e18cf7


[SPARK-16286][SQL] Implement stack table generating function

This PR implements the `stack` table generating function.

Passes the Jenkins tests, including new test cases.
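
For illustration, a minimal `spark-shell` sketch (a SparkSession named `spark` is assumed; the output follows the layout documented for the expression):

```scala
// stack(n, v1, ..., vk) splits the k values into n rows of ceil(k/n) columns,
// padding the last row with NULLs when k is not a multiple of n.
spark.sql("SELECT stack(2, 1, 2, 3)").show()
// +----+----+
// |col0|col1|
// +----+----+
// |   1|   2|
// |   3|null|
// +----+----+
```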

Author: Dongjoon Hyun 

Closes #14033 from dongjoon-hyun/SPARK-16286.

(cherry picked from commit d0d28507cacfca5919dbfb4269892d58b62e8662)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/565e18cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/565e18cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/565e18cf

Branch: refs/heads/branch-2.0
Commit: 565e18cf7670231b1fa9db84f907654da79e6cef
Parents: e32c29d
Author: Dongjoon Hyun 
Authored: Wed Jul 6 10:54:43 2016 +0800
Committer: Reynold Xin 
Committed: Thu Jul 7 21:09:09 2016 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/generators.scala   | 53 
 .../expressions/GeneratorExpressionSuite.scala  | 18 +++
 .../spark/sql/GeneratorFunctionSuite.scala  | 53 
 .../spark/sql/hive/HiveSessionCatalog.scala |  2 +-
 5 files changed, 126 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/565e18cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 021bec7..f6ebcae 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -182,6 +182,7 @@ object FunctionRegistry {
 expression[PosExplode]("posexplode"),
 expression[Rand]("rand"),
 expression[Randn]("randn"),
+expression[Stack]("stack"),
 expression[CreateStruct]("struct"),
 expression[CaseWhen]("when"),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/565e18cf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 99b97c8..9d5c856 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -94,6 +94,59 @@ case class UserDefinedGenerator(
 }
 
 /**
+ * Separate v1, ..., vk into n rows. Each row will have k/n columns. n must be 
constant.
+ * {{{
+ *   SELECT stack(2, 1, 2, 3) ->
+ *   1  2
+ *   3  NULL
+ * }}}
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.",
+  extended = "> SELECT _FUNC_(2, 1, 2, 3);\n  [1,2]\n  [3,null]")
+case class Stack(children: Seq[Expression])
+extends Expression with Generator with CodegenFallback {
+
+  private lazy val numRows = children.head.eval().asInstanceOf[Int]
+  private lazy val numFields = Math.ceil((children.length - 1.0) / 
numRows).toInt
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length <= 1) {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least 2 
arguments.")
+} else if (children.head.dataType != IntegerType || 
!children.head.foldable || numRows < 1) {
+  TypeCheckResult.TypeCheckFailure("The number of rows must be a positive 
constant integer.")
+} else {
+  for (i <- 1 until children.length) {
+val j = (i - 1) % numFields
+if (children(i).dataType != elementSchema.fields(j).dataType) {
+  return TypeCheckResult.TypeCheckFailure(
+s"Argument ${j + 1} (${elementSchema.fields(j).dataType}) != " +
+  s"Argument $i (${children(i).dataType})")
+}
+  }
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def elementSchema: StructType =
+StructType(children.tail.take(numFields).zipWithIndex.map {
+  case (e, index) => StructField(s"col$index", e.dataType)
+})
+
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+val values = children.tail.map(_.eval(input)).toArray
+for (row <- 0 until numRows) yield {
+  val fields = new Array[Any](numFields)
+  for (col <- 0 until numFields) {
+val index = row * 

spark git commit: [SPARK-16288][SQL] Implement inline table generating function

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 bb4b0419b -> e32c29d86


[SPARK-16288][SQL] Implement inline table generating function

This PR implements the `inline` table generating function.

Passes the Jenkins tests with a new test case.
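
For illustration, a minimal `spark-shell` sketch (SparkSession `spark` assumed); the output column names come from the struct's own fields, `col1`/`col2` for an unnamed struct:

```scala
// inline(array(struct(...), ...)) explodes an array of structs into one row per struct.
spark.sql("SELECT inline(array(struct(1, 'a'), struct(2, 'b')))").show()
// +----+----+
// |col1|col2|
// +----+----+
// |   1|   a|
// |   2|   b|
// +----+----+
```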

Author: Dongjoon Hyun 

Closes #13976 from dongjoon-hyun/SPARK-16288.

(cherry picked from commit 88134e736829f5f93a82879c08cb191f175ff8af)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e32c29d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e32c29d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e32c29d8

Branch: refs/heads/branch-2.0
Commit: e32c29d86d4cc7ebe8e485c4221b5a10366b3d7d
Parents: bb4b041
Author: Dongjoon Hyun 
Authored: Mon Jul 4 01:57:45 2016 +0800
Committer: Reynold Xin 
Committed: Thu Jul 7 21:08:45 2016 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/generators.scala   | 35 
 .../expressions/GeneratorExpressionSuite.scala  | 59 +--
 .../spark/sql/GeneratorFunctionSuite.scala  | 60 
 .../spark/sql/hive/HiveSessionCatalog.scala |  5 +-
 5 files changed, 124 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e7f335f..021bec7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -165,6 +165,7 @@ object FunctionRegistry {
 expression[Explode]("explode"),
 expression[Greatest]("greatest"),
 expression[If]("if"),
+expression[Inline]("inline"),
 expression[IsNaN]("isnan"),
 expression[IfNull]("ifnull"),
 expression[IsNull]("isnull"),

http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 4e91cc5..99b97c8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -195,3 +195,38 @@ case class Explode(child: Expression) extends 
ExplodeBase(child, position = fals
   extended = "> SELECT _FUNC_(array(10,20));\n  0\t10\n  1\t20")
 // scalastyle:on line.size.limit
 case class PosExplode(child: Expression) extends ExplodeBase(child, position = 
true)
+
+/**
+ * Explodes an array of structs into a table.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(a) - Explodes an array of structs into a table.",
+  extended = "> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));\n  
[1,a]\n  [2,b]")
+case class Inline(child: Expression) extends UnaryExpression with Generator 
with CodegenFallback {
+
+  override def children: Seq[Expression] = child :: Nil
+
+  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+case ArrayType(et, _) if et.isInstanceOf[StructType] =>
+  TypeCheckResult.TypeCheckSuccess
+case _ =>
+  TypeCheckResult.TypeCheckFailure(
+s"input to function $prettyName should be array of struct type, not 
${child.dataType}")
+  }
+
+  override def elementSchema: StructType = child.dataType match {
+case ArrayType(et : StructType, _) => et
+  }
+
+  private lazy val numFields = elementSchema.fields.length
+
+  override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+val inputArray = child.eval(input).asInstanceOf[ArrayData]
+if (inputArray == null) {
+  Nil
+} else {
+  for (i <- 0 until inputArray.numElements())
+yield inputArray.getStruct(i, numFields)
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e32c29d8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratorExpressionSuite.scala
 

spark git commit: [SPARK-16274][SQL] Implement xpath_boolean

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 144aa84ce -> bb4b0419b


[SPARK-16274][SQL] Implement xpath_boolean

This patch implements the xpath_boolean expression for Spark SQL, an xpath function
that returns true or false. The implementation is modelled after Hive's
xpath_boolean, except for how the expression handles null inputs: Hive throws
a NullPointerException at runtime if either input is null, whereas this
implementation returns null.

Created two new test suites: one with unit tests covering the expression, and
the other with an end-to-end test in SQL.
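
A minimal sketch of the end-to-end behavior, including the null handling described above (SparkSession `spark` assumed):

```scala
spark.sql("SELECT xpath_boolean('<a><b>1</b></a>', 'a/b')").show()     // true
spark.sql("SELECT xpath_boolean('<a><b>1</b></a>', 'a/c')").show()     // false
// Unlike Hive, a null input yields null rather than a NullPointerException.
spark.sql("SELECT xpath_boolean(CAST(NULL AS STRING), 'a/b')").show()  // null
```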

Author: petermaxlee 

Closes #13964 from petermaxlee/SPARK-16274.

(cherry picked from commit d3af6731fa270842818ed91d6b4d14708ddae2db)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb4b0419
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb4b0419
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb4b0419

Branch: refs/heads/branch-2.0
Commit: bb4b0419b1dcd2b1926a829488a5a1d1b43756e0
Parents: 144aa84
Author: petermaxlee 
Authored: Thu Jun 30 09:27:48 2016 +0800
Committer: Reynold Xin 
Committed: Thu Jul 7 21:07:33 2016 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  2 +
 .../catalyst/expressions/xml/XPathBoolean.scala | 58 +++
 .../expressions/xml/XPathExpressionSuite.scala  | 61 
 .../apache/spark/sql/XmlFunctionsSuite.scala| 32 ++
 .../spark/sql/hive/HiveSessionCatalog.scala |  2 +-
 5 files changed, 154 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bb4b0419/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 346cdd8..e7f335f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.xml._
 import org.apache.spark.sql.catalyst.util.StringKeyHashMap
 
 
@@ -305,6 +306,7 @@ object FunctionRegistry {
 expression[UnBase64]("unbase64"),
 expression[Unhex]("unhex"),
 expression[Upper]("upper"),
+expression[XPathBoolean]("xpath_boolean"),
 
 // datetime functions
 expression[AddMonths]("add_months"),

http://git-wip-us.apache.org/repos/asf/spark/blob/bb4b0419/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala
new file mode 100644
index 000..2a5256c
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType, 
StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+@ExpressionDescription(
+  usage = "_FUNC_(xml, xpath) - Evaluates a boolean xpath 

spark git commit: [SPARK-16271][SQL] Implement Hive's UDFXPathUtil

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a04975457 -> 144aa84ce


[SPARK-16271][SQL] Implement Hive's UDFXPathUtil

This patch ports Hive's UDFXPathUtil over to Spark, which can be used to 
implement xpath functionality in Spark in the near future.

Added two new test suites UDFXPathUtilSuite and ReusableStringReaderSuite. They 
have been ported over from Hive (but rewritten in Scala in order to leverage 
ScalaTest).
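
A rough sketch of how the ported utility is meant to be used; it lives in the catalyst-internal package `org.apache.spark.sql.catalyst.expressions.xml`, so this is for illustration only:

```scala
import org.apache.spark.sql.catalyst.expressions.xml.UDFXPathUtil

// Each XPath UDF instance keeps one UDFXPathUtil; the compiled XPathExpression
// is cached as long as the path string does not change between calls.
val util = new UDFXPathUtil
val xml  = "<a><b>true</b><c>3.0</c></a>"
println(util.evalBoolean(xml, "a/b = 'true'"))  // Boolean: true
println(util.evalNumber(xml, "a/c"))            // Double: 3.0
println(util.evalString(xml, "a/b"))            // String: "true"
println(util.evalBoolean(null, "a/b"))          // null: null inputs short-circuit to null
```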

Author: petermaxlee 

Closes #13961 from petermaxlee/xpath.

(cherry picked from commit 153c2f9ac12846367a09684fd875c496d350a603)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/144aa84c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/144aa84c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/144aa84c

Branch: refs/heads/branch-2.0
Commit: 144aa84ce0f3463d95c06c78df6e9996ad42240a
Parents: a049754
Author: petermaxlee 
Authored: Tue Jun 28 21:07:52 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jul 7 21:07:03 2016 -0700

--
 .../catalyst/expressions/xml/UDFXPathUtil.java  | 192 +++
 .../xml/ReusableStringReaderSuite.scala | 103 ++
 .../expressions/xml/UDFXPathUtilSuite.scala |  99 ++
 3 files changed, 394 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/144aa84c/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
new file mode 100644
index 000..01a11f9
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.xml.namespace.QName;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+/**
+ * Utility class for all XPath UDFs. Each UDF instance should keep an instance 
of this class.
+ *
+ * This is based on Hive's UDFXPathUtil implementation.
+ */
+public class UDFXPathUtil {
+  private XPath xpath = XPathFactory.newInstance().newXPath();
+  private ReusableStringReader reader = new ReusableStringReader();
+  private InputSource inputSource = new InputSource(reader);
+  private XPathExpression expression = null;
+  private String oldPath = null;
+
+  public Object eval(String xml, String path, QName qname) {
+if (xml == null || path == null || qname == null) {
+  return null;
+}
+
+if (xml.length() == 0 || path.length() == 0) {
+  return null;
+}
+
+if (!path.equals(oldPath)) {
+  try {
+expression = xpath.compile(path);
+  } catch (XPathExpressionException e) {
+expression = null;
+  }
+  oldPath = path;
+}
+
+if (expression == null) {
+  return null;
+}
+
+reader.set(xml);
+
+try {
+  return expression.evaluate(inputSource, qname);
+} catch (XPathExpressionException e) {
+  throw new RuntimeException ("Invalid expression '" + oldPath + "'", e);
+}
+  }
+
+  public Boolean evalBoolean(String xml, String path) {
+return (Boolean) eval(xml, path, XPathConstants.BOOLEAN);
+  }
+
+  public String evalString(String xml, String path) {
+return (String) eval(xml, path, XPathConstants.STRING);
+  }
+
+  public Double evalNumber(String xml, String path) {
+return (Double) eval(xml, path, 

spark git commit: [SPARK-16289][SQL] Implement posexplode table generating function

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 7ef1d1c61 -> a04975457


[SPARK-16289][SQL] Implement posexplode table generating function

This PR implements the `posexplode` table generating function. Currently, the master
branch raises the following exception for a `map` argument, which differs from
Hive's behavior.

**Before**
```scala
scala> sql("select posexplode(map('a', 1, 'b', 2))").show
org.apache.spark.sql.AnalysisException: No handler for Hive UDF ... 
posexplode() takes an array as a parameter; line 1 pos 7
```

**After**
```scala
scala> sql("select posexplode(map('a', 1, 'b', 2))").show
+---+---+-+
|pos|key|value|
+---+---+-+
|  0|  a|1|
|  1|  b|2|
+---+---+-+
```

For an `array` argument, the behavior after this change is the same as before.
```
scala> sql("select posexplode(array(1, 2, 3))").show
+---+---+
|pos|col|
+---+---+
|  0|  1|
|  1|  2|
|  2|  3|
+---+---+
```

Passes the Jenkins tests with newly added test cases.
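
The DataFrame-side counterpart added in `functions.scala` can be used the same way; a minimal `spark-shell` sketch (SparkSession `spark` assumed):

```scala
import org.apache.spark.sql.functions.posexplode
import spark.implicits._

val df = Seq(("x", Map("a" -> 1, "b" -> 2))).toDF("id", "m")
df.select(posexplode($"m")).show()
// +---+---+-----+
// |pos|key|value|
// +---+---+-----+
// |  0|  a|    1|
// |  1|  b|    2|
// +---+---+-----+
```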

Author: Dongjoon Hyun 

Closes #13971 from dongjoon-hyun/SPARK-16289.

(cherry picked from commit 46395db80e3304e3f3a1ebdc8aadb8f2819b48b4)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0497545
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0497545
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0497545

Branch: refs/heads/branch-2.0
Commit: a049754577aa78a5a26b38821233861a4dfd8e8a
Parents: 7ef1d1c
Author: Dongjoon Hyun 
Authored: Thu Jun 30 12:03:54 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jul 7 21:05:31 2016 -0700

--
 R/pkg/NAMESPACE |  1 +
 R/pkg/R/functions.R | 17 
 R/pkg/R/generics.R  |  4 +
 R/pkg/inst/tests/testthat/test_sparkSQL.R   |  2 +-
 python/pyspark/sql/functions.py | 21 +
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/generators.scala   | 66 +++---
 .../analysis/ExpressionTypeCheckingSuite.scala  |  2 +
 .../expressions/GeneratorExpressionSuite.scala  | 71 +++
 .../scala/org/apache/spark/sql/Column.scala |  1 +
 .../scala/org/apache/spark/sql/functions.scala  |  8 ++
 .../spark/sql/ColumnExpressionSuite.scala   | 60 -
 .../spark/sql/GeneratorFunctionSuite.scala  | 92 
 .../spark/sql/hive/HiveSessionCatalog.scala |  2 +-
 14 files changed, 276 insertions(+), 72 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a0497545/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 9fd2568..bc3aceb 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -235,6 +235,7 @@ exportMethods("%in%",
   "over",
   "percent_rank",
   "pmod",
+  "posexplode",
   "quarter",
   "rand",
   "randn",

http://git-wip-us.apache.org/repos/asf/spark/blob/a0497545/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 09e5afa..52d46f9 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2934,3 +2934,20 @@ setMethod("sort_array",
 jc <- callJStatic("org.apache.spark.sql.functions", "sort_array", 
x@jc, asc)
 column(jc)
   })
+
+#' posexplode
+#'
+#' Creates a new row for each element with position in the given array or map 
column.
+#'
+#' @rdname posexplode
+#' @name posexplode
+#' @family collection_funcs
+#' @export
+#' @examples \dontrun{posexplode(df$c)}
+#' @note posexplode since 2.1.0
+setMethod("posexplode",
+  signature(x = "Column"),
+  function(x) {
+jc <- callJStatic("org.apache.spark.sql.functions", "posexplode", 
x@jc)
+column(jc)
+  })

http://git-wip-us.apache.org/repos/asf/spark/blob/a0497545/R/pkg/R/generics.R
--
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index b0f25de..e4ec508 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1054,6 +1054,10 @@ setGeneric("percent_rank", function(x) { 
standardGeneric("percent_rank") })
 #' @export
 setGeneric("pmod", function(y, x) { standardGeneric("pmod") })
 
+#' @rdname posexplode
+#' @export
+setGeneric("posexplode", function(x) { standardGeneric("posexplode") })
+
 #' @rdname quarter
 #' @export
 setGeneric("quarter", function(x) { standardGeneric("quarter") })

http://git-wip-us.apache.org/repos/asf/spark/blob/a0497545/R/pkg/inst/tests/testthat/test_sparkSQL.R

spark git commit: [SPARK-16278][SPARK-16279][SQL] Implement map_keys/map_values SQL functions

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 88603bd4f -> 7ef1d1c61


[SPARK-16278][SPARK-16279][SQL] Implement map_keys/map_values SQL functions

This PR adds `map_keys` and `map_values` SQL functions in order to remove Hive 
fallback.

Passes the Jenkins tests, including new test cases.
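
A minimal sketch of both functions, which now run natively instead of falling back to Hive UDFs (SparkSession `spark` assumed):

```scala
spark.sql("SELECT map_keys(map(1, 'a', 2, 'b'))").show()    // keys:   [1, 2]
spark.sql("SELECT map_values(map(1, 'a', 2, 'b'))").show()  // values: [a, b]
```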

Author: Dongjoon Hyun 

Closes #13967 from dongjoon-hyun/SPARK-16278.

(cherry picked from commit 54b27c1797fcd32b3f3e9d44e1a149ae396a61e6)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ef1d1c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ef1d1c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ef1d1c6

Branch: refs/heads/branch-2.0
Commit: 7ef1d1c618100313dbbdb6f615d9f87ff67e895d
Parents: 88603bd
Author: Dongjoon Hyun 
Authored: Sun Jul 3 16:59:40 2016 +0800
Committer: Reynold Xin 
Committed: Thu Jul 7 21:02:50 2016 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  2 +
 .../expressions/collectionOperations.scala  | 48 
 .../expressions/CollectionFunctionsSuite.scala  | 13 ++
 .../spark/sql/DataFrameFunctionsSuite.scala | 16 +++
 .../spark/sql/hive/HiveSessionCatalog.scala |  1 -
 5 files changed, 79 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7ef1d1c6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 95be0d6..27c3a09 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -170,6 +170,8 @@ object FunctionRegistry {
 expression[IsNotNull]("isnotnull"),
 expression[Least]("least"),
 expression[CreateMap]("map"),
+expression[MapKeys]("map_keys"),
+expression[MapValues]("map_values"),
 expression[CreateNamedStruct]("named_struct"),
 expression[NaNvl]("nanvl"),
 expression[NullIf]("nullif"),

http://git-wip-us.apache.org/repos/asf/spark/blob/7ef1d1c6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index c71cb73..2e8ea11 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -44,6 +44,54 @@ case class Size(child: Expression) extends UnaryExpression 
with ExpectsInputType
 }
 
 /**
+ * Returns an unordered array containing the keys of the map.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(map) - Returns an unordered array containing the keys of the 
map.",
+  extended = " > SELECT _FUNC_(map(1, 'a', 2, 'b'));\n [1,2]")
+case class MapKeys(child: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType)
+
+  override def dataType: DataType = 
ArrayType(child.dataType.asInstanceOf[MapType].keyType)
+
+  override def nullSafeEval(map: Any): Any = {
+map.asInstanceOf[MapData].keyArray()
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).keyArray();")
+  }
+
+  override def prettyName: String = "map_keys"
+}
+
+/**
+ * Returns an unordered array containing the values of the map.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(map) - Returns an unordered array containing the values of 
the map.",
+  extended = " > SELECT _FUNC_(map(1, 'a', 2, 'b'));\n [\"a\",\"b\"]")
+case class MapValues(child: Expression)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType)
+
+  override def dataType: DataType = 
ArrayType(child.dataType.asInstanceOf[MapType].valueType)
+
+  override def nullSafeEval(map: Any): Any = {
+map.asInstanceOf[MapData].valueArray()
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).valueArray();")
+  }
+
+  override def prettyName: String = "map_values"
+}
+
+/**
  * Sorts the input array in 

spark git commit: [SPARK-16276][SQL] Implement elt SQL function

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 73c764a04 -> 88603bd4f


[SPARK-16276][SQL] Implement elt SQL function

This patch implements the elt function, as it is implemented in Hive.

Added an expression unit test in StringExpressionsSuite and an end-to-end test in
StringFunctionsSuite.
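
A minimal sketch (SparkSession `spark` assumed):

```scala
// elt(n, str1, str2, ...) returns the n-th string argument, matching Hive's elt.
spark.sql("SELECT elt(1, 'scala', 'java')").show()  // scala
spark.sql("SELECT elt(2, 'scala', 'java')").show()  // java
```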

Author: petermaxlee 

Closes #13966 from petermaxlee/SPARK-16276.

(cherry picked from commit 85f2303ecadd9bf6d9694a2743dda075654c5ccf)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88603bd4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88603bd4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88603bd4

Branch: refs/heads/branch-2.0
Commit: 88603bd4f9a665ad02df40ed8a0dd78b65c9d152
Parents: 73c764a
Author: petermaxlee 
Authored: Fri Jul 1 07:57:48 2016 +0800
Committer: Reynold Xin 
Committed: Thu Jul 7 21:00:53 2016 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../expressions/ExpectsInputTypes.scala |  3 +-
 .../expressions/stringExpressions.scala | 41 
 .../expressions/StringExpressionsSuite.scala| 23 +++
 .../apache/spark/sql/StringFunctionsSuite.scala | 14 +++
 .../spark/sql/hive/HiveSessionCatalog.scala |  2 +-
 6 files changed, 82 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88603bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 0bde48c..95be0d6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -265,6 +265,7 @@ object FunctionRegistry {
 expression[Concat]("concat"),
 expression[ConcatWs]("concat_ws"),
 expression[Decode]("decode"),
+expression[Elt]("elt"),
 expression[Encode]("encode"),
 expression[FindInSet]("find_in_set"),
 expression[FormatNumber]("format_number"),

http://git-wip-us.apache.org/repos/asf/spark/blob/88603bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
index c15a2df..98f25a9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
@@ -57,7 +57,8 @@ trait ExpectsInputTypes extends Expression {
 
 
 /**
- * A mixin for the analyzer to perform implicit type casting using 
[[ImplicitTypeCasts]].
+ * A mixin for the analyzer to perform implicit type casting using
+ * [[org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts]].
  */
 trait ImplicitCastInputTypes extends ExpectsInputTypes {
   // No other methods

http://git-wip-us.apache.org/repos/asf/spark/blob/88603bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 44ff7fd..b0df957 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -21,6 +21,7 @@ import java.text.{DecimalFormat, DecimalFormatSymbols}
 import java.util.{HashMap, Locale, Map => JMap}
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.types._
@@ -162,6 +163,46 @@ case class ConcatWs(children: Seq[Expression])
   }
 }
 
+@ExpressionDescription(
+  usage = "_FUNC_(n, str1, str2, ...) - returns the n-th string, e.g. returns 
str2 when n is 2",
+  extended = "> SELECT _FUNC_(1, 'scala', 'java') FROM src LIMIT 1;\n" + 
"'scala'")
+case class Elt(children: 

spark git commit: [SPARK-16425][R] `describe()` should not fail with non-numeric columns

2016-07-07 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5828da41c -> 73c764a04


[SPARK-16425][R] `describe()` should not fail with non-numeric columns

## What changes were proposed in this pull request?

This PR prevents ERRORs when `summary(df)` is called on a `SparkDataFrame` with
non-numeric columns. This failure happens only in `SparkR`.

**Before**
```r
> df <- createDataFrame(faithful)
> df <- withColumn(df, "boolean", df$waiting==79)
> summary(df)
16/07/07 14:15:16 ERROR RBackendHandler: describe on 34 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: cannot resolve 'avg(`boolean`)' due 
to data type mismatch: function average requires numeric types, not BooleanType;
```

**After**
```r
> df <- createDataFrame(faithful)
> df <- withColumn(df, "boolean", df$waiting==79)
> summary(df)
SparkDataFrame[summary:string, eruptions:string, waiting:string]
```

## How was this patch tested?

Passes Jenkins with an updated test case.

Author: Dongjoon Hyun 

Closes #14096 from dongjoon-hyun/SPARK-16425.

(cherry picked from commit 6aa7d09f4e126f42e41085dec169c813379ed354)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73c764a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73c764a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73c764a0

Branch: refs/heads/branch-2.0
Commit: 73c764a047f795c85909c7a7ea4324f286d2aafa
Parents: 5828da4
Author: Dongjoon Hyun 
Authored: Thu Jul 7 17:47:29 2016 -0700
Committer: Shivaram Venkataraman 
Committed: Thu Jul 7 17:47:38 2016 -0700

--
 R/pkg/R/DataFrame.R   | 3 +--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 ++--
 2 files changed, 7 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/73c764a0/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 17474d4..ec09aab 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2617,8 +2617,7 @@ setMethod("describe",
 setMethod("describe",
   signature(x = "SparkDataFrame"),
   function(x) {
-colList <- as.list(c(columns(x)))
-sdf <- callJMethod(x@sdf, "describe", colList)
+sdf <- callJMethod(x@sdf, "describe", list())
 dataFrame(sdf)
   })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/73c764a0/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 003fcce..755aded 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1816,13 +1816,17 @@ test_that("describe() and summarize() on a DataFrame", {
   expect_equal(collect(stats)[2, "age"], "24.5")
   expect_equal(collect(stats)[3, "age"], "7.7781745930520225")
   stats <- describe(df)
-  expect_equal(collect(stats)[4, "name"], "Andy")
+  expect_equal(collect(stats)[4, "name"], NULL)
   expect_equal(collect(stats)[5, "age"], "30")
 
   stats2 <- summary(df)
-  expect_equal(collect(stats2)[4, "name"], "Andy")
+  expect_equal(collect(stats2)[4, "name"], NULL)
   expect_equal(collect(stats2)[5, "age"], "30")
 
+  # SPARK-16425: SparkR summary() fails on column of type logical
+  df <- withColumn(df, "boolean", df$age == 30)
+  summary(df)
+
   # Test base::summary is working
   expect_equal(length(summary(attenu, digits = 4)), 35)
 })





spark git commit: [SPARK-16425][R] `describe()` should not fail with non-numeric columns

2016-07-07 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master f4767bcc7 -> 6aa7d09f4


[SPARK-16425][R] `describe()` should not fail with non-numeric columns

## What changes were proposed in this pull request?

This PR prevents ERRORs when `summary(df)` is called on a `SparkDataFrame` with
non-numeric columns. This failure happens only in `SparkR`.

**Before**
```r
> df <- createDataFrame(faithful)
> df <- withColumn(df, "boolean", df$waiting==79)
> summary(df)
16/07/07 14:15:16 ERROR RBackendHandler: describe on 34 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: cannot resolve 'avg(`boolean`)' due 
to data type mismatch: function average requires numeric types, not BooleanType;
```

**After**
```r
> df <- createDataFrame(faithful)
> df <- withColumn(df, "boolean", df$waiting==79)
> summary(df)
SparkDataFrame[summary:string, eruptions:string, waiting:string]
```

## How was this patch tested?

Passes Jenkins with an updated test case.

Author: Dongjoon Hyun 

Closes #14096 from dongjoon-hyun/SPARK-16425.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6aa7d09f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6aa7d09f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6aa7d09f

Branch: refs/heads/master
Commit: 6aa7d09f4e126f42e41085dec169c813379ed354
Parents: f4767bc
Author: Dongjoon Hyun 
Authored: Thu Jul 7 17:47:29 2016 -0700
Committer: Shivaram Venkataraman 
Committed: Thu Jul 7 17:47:29 2016 -0700

--
 R/pkg/R/DataFrame.R   | 3 +--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 ++--
 2 files changed, 7 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6aa7d09f/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5944bbc..a18eee3 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2622,8 +2622,7 @@ setMethod("describe",
 setMethod("describe",
   signature(x = "SparkDataFrame"),
   function(x) {
-colList <- as.list(c(columns(x)))
-sdf <- callJMethod(x@sdf, "describe", colList)
+sdf <- callJMethod(x@sdf, "describe", list())
 dataFrame(sdf)
   })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6aa7d09f/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index a0ab719..e2a1da0 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1824,13 +1824,17 @@ test_that("describe() and summarize() on a DataFrame", {
   expect_equal(collect(stats)[2, "age"], "24.5")
   expect_equal(collect(stats)[3, "age"], "7.7781745930520225")
   stats <- describe(df)
-  expect_equal(collect(stats)[4, "name"], "Andy")
+  expect_equal(collect(stats)[4, "name"], NULL)
   expect_equal(collect(stats)[5, "age"], "30")
 
   stats2 <- summary(df)
-  expect_equal(collect(stats2)[4, "name"], "Andy")
+  expect_equal(collect(stats2)[4, "name"], NULL)
   expect_equal(collect(stats2)[5, "age"], "30")
 
+  # SPARK-16425: SparkR summary() fails on column of type logical
+  df <- withColumn(df, "boolean", df$age == 30)
+  summary(df)
+
   # Test base::summary is working
   expect_equal(length(summary(attenu, digits = 4)), 35)
 })





spark git commit: [SPARK-16310][SPARKR] R na.string-like default for csv source

2016-07-07 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 28710b42b -> f4767bcc7


[SPARK-16310][SPARKR] R na.string-like default for csv source

## What changes were proposed in this pull request?

Apply default "NA" as null string for R, like R read.csv na.string parameter.

https://stat.ethz.ch/R-manual/R-devel/library/utils/html/read.table.html
na.strings = "NA"

A user passing a csv file with NA values should get the same behavior with
SparkR read.df(..., source = "csv").
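
Under the hood this just pre-fills the CSV data source's `nullValue` option when the caller has not set it; roughly the following Scala-side equivalent, shown only as an illustration (the path `csvPath` is an assumed placeholder):

```scala
// What the new SparkR default corresponds to on the Scala side.
val df = spark.read
  .option("header", "true")
  .option("nullValue", "NA")  // read.df now sets this by default when source = "csv"
  .csv(csvPath)
```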

(couldn't open JIRA, will do that later)

## How was this patch tested?

unit tests

shivaram

Author: Felix Cheung 

Closes #13984 from felixcheung/rcsvnastring.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4767bcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4767bcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4767bcc

Branch: refs/heads/master
Commit: f4767bcc7a9d1bdd301f054776aa45e7c9f344a7
Parents: 28710b4
Author: Felix Cheung 
Authored: Thu Jul 7 15:21:57 2016 -0700
Committer: Shivaram Venkataraman 
Committed: Thu Jul 7 15:21:57 2016 -0700

--
 R/pkg/R/SQLContext.R  | 10 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 32 +-
 2 files changed, 34 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f4767bcc/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 8df73db..bc0daa2 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -714,11 +714,14 @@ dropTempView <- function(viewName) {
 #'
 #' The data source is specified by the `source` and a set of options(...).
 #' If `source` is not specified, the default data source configured by
-#' "spark.sql.sources.default" will be used.
+#' "spark.sql.sources.default" will be used. \cr
+#' Similar to R read.csv, when `source` is "csv", by default, a value of "NA" 
will be interpreted
+#' as NA.
 #'
 #' @param path The path of files to load
 #' @param source The name of external data source
 #' @param schema The data schema defined in structType
+#' @param na.strings Default string value for NA when source is "csv"
 #' @return SparkDataFrame
 #' @rdname read.df
 #' @name read.df
@@ -735,7 +738,7 @@ dropTempView <- function(viewName) {
 #' @name read.df
 #' @method read.df default
 #' @note read.df since 1.4.0
-read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
+read.df.default <- function(path = NULL, source = NULL, schema = NULL, 
na.strings = "NA", ...) {
   sparkSession <- getSparkSession()
   options <- varargsToEnv(...)
   if (!is.null(path)) {
@@ -744,6 +747,9 @@ read.df.default <- function(path = NULL, source = NULL, 
schema = NULL, ...) {
   if (is.null(source)) {
 source <- getDefaultSqlSource()
   }
+  if (source == "csv" && is.null(options[["nullValue"]])) {
+options[["nullValue"]] <- na.strings
+  }
   if (!is.null(schema)) {
 stopifnot(class(schema) == "structType")
 sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", 
sparkSession, source,

http://git-wip-us.apache.org/repos/asf/spark/blob/f4767bcc/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index a3aa26d..a0ab719 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -213,15 +213,35 @@ test_that("read csv as DataFrame", {
   mockLinesCsv <- c("year,make,model,comment,blank",
"\"2012\",\"Tesla\",\"S\",\"No comment\",",
"1997,Ford,E350,\"Go get one now they are going fast\",",
-   "2015,Chevy,Volt")
+   "2015,Chevy,Volt",
+   "NA,Dummy,Placeholder")
   writeLines(mockLinesCsv, csvPath)
 
-  # default "header" is false
-  df <- read.df(csvPath, "csv", header = "true")
-  expect_equal(count(df), 3)
+  # default "header" is false, inferSchema to handle "year" as "int"
+  df <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
+  expect_equal(count(df), 4)
   expect_equal(columns(df), c("year", "make", "model", "comment", "blank"))
-  expect_equal(sort(unlist(collect(where(df, df$year == "2015",
-   sort(unlist(list(year = "2015", make = "Chevy", model = 
"Volt"
+  expect_equal(sort(unlist(collect(where(df, df$year == 2015,
+   sort(unlist(list(year = 2015, make = "Chevy", model = "Volt"
+
+  # since "year" is "int", let's skip the NA values
+  withoutna <- na.omit(df, how = "any", cols = "year")
+  

spark git commit: [SPARK-16310][SPARKR] R na.string-like default for csv source

2016-07-07 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 30cb3f1d3 -> 5828da41c


[SPARK-16310][SPARKR] R na.string-like default for csv source

## What changes were proposed in this pull request?

Apply default "NA" as null string for R, like R read.csv na.string parameter.

https://stat.ethz.ch/R-manual/R-devel/library/utils/html/read.table.html
na.strings = "NA"

A user passing a csv file with NA values should get the same behavior with
SparkR read.df(..., source = "csv").

(couldn't open JIRA, will do that later)

## How was this patch tested?

unit tests

shivaram

Author: Felix Cheung 

Closes #13984 from felixcheung/rcsvnastring.

(cherry picked from commit f4767bcc7a9d1bdd301f054776aa45e7c9f344a7)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5828da41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5828da41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5828da41

Branch: refs/heads/branch-2.0
Commit: 5828da41cb2d815708191bd9a5cf3bd82795aa41
Parents: 30cb3f1
Author: Felix Cheung 
Authored: Thu Jul 7 15:21:57 2016 -0700
Committer: Shivaram Venkataraman 
Committed: Thu Jul 7 15:22:06 2016 -0700

--
 R/pkg/R/SQLContext.R  | 10 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 32 +-
 2 files changed, 34 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5828da41/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 8df73db..bc0daa2 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -714,11 +714,14 @@ dropTempView <- function(viewName) {
 #'
 #' The data source is specified by the `source` and a set of options(...).
 #' If `source` is not specified, the default data source configured by
-#' "spark.sql.sources.default" will be used.
+#' "spark.sql.sources.default" will be used. \cr
+#' Similar to R read.csv, when `source` is "csv", by default, a value of "NA" 
will be interpreted
+#' as NA.
 #'
 #' @param path The path of files to load
 #' @param source The name of external data source
 #' @param schema The data schema defined in structType
+#' @param na.strings Default string value for NA when source is "csv"
 #' @return SparkDataFrame
 #' @rdname read.df
 #' @name read.df
@@ -735,7 +738,7 @@ dropTempView <- function(viewName) {
 #' @name read.df
 #' @method read.df default
 #' @note read.df since 1.4.0
-read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
+read.df.default <- function(path = NULL, source = NULL, schema = NULL, 
na.strings = "NA", ...) {
   sparkSession <- getSparkSession()
   options <- varargsToEnv(...)
   if (!is.null(path)) {
@@ -744,6 +747,9 @@ read.df.default <- function(path = NULL, source = NULL, 
schema = NULL, ...) {
   if (is.null(source)) {
 source <- getDefaultSqlSource()
   }
+  if (source == "csv" && is.null(options[["nullValue"]])) {
+options[["nullValue"]] <- na.strings
+  }
   if (!is.null(schema)) {
 stopifnot(class(schema) == "structType")
 sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", 
sparkSession, source,

http://git-wip-us.apache.org/repos/asf/spark/blob/5828da41/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index d22baf6..003fcce 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -213,15 +213,35 @@ test_that("read csv as DataFrame", {
   mockLinesCsv <- c("year,make,model,comment,blank",
"\"2012\",\"Tesla\",\"S\",\"No comment\",",
"1997,Ford,E350,\"Go get one now they are going fast\",",
-   "2015,Chevy,Volt")
+   "2015,Chevy,Volt",
+   "NA,Dummy,Placeholder")
   writeLines(mockLinesCsv, csvPath)
 
-  # default "header" is false
-  df <- read.df(csvPath, "csv", header = "true")
-  expect_equal(count(df), 3)
+  # default "header" is false, inferSchema to handle "year" as "int"
+  df <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
+  expect_equal(count(df), 4)
   expect_equal(columns(df), c("year", "make", "model", "comment", "blank"))
-  expect_equal(sort(unlist(collect(where(df, df$year == "2015",
-   sort(unlist(list(year = "2015", make = "Chevy", model = 
"Volt"
+  expect_equal(sort(unlist(collect(where(df, df$year == 2015,
+   sort(unlist(list(year = 2015, make = "Chevy", model = 

spark git commit: [SPARK-16415][SQL] fix catalog string error

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 cbfd94eac -> 30cb3f1d3


[SPARK-16415][SQL] fix catalog string error

## What changes were proposed in this pull request?

In #13537 we truncate `simpleString` if it is a long `StructType`. But
sometimes we need `catalogString` to reconstruct `TypeInfo`, for example in the
description of [SPARK-16415](https://issues.apache.org/jira/browse/SPARK-16415).
So we need to keep the implementation of `catalogString` unaffected by that
truncation.
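
An illustrative sketch of the distinction, mirroring the added test case (not part of the patch itself):

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val wide = StructType((1 to 100).map(n => StructField("col" + n, StringType)))
println(wide.simpleString)   // may be truncated, e.g. ending in "... N more fields"
println(wide.catalogString)  // always spells out all 100 fields, so it can be parsed back
```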

## How was this patch tested?

added a test case.

Author: Daoyuan Wang 

Closes #14089 from adrian-wang/catalogstring.

(cherry picked from commit 28710b42b0d18a55bd64d597558649537259b127)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30cb3f1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30cb3f1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30cb3f1d

Branch: refs/heads/branch-2.0
Commit: 30cb3f1d3a1d413568d586e6b8df56f74f05d80e
Parents: cbfd94e
Author: Daoyuan Wang 
Authored: Thu Jul 7 11:08:06 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jul 7 11:08:12 2016 -0700

--
 .../scala/org/apache/spark/sql/types/StructType.scala |  6 ++
 .../spark/sql/hive/HiveMetastoreCatalogSuite.scala| 14 +++---
 2 files changed, 17 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/30cb3f1d/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index effef54..55fdfbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -298,6 +298,12 @@ case class StructType(fields: Array[StructField]) extends 
DataType with Seq[Stru
 Utils.truncatedString(fieldTypes, "struct<", ",", ">")
   }
 
+  override def catalogString: String = {
+// in catalogString, we should not truncate
+val fieldTypes = fields.map(field => 
s"${field.name}:${field.dataType.catalogString}")
+s"struct<${fieldTypes.mkString(",")}>"
+  }
+
   override def sql: String = {
 val fieldTypes = fields.map(f => s"${quoteIdentifier(f.name)}: 
${f.dataType.sql}")
 s"STRUCT<${fieldTypes.mkString(", ")}>"

http://git-wip-us.apache.org/repos/asf/spark/blob/30cb3f1d/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index b420781..754aabb 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -26,15 +26,15 @@ import 
org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
-import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
+import org.apache.spark.sql.types.{DecimalType, StringType, StructField, 
StructType}
 
 class HiveMetastoreCatalogSuite extends TestHiveSingleton {
   import spark.implicits._
 
   test("struct field should accept underscore in sub-column name") {
 val hiveTypeStr = "struct"
-val dateType = CatalystSqlParser.parseDataType(hiveTypeStr)
-assert(dateType.isInstanceOf[StructType])
+val dataType = CatalystSqlParser.parseDataType(hiveTypeStr)
+assert(dataType.isInstanceOf[StructType])
   }
 
   test("udt to metastore type conversion") {
@@ -49,6 +49,14 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton {
 logInfo(df.queryExecution.toString)
 df.as('a).join(df.as('b), $"a.key" === $"b.key")
   }
+
+  test("should not truncate struct type catalog string") {
+def field(n: Int): StructField = {
+  StructField("col" + n, StringType)
+}
+val dataType = StructType((1 to 100).map(field))
+assert(CatalystSqlParser.parseDataType(dataType.catalogString) == dataType)
+  }
 }
 
 class DataSourceWithHiveMetastoreCatalogSuite





spark git commit: [SPARK-16415][SQL] fix catalog string error

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 0f7175def -> 28710b42b


[SPARK-16415][SQL] fix catalog string error

## What changes were proposed in this pull request?

In #13537 we truncate `simpleString` if it is a long `StructType`. But
sometimes we need `catalogString` to reconstruct `TypeInfo`, for example in the
description of [SPARK-16415](https://issues.apache.org/jira/browse/SPARK-16415).
So we need to keep the implementation of `catalogString` unaffected by that
truncation.

## How was this patch tested?

added a test case.

Author: Daoyuan Wang 

Closes #14089 from adrian-wang/catalogstring.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28710b42
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28710b42
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28710b42

Branch: refs/heads/master
Commit: 28710b42b0d18a55bd64d597558649537259b127
Parents: 0f7175d
Author: Daoyuan Wang 
Authored: Thu Jul 7 11:08:06 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jul 7 11:08:06 2016 -0700

--
 .../scala/org/apache/spark/sql/types/StructType.scala |  6 ++
 .../spark/sql/hive/HiveMetastoreCatalogSuite.scala| 14 +++---
 2 files changed, 17 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/28710b42/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 0c2ebb0..dd4c88c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -333,6 +333,12 @@ case class StructType(fields: Array[StructField]) extends 
DataType with Seq[Stru
 Utils.truncatedString(fieldTypes, "struct<", ",", ">")
   }
 
+  override def catalogString: String = {
+// in catalogString, we should not truncate
+val fieldTypes = fields.map(field => 
s"${field.name}:${field.dataType.catalogString}")
+s"struct<${fieldTypes.mkString(",")}>"
+  }
+
   override def sql: String = {
 val fieldTypes = fields.map(f => s"${quoteIdentifier(f.name)}: 
${f.dataType.sql}")
 s"STRUCT<${fieldTypes.mkString(", ")}>"

http://git-wip-us.apache.org/repos/asf/spark/blob/28710b42/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index b420781..754aabb 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -26,15 +26,15 @@ import 
org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
-import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
+import org.apache.spark.sql.types.{DecimalType, StringType, StructField, 
StructType}
 
 class HiveMetastoreCatalogSuite extends TestHiveSingleton {
   import spark.implicits._
 
   test("struct field should accept underscore in sub-column name") {
 val hiveTypeStr = "struct"
-val dateType = CatalystSqlParser.parseDataType(hiveTypeStr)
-assert(dateType.isInstanceOf[StructType])
+val dataType = CatalystSqlParser.parseDataType(hiveTypeStr)
+assert(dataType.isInstanceOf[StructType])
   }
 
   test("udt to metastore type conversion") {
@@ -49,6 +49,14 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton {
 logInfo(df.queryExecution.toString)
 df.as('a).join(df.as('b), $"a.key" === $"b.key")
   }
+
+  test("should not truncate struct type catalog string") {
+def field(n: Int): StructField = {
+  StructField("col" + n, StringType)
+}
+val dataType = StructType((1 to 100).map(field))
+assert(CatalystSqlParser.parseDataType(dataType.catalogString) == dataType)
+  }
 }
 
 class DataSourceWithHiveMetastoreCatalogSuite


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

2016-07-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 24933355c -> cbfd94eac


[SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

## What changes were proposed in this pull request?

There are cases where the `complete` output mode does not output the updated aggregated values; for details, please refer to 
[SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350).

The cause is that, as we do `data.as[T].foreachPartition { iter => ... }` in 
`ForeachSink.addBatch()`, `foreachPartition()` does not currently support incremental planning.

This patch makes `foreachPartition()` support incremental planning in `ForeachSink`, 
by creating a special version of `Dataset` whose `rdd()` method supports incremental planning.
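
For context, a hedged sketch of the user-facing pattern this fixes (host, port, and the writer body are made-up placeholders, not taken from the patch): a `complete`-mode aggregation written out through `foreach`, which routes every batch through `ForeachSink.addBatch()`:

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("foreach-complete").getOrCreate()
import spark.implicits._

// A running word count whose aggregates must be re-planned for every batch.
val counts = spark.readStream
  .format("socket").option("host", "localhost").option("port", "9999").load()
  .as[String]
  .flatMap(_.split(" "))
  .groupBy("value").count()

val query = counts.writeStream
  .outputMode("complete")
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true
    def process(value: Row): Unit = println(value)   // before the fix, could observe stale aggregates
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()

query.awaitTermination()
```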

## How was this patch tested?

Added a unit test which failed before the change

Author: Liwei Lin 

Closes #14030 from lw-lin/fix-foreach-complete.

(cherry picked from commit 0f7175def985a7f1e37198680f893e749612ab76)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbfd94ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbfd94ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbfd94ea

Branch: refs/heads/branch-2.0
Commit: cbfd94eacf46b61011f1bd8d30f0c134cab37b09
Parents: 2493335
Author: Liwei Lin 
Authored: Thu Jul 7 10:40:42 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Jul 7 10:40:52 2016 -0700

--
 .../sql/execution/streaming/ForeachSink.scala   | 40 -
 .../streaming/IncrementalExecution.scala|  4 +-
 .../execution/streaming/ForeachSinkSuite.scala  | 86 ++--
 3 files changed, 117 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cbfd94ea/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 14b9b1c..082664a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter}
+import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde
 
 /**
  * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the 
contract defined by
@@ -30,7 +32,41 @@ import org.apache.spark.sql.{DataFrame, Encoder, 
ForeachWriter}
 class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-data.as[T].foreachPartition { iter =>
+// TODO: Refine this method when SPARK-16264 is resolved; see comments 
below.
+
+// This logic should've been as simple as:
+// ```
+//   data.as[T].foreachPartition { iter => ... }
+// ```
+//
+// Unfortunately, doing that would just break the incremental planing. The 
reason is,
+// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but 
`Dataset.rdd()` just
+// does not support `IncrementalExecution`.
+//
+// So as a provisional fix, below we've made a special version of 
`Dataset` with its `rdd()`
+// method supporting incremental planning. But in the long run, we should 
generally make newly
+// created Datasets use `IncrementalExecution` where necessary (which is 
SPARK-16264 tries to
+// resolve).
+
+val datasetWithIncrementalExecution =
+  new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) 
{
+override lazy val rdd: RDD[T] = {
+  val objectType = exprEnc.deserializer.dataType
+  val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+
+  // was originally: 
sparkSession.sessionState.executePlan(deserialized) ...
+  val incrementalExecution = new IncrementalExecution(
+this.sparkSession,
+deserialized,
+data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
+
data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
+
data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId)
+  incrementalExecution.toRdd.mapPartitions { rows =>
+rows.map(_.get(0, objectType))
+  

spark git commit: [SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

2016-07-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master a04cab8f1 -> 0f7175def


[SPARK-16350][SQL] Fix support for incremental planning in writeStream.foreach()

## What changes were proposed in this pull request?

There are cases where the `complete` output mode does not output the updated aggregated values; for details, please refer to 
[SPARK-16350](https://issues.apache.org/jira/browse/SPARK-16350).

The cause is that, as we do `data.as[T].foreachPartition { iter => ... }` in 
`ForeachSink.addBatch()`, `foreachPartition()` does not currently support incremental planning.

This patch makes `foreachPartition()` support incremental planning in `ForeachSink`, 
by creating a special version of `Dataset` whose `rdd()` method supports incremental planning.

## How was this patch tested?

Added a unit test which failed before the change

Author: Liwei Lin 

Closes #14030 from lw-lin/fix-foreach-complete.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f7175de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f7175de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f7175de

Branch: refs/heads/master
Commit: 0f7175def985a7f1e37198680f893e749612ab76
Parents: a04cab8
Author: Liwei Lin 
Authored: Thu Jul 7 10:40:42 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Jul 7 10:40:42 2016 -0700

--
 .../sql/execution/streaming/ForeachSink.scala   | 40 -
 .../streaming/IncrementalExecution.scala|  4 +-
 .../execution/streaming/ForeachSinkSuite.scala  | 86 ++--
 3 files changed, 117 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0f7175de/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 14b9b1c..082664a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, ForeachWriter}
+import org.apache.spark.sql.catalyst.plans.logical.CatalystSerde
 
 /**
  * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the 
contract defined by
@@ -30,7 +32,41 @@ import org.apache.spark.sql.{DataFrame, Encoder, 
ForeachWriter}
 class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with 
Serializable {
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-data.as[T].foreachPartition { iter =>
+// TODO: Refine this method when SPARK-16264 is resolved; see comments 
below.
+
+// This logic should've been as simple as:
+// ```
+//   data.as[T].foreachPartition { iter => ... }
+// ```
+//
+// Unfortunately, doing that would just break the incremental planing. The 
reason is,
+// `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but 
`Dataset.rdd()` just
+// does not support `IncrementalExecution`.
+//
+// So as a provisional fix, below we've made a special version of 
`Dataset` with its `rdd()`
+// method supporting incremental planning. But in the long run, we should 
generally make newly
+// created Datasets use `IncrementalExecution` where necessary (which is 
SPARK-16264 tries to
+// resolve).
+
+val datasetWithIncrementalExecution =
+  new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) 
{
+override lazy val rdd: RDD[T] = {
+  val objectType = exprEnc.deserializer.dataType
+  val deserialized = CatalystSerde.deserialize[T](logicalPlan)
+
+  // was originally: 
sparkSession.sessionState.executePlan(deserialized) ...
+  val incrementalExecution = new IncrementalExecution(
+this.sparkSession,
+deserialized,
+data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
+
data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
+
data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId)
+  incrementalExecution.toRdd.mapPartitions { rows =>
+rows.map(_.get(0, objectType))
+  }.asInstanceOf[RDD[T]]
+}
+  }
+datasetWithIncrementalExecution.foreachPartition { iter =>
   if 

spark git commit: Revert "[SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix"

2016-07-07 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 45dda9221 -> bb92788f9


Revert "[SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix"

This reverts commit 45dda92214191310a56333a2085e2343eba170cd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb92788f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb92788f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb92788f

Branch: refs/heads/branch-1.6
Commit: bb92788f96426e57555ba5771e256c6425e0e75e
Parents: 45dda92
Author: Shixiong Zhu 
Authored: Thu Jul 7 10:34:50 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Jul 7 10:34:50 2016 -0700

--
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  2 +-
 .../mllib/linalg/distributed/RowMatrix.scala|  2 +-
 .../linalg/distributed/JavaRowMatrixSuite.java  | 44 
 3 files changed, 2 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bb92788f/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index a059e38..1714983 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1110,7 +1110,7 @@ private[python] class PythonMLLibAPI extends Serializable 
{
* Wrapper around RowMatrix constructor.
*/
   def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): 
RowMatrix = {
-new RowMatrix(rows.rdd, numRows, numCols)
+new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bb92788f/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index b941d1f..52c0f19 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -526,7 +526,7 @@ class RowMatrix @Since("1.0.0") (
   def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, 
Matrix] = {
 val col = numCols().toInt
 // split rows horizontally into smaller matrices, and compute QR for each 
of them
-val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows =>
+val blockQRs = rows.glom().map { partRows =>
   val bdm = BDM.zeros[Double](partRows.length, col)
   var i = 0
   partRows.foreach { row =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bb92788f/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
--
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
 
b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
deleted file mode 100644
index c01af40..000
--- 
a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.linalg.distributed;
-
-import java.util.Arrays;
-
-import org.junit.Test;
-
-import org.apache.spark.SharedSparkSession;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.mllib.linalg.Matrix;
-import org.apache.spark.mllib.linalg.QRDecomposition;
-import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.mllib.linalg.Vectors;
-
-public class JavaRowMatrixSuite extends SharedSparkSession {
-
- 

spark git commit: [SPARK-16174][SQL] Improve `OptimizeIn` optimizer to remove literal repetitions

2016-07-07 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 6343f6655 -> a04cab8f1


[SPARK-16174][SQL] Improve `OptimizeIn` optimizer to remove literal repetitions

## What changes were proposed in this pull request?

This PR improves the `OptimizeIn` optimizer to remove literal repetitions from 
SQL `IN` predicates. This prevents user mistakes and can also optimize some queries, 
such as 
[TPCDS-36](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q36.sql#L19).

**Before**
```scala
scala> sql("select state from (select explode(array('CA','TN')) state) where 
state in ('TN','TN','TN','TN','TN','TN','TN')").explain
== Physical Plan ==
*Filter state#6 IN (TN,TN,TN,TN,TN,TN,TN)
+- Generate explode([CA,TN]), false, false, [state#6]
   +- Scan OneRowRelation[]
```

**After**
```scala
scala> sql("select state from (select explode(array('CA','TN')) state) where 
state in ('TN','TN','TN','TN','TN','TN','TN')").explain
== Physical Plan ==
*Filter state#6 IN (TN)
+- Generate explode([CA,TN]), false, false, [state#6]
   +- Scan OneRowRelation[]
```
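
A rough sketch of the deduplication step, using Catalyst's internal `ExpressionSet` (the same helper the rule below relies on); this is illustrative only and not a stable public API:

```scala
import org.apache.spark.sql.catalyst.expressions.{ExpressionSet, Literal}

// Semantically equal literals collapse to a single entry, so IN (TN,TN,...,TN)
// shrinks to IN (TN) before any InSet conversion threshold is checked.
val inList = Seq(Literal("TN"), Literal("TN"), Literal("TN"), Literal("CA"))
val deduped = ExpressionSet(inList).toSeq
println(deduped)  // expected: two literals, TN and CA
```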

## How was this patch tested?

Passes the Jenkins tests (including a new test case).

Author: Dongjoon Hyun 

Closes #13876 from dongjoon-hyun/SPARK-16174.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a04cab8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a04cab8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a04cab8f

Branch: refs/heads/master
Commit: a04cab8f17fcac05f86d2c472558ab98923f91e3
Parents: 6343f66
Author: Dongjoon Hyun 
Authored: Thu Jul 7 19:45:43 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 7 19:45:43 2016 +0800

--
 .../sql/catalyst/expressions/predicates.scala   |  1 +
 .../sql/catalyst/optimizer/Optimizer.scala  | 20 +++-
 .../catalyst/optimizer/OptimizeInSuite.scala| 24 
 3 files changed, 39 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a04cab8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index a3b098a..734bacf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -132,6 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate
   }
 
   override def children: Seq[Expression] = value +: list
+  lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal])
 
   override def nullable: Boolean = children.exists(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)

http://git-wip-us.apache.org/repos/asf/spark/blob/a04cab8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9ee1735..03d15ea 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -820,16 +820,24 @@ object ConstantFolding extends Rule[LogicalPlan] {
 }
 
 /**
- * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, 
HashSet[Literal])]]
- * which is much faster
+ * Optimize IN predicates:
+ * 1. Removes literal repetitions.
+ * 2. Replaces [[In (value, seq[Literal])]] with optimized version
+ *[[InSet (value, HashSet[Literal])]] which is much faster.
  */
 case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
 case q: LogicalPlan => q transformExpressionsDown {
-  case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
-  list.size > conf.optimizerInSetConversionThreshold =>
-val hSet = list.map(e => e.eval(EmptyRow))
-InSet(v, HashSet() ++ hSet)
+  case expr @ In(v, list) if expr.inSetConvertible =>
+val newList = ExpressionSet(list).toSeq
+if (newList.size > conf.optimizerInSetConversionThreshold) {
+  val hSet = newList.map(e => e.eval(EmptyRow))
+  InSet(v, HashSet() ++ hSet)
+} else if (newList.size < list.size) {
+  expr.copy(list = newList)
+} else { // 

spark git commit: [SPARK-16399][PYSPARK] Force PYSPARK_PYTHON to python

2016-07-07 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 4c6f00d09 -> 6343f6655


[SPARK-16399][PYSPARK] Force PYSPARK_PYTHON to python

## What changes were proposed in this pull request?

I would like to change

```bash
if hash python2.7 2>/dev/null; then
  # Attempt to use Python 2.7, if installed:
  DEFAULT_PYTHON="python2.7"
else
  DEFAULT_PYTHON="python"
fi
```

to just ```DEFAULT_PYTHON="python"```

I'm not sure it is a great assumption that python2.7 should be used by default 
when `python` points to something else.

## How was this patch tested?


Author: MechCoder 

Closes #14016 from MechCoder/followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6343f665
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6343f665
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6343f665

Branch: refs/heads/master
Commit: 6343f66557434ce889a25a7889d76d0d24188ced
Parents: 4c6f00d
Author: MechCoder 
Authored: Thu Jul 7 11:31:10 2016 +0100
Committer: Sean Owen 
Committed: Thu Jul 7 11:31:10 2016 +0100

--
 bin/pyspark | 14 +++---
 1 file changed, 3 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6343f665/bin/pyspark
--
diff --git a/bin/pyspark b/bin/pyspark
index ac8aa04..a0d7e22 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -30,14 +30,6 @@ export _SPARK_CMD_USAGE="Usage: ./bin/pyspark [options]"
 # (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook').  This supports full 
customization of the IPython
 # and executor Python executables.
 
-# Determine the Python executable to use if PYSPARK_PYTHON or 
PYSPARK_DRIVER_PYTHON isn't set:
-if hash python2.7 2>/dev/null; then
-  # Attempt to use Python 2.7, if installed:
-  DEFAULT_PYTHON="python2.7"
-else
-  DEFAULT_PYTHON="python"
-fi
-
 # Fail noisily if removed options are set
 if [[ -n "$IPYTHON" || -n "$IPYTHON_OPTS" ]]; then
   echo "Error in pyspark startup:" 
@@ -47,10 +39,10 @@ fi
 
 # Default to standard python interpreter unless told otherwise
 if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
-  PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
+  PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}"
 fi
 
-WORKS_WITH_IPYTHON=$($DEFAULT_PYTHON -c 'import sys; print(sys.version_info >= 
(2, 7, 0))')
+WORKS_WITH_IPYTHON=$(python -c 'import sys; print(sys.version_info >= (2, 7, 
0))')
 
 # Determine the Python executable to use for the executors:
 if [[ -z "$PYSPARK_PYTHON" ]]; then
@@ -58,7 +50,7 @@ if [[ -z "$PYSPARK_PYTHON" ]]; then
 echo "IPython requires Python 2.7+; please install python2.7 or set 
PYSPARK_PYTHON" 1>&2
 exit 1
   else
-PYSPARK_PYTHON="$DEFAULT_PYTHON"
+PYSPARK_PYTHON=python
   fi
 fi
 export PYSPARK_PYTHON


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix

2016-07-07 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 986b25140 -> 4c6f00d09


[SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix

## What changes were proposed in this pull request?

The following Java code fails because of type erasure:

```Java
JavaRDD<Vector> rows = jsc.parallelize(...);
RowMatrix mat = new RowMatrix(rows.rdd());
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);
```

We should use `retag` to restore the element type and prevent the following exception:

```Java
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to 
[Lorg.apache.spark.mllib.linalg.Vector;
```
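
For context, a standalone illustration (not Spark code) of why the erased element type matters: JVM array classes are reified, so an `Object[]` built under an erased `ClassTag` cannot be cast to a `Vector[]`:

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// glom() materializes each partition as an Array[T] built from the RDD's ClassTag;
// when the RDD comes from the Java API, that tag is effectively AnyRef, i.e. Object[].
val boxed: Array[AnyRef] = Array(Vectors.dense(1.0, 2.0))

try {
  // The element is a Vector, but the array's runtime class is [Ljava.lang.Object;,
  // so the cast fails exactly like the exception quoted above.
  val vectors = boxed.asInstanceOf[Array[Vector]]
  println(vectors.length)
} catch {
  case e: ClassCastException => println(e)
}
```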

## How was this patch tested?

Java unit test

Author: Xusen Yin 

Closes #14051 from yinxusen/SPARK-16372.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c6f00d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c6f00d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c6f00d0

Branch: refs/heads/master
Commit: 4c6f00d09c016dfc1d2de6e694dff219c9027fa0
Parents: 986b251
Author: Xusen Yin 
Authored: Thu Jul 7 11:28:04 2016 +0100
Committer: Sean Owen 
Committed: Thu Jul 7 11:28:04 2016 +0100

--
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  2 +-
 .../mllib/linalg/distributed/RowMatrix.scala|  2 +-
 .../linalg/distributed/JavaRowMatrixSuite.java  | 44 
 3 files changed, 46 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4c6f00d0/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f4819f7..a80cca7 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1127,7 +1127,7 @@ private[python] class PythonMLLibAPI extends Serializable 
{
* Wrapper around RowMatrix constructor.
*/
   def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): 
RowMatrix = {
-new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols)
+new RowMatrix(rows.rdd, numRows, numCols)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4c6f00d0/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index cd5209d..1c94479 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -537,7 +537,7 @@ class RowMatrix @Since("1.0.0") (
   def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, 
Matrix] = {
 val col = numCols().toInt
 // split rows horizontally into smaller matrices, and compute QR for each 
of them
-val blockQRs = rows.glom().map { partRows =>
+val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows =>
   val bdm = BDM.zeros[Double](partRows.length, col)
   var i = 0
   partRows.foreach { row =>

http://git-wip-us.apache.org/repos/asf/spark/blob/4c6f00d0/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
--
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
 
b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
new file mode 100644
index 000..c01af40
--- /dev/null
+++ 
b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * 

spark git commit: [SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix

2016-07-07 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 2588776ad -> 45dda9221


[SPARK-16372][MLLIB] Retag RDD to tallSkinnyQR of RowMatrix

## What changes were proposed in this pull request?

The following Java code fails because of type erasure:

```Java
JavaRDD<Vector> rows = jsc.parallelize(...);
RowMatrix mat = new RowMatrix(rows.rdd());
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);
```

We should use `retag` to restore the element type and prevent the following exception:

```Java
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to 
[Lorg.apache.spark.mllib.linalg.Vector;
```

## How was this patch tested?

Java unit test

Author: Xusen Yin 

Closes #14051 from yinxusen/SPARK-16372.

(cherry picked from commit 4c6f00d09c016dfc1d2de6e694dff219c9027fa0)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45dda922
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45dda922
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45dda922

Branch: refs/heads/branch-1.6
Commit: 45dda92214191310a56333a2085e2343eba170cd
Parents: 2588776
Author: Xusen Yin 
Authored: Thu Jul 7 11:28:04 2016 +0100
Committer: Sean Owen 
Committed: Thu Jul 7 11:28:29 2016 +0100

--
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  2 +-
 .../mllib/linalg/distributed/RowMatrix.scala|  2 +-
 .../linalg/distributed/JavaRowMatrixSuite.java  | 44 
 3 files changed, 46 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/45dda922/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 1714983..a059e38 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1110,7 +1110,7 @@ private[python] class PythonMLLibAPI extends Serializable 
{
* Wrapper around RowMatrix constructor.
*/
   def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): 
RowMatrix = {
-new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols)
+new RowMatrix(rows.rdd, numRows, numCols)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/45dda922/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 52c0f19..b941d1f 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -526,7 +526,7 @@ class RowMatrix @Since("1.0.0") (
   def tallSkinnyQR(computeQ: Boolean = false): QRDecomposition[RowMatrix, 
Matrix] = {
 val col = numCols().toInt
 // split rows horizontally into smaller matrices, and compute QR for each 
of them
-val blockQRs = rows.glom().map { partRows =>
+val blockQRs = rows.retag(classOf[Vector]).glom().map { partRows =>
   val bdm = BDM.zeros[Double](partRows.length, col)
   var i = 0
   partRows.foreach { row =>

http://git-wip-us.apache.org/repos/asf/spark/blob/45dda922/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
--
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
 
b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
new file mode 100644
index 000..c01af40
--- /dev/null
+++ 
b/mllib/src/test/java/org/apache/spark/mllib/linalg/distributed/JavaRowMatrixSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR 

spark git commit: [SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatched Column Num

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 920162a1e -> d63428af6


[SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatched Column 
Num

#### What changes were proposed in this pull request?
When creating a view, a common user error is that the number of columns produced by 
the `SELECT` clause does not match the number of column names specified by 
`CREATE VIEW`.

For example, given that table `t1` has only 3 columns:
```SQL
create view v1(col2, col4, col3, col5) as select * from t1
```
Currently, Spark SQL reports the following error:
```
requirement failed
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:212)
at 
org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:90)
```

This error message is very confusing. This PR detects the error and issues 
a meaningful error message.
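
A hedged sketch of how the new check surfaces to a user (table and view names are made up; the message wording comes from the change below):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("view-mismatch").getOrCreate()

// A one-column table, then a view that declares two column names.
spark.range(10).createOrReplaceTempView("tab1")

try {
  spark.sql("CREATE TEMPORARY VIEW v1 (col1, col3) AS SELECT * FROM tab1")
} catch {
  // Now reported as an AnalysisException along the lines of:
  // "The number of columns produced by the SELECT clause (num: `1`) does not match
  //  the number of column names specified by CREATE VIEW (num: `2`)."
  case e: AnalysisException => println(e.getMessage)
}
```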

#### How was this patch tested?
Added test cases

Author: gatorsmile 

Closes #14047 from gatorsmile/viewMismatchedColumns.

(cherry picked from commit ab05db0b48f395543cd7d91e2ad9dd760516868b)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d63428af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d63428af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d63428af

Branch: refs/heads/branch-2.0
Commit: d63428af6d1c7c0a0533567a0a7ccb5817a65de3
Parents: 920162a
Author: gatorsmile 
Authored: Thu Jul 7 00:07:25 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jul 7 00:07:31 2016 -0700

--
 .../spark/sql/execution/command/views.scala |  6 -
 .../spark/sql/execution/command/DDLSuite.scala  | 23 
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 23 
 3 files changed, 51 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d63428af/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 088f684..6533d79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -88,7 +88,11 @@ case class CreateViewCommand(
 qe.assertAnalyzed()
 val analyzedPlan = qe.analyzed
 
-require(tableDesc.schema == Nil || tableDesc.schema.length == 
analyzedPlan.output.length)
+if (tableDesc.schema != Nil && tableDesc.schema.length != 
analyzedPlan.output.length) {
+  throw new AnalysisException(s"The number of columns produced by the 
SELECT clause " +
+s"(num: `${analyzedPlan.output.length}`) does not match the number of 
column names " +
+s"specified by CREATE VIEW (num: `${tableDesc.schema.length}`).")
+}
 val sessionState = sparkSession.sessionState
 
 if (isTemporary) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d63428af/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0ee8d17..7d1f1d1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1314,6 +1314,29 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
 }
   }
 
+  test("create temporary view with mismatched schema") {
+withTable("tab1") {
+  spark.range(10).write.saveAsTable("tab1")
+  withView("view1") {
+val e = intercept[AnalysisException] {
+  sql("CREATE TEMPORARY VIEW view1 (col1, col3) AS SELECT * FROM tab1")
+}.getMessage
+assert(e.contains("the SELECT clause (num: `1`) does not match")
+  && e.contains("CREATE VIEW (num: `2`)"))
+  }
+}
+  }
+
+  test("create temporary view with specified schema") {
+withView("view1") {
+  sql("CREATE TEMPORARY VIEW view1 (col1, col2) AS SELECT 1, 2")
+  checkAnswer(
+sql("SELECT * FROM view1"),
+Row(1, 2) :: Nil
+  )
+}
+  }
+
   test("truncate table - external table, temporary table, view (not allowed)") 
{
 import testImplicits._
 val path = Utils.createTempDir().getAbsolutePath


spark git commit: [SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatched Column Num

2016-07-07 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master ce3ea9698 -> ab05db0b4


[SPARK-16368][SQL] Fix Strange Errors When Creating View With Unmatched Column 
Num

#### What changes were proposed in this pull request?
When creating a view, a common user error is that the number of columns produced by 
the `SELECT` clause does not match the number of column names specified by 
`CREATE VIEW`.

For example, given that table `t1` has only 3 columns:
```SQL
create view v1(col2, col4, col3, col5) as select * from t1
```
Currently, Spark SQL reports the following error:
```
requirement failed
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:212)
at 
org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:90)
```

This error message is very confusing. This PR detects the error and issues 
a meaningful error message.

#### How was this patch tested?
Added test cases

Author: gatorsmile 

Closes #14047 from gatorsmile/viewMismatchedColumns.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab05db0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab05db0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab05db0b

Branch: refs/heads/master
Commit: ab05db0b48f395543cd7d91e2ad9dd760516868b
Parents: ce3ea96
Author: gatorsmile 
Authored: Thu Jul 7 00:07:25 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jul 7 00:07:25 2016 -0700

--
 .../spark/sql/execution/command/views.scala |  6 -
 .../spark/sql/execution/command/DDLSuite.scala  | 23 
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 23 
 3 files changed, 51 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ab05db0b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 007fa46..16b333a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -88,7 +88,11 @@ case class CreateViewCommand(
 qe.assertAnalyzed()
 val analyzedPlan = qe.analyzed
 
-require(tableDesc.schema == Nil || tableDesc.schema.length == 
analyzedPlan.output.length)
+if (tableDesc.schema != Nil && tableDesc.schema.length != 
analyzedPlan.output.length) {
+  throw new AnalysisException(s"The number of columns produced by the 
SELECT clause " +
+s"(num: `${analyzedPlan.output.length}`) does not match the number of 
column names " +
+s"specified by CREATE VIEW (num: `${tableDesc.schema.length}`).")
+}
 val sessionState = sparkSession.sessionState
 
 if (isTemporary) {

http://git-wip-us.apache.org/repos/asf/spark/blob/ab05db0b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0ee8d17..7d1f1d1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1314,6 +1314,29 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
 }
   }
 
+  test("create temporary view with mismatched schema") {
+withTable("tab1") {
+  spark.range(10).write.saveAsTable("tab1")
+  withView("view1") {
+val e = intercept[AnalysisException] {
+  sql("CREATE TEMPORARY VIEW view1 (col1, col3) AS SELECT * FROM tab1")
+}.getMessage
+assert(e.contains("the SELECT clause (num: `1`) does not match")
+  && e.contains("CREATE VIEW (num: `2`)"))
+  }
+}
+  }
+
+  test("create temporary view with specified schema") {
+withView("view1") {
+  sql("CREATE TEMPORARY VIEW view1 (col1, col2) AS SELECT 1, 2")
+  checkAnswer(
+sql("SELECT * FROM view1"),
+Row(1, 2) :: Nil
+  )
+}
+  }
+
   test("truncate table - external table, temporary table, view (not allowed)") 
{
 import testImplicits._
 val path = Utils.createTempDir().getAbsolutePath

http://git-wip-us.apache.org/repos/asf/spark/blob/ab05db0b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
--
diff --git