spark git commit: [SPARK-16266][SQL][STREAMING] Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming
Repository: spark
Updated Branches: refs/heads/master 153c2f9ac -> f454a7f9f

[SPARK-16266][SQL][STREAMING] Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming

## What changes were proposed in this pull request?

- Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming to make them consistent with the Scala packaging
- Exposed the necessary classes in the sql.streaming package so that they appear in the docs
- Added the pyspark.sql.streaming module to the docs

## How was this patch tested?

- Updated unit tests.
- Generated docs to test the visibility of the pyspark.sql.streaming classes.

Author: Tathagata Das

Closes #13955 from tdas/SPARK-16266.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f454a7f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f454a7f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f454a7f9
Branch: refs/heads/master
Commit: f454a7f9f03807dd768319798daa1351bbfc7288
Parents: 153c2f9
Author: Tathagata Das
Authored: Tue Jun 28 22:07:11 2016 -0700
Committer: Shixiong Zhu
Committed: Tue Jun 28 22:07:11 2016 -0700
--
 python/docs/pyspark.sql.rst      |   6 +
 python/pyspark/sql/context.py    |   3 +-
 python/pyspark/sql/dataframe.py  |   3 +-
 python/pyspark/sql/readwriter.py | 493 +
 python/pyspark/sql/session.py    |   3 +-
 python/pyspark/sql/streaming.py  | 502 +-
 6 files changed, 511 insertions(+), 499 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/docs/pyspark.sql.rst
--
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index 6259379..3be9533 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -21,3 +21,9 @@ pyspark.sql.functions module
 .. automodule:: pyspark.sql.functions
     :members:
     :undoc-members:
+
+pyspark.sql.streaming module
+----------------------------
+.. automodule:: pyspark.sql.streaming
+    :members:
+    :undoc-members:

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index b5dde13..3503fb9 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -26,7 +26,8 @@ from pyspark import since
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.session import _monkey_patch_RDD, SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
+from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.streaming import DataStreamReader
 from pyspark.sql.types import Row, StringType
 from pyspark.sql.utils import install_exception_handler

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 4f13307..e44b01b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -33,7 +33,8 @@ from pyspark.storagelevel import StorageLevel
 from pyspark.traceback_utils import SCCallSiteSync
 from pyspark.sql.types import _parse_datatype_json_string
 from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
-from pyspark.sql.readwriter import DataFrameWriter, DataStreamWriter
+from pyspark.sql.readwriter import DataFrameWriter
+from pyspark.sql.streaming import DataStreamWriter
 from pyspark.sql.types import *

 __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3f28d7a..10f307b 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
 from pyspark.sql import utils

-__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter"]


 def to_str(value):
@@ -724,494 +724,6 @@ class DataFrameWriter(OptionUtils):
         self._jwrite.mode(mode).jdbc(url, table, jprop)


-class DataStreamReader(OptionUtils):
-    """
-    Interface used to load a streaming :class:`DataFrame` from external storage systems
-    (e.g. file systems, key-value stores,
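For readers following along, here is a minimal sketch of the user-facing pyspark API after this move. It assumes a local Spark 2.0 session and a socket server (e.g. `nc -lk 9999`), neither of which is part of the patch itself:

```python
# Minimal usage sketch after SPARK-16266: DataStreamReader/DataStreamWriter
# now live in pyspark.sql.streaming, mirroring the Scala package layout.
from pyspark.sql import SparkSession
from pyspark.sql.streaming import DataStreamReader, DataStreamWriter  # new home

spark = SparkSession.builder.appName("streaming-demo").getOrCreate()

# spark.readStream returns a DataStreamReader (pyspark.sql.streaming)
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# df.writeStream returns a DataStreamWriter from the same module
query = lines.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
```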
spark git commit: [SPARK-16271][SQL] Implement Hive's UDFXPathUtil
Repository: spark
Updated Branches: refs/heads/master 0df5ce1bc -> 153c2f9ac

[SPARK-16271][SQL] Implement Hive's UDFXPathUtil

## What changes were proposed in this pull request?

This patch ports Hive's UDFXPathUtil over to Spark, which can be used to implement XPath functionality in Spark in the near future.

## How was this patch tested?

Added two new test suites, UDFXPathUtilSuite and ReusableStringReaderSuite. They have been ported over from Hive (but rewritten in Scala in order to leverage ScalaTest).

Author: petermaxlee

Closes #13961 from petermaxlee/xpath.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/153c2f9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/153c2f9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/153c2f9a
Branch: refs/heads/master
Commit: 153c2f9ac12846367a09684fd875c496d350a603
Parents: 0df5ce1
Author: petermaxlee
Authored: Tue Jun 28 21:07:52 2016 -0700
Committer: Reynold Xin
Committed: Tue Jun 28 21:07:52 2016 -0700
--
 .../catalyst/expressions/xml/UDFXPathUtil.java  | 192 +++
 .../xml/ReusableStringReaderSuite.scala         | 103 ++
 .../expressions/xml/UDFXPathUtilSuite.scala     |  99 ++
 3 files changed, 394 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/153c2f9a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
--
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
new file mode 100644
index 0000000..01a11f9
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.xml;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.xml.namespace.QName;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+/**
+ * Utility class for all XPath UDFs. Each UDF instance should keep an instance of this class.
+ *
+ * This is based on Hive's UDFXPathUtil implementation.
+ */
+public class UDFXPathUtil {
+  private XPath xpath = XPathFactory.newInstance().newXPath();
+  private ReusableStringReader reader = new ReusableStringReader();
+  private InputSource inputSource = new InputSource(reader);
+  private XPathExpression expression = null;
+  private String oldPath = null;
+
+  public Object eval(String xml, String path, QName qname) {
+    if (xml == null || path == null || qname == null) {
+      return null;
+    }
+
+    if (xml.length() == 0 || path.length() == 0) {
+      return null;
+    }
+
+    if (!path.equals(oldPath)) {
+      try {
+        expression = xpath.compile(path);
+      } catch (XPathExpressionException e) {
+        expression = null;
+      }
+      oldPath = path;
+    }
+
+    if (expression == null) {
+      return null;
+    }
+
+    reader.set(xml);
+
+    try {
+      return expression.evaluate(inputSource, qname);
+    } catch (XPathExpressionException e) {
+      throw new RuntimeException("Invalid expression '" + oldPath + "'", e);
+    }
+  }
+
+  public Boolean evalBoolean(String xml, String path) {
+    return (Boolean) eval(xml, path, XPathConstants.BOOLEAN);
+  }
+
+  public String evalString(String xml, String path) {
+    return (String) eval(xml, path, XPathConstants.STRING);
+  }
+
+  public Double evalNumber(String xml, String path) {
+    return (Double) eval(xml, path, XPathConstants.NUMBER);
+  }
+
+  public Node evalNode(String
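To illustrate the core trick of the ported class outside the JVM, here is a rough Python analogue using the third-party lxml package. The class name and structure are this editor's approximation for illustration only, not part of the patch:

```python
# Rough Python analogue of UDFXPathUtil's caching pattern: recompile the
# XPath expression only when the path string changes, and return None
# (rather than raise) on null/empty input or an uncompilable path.
from lxml import etree  # third-party: pip install lxml


class XPathUtil:
    def __init__(self):
        self._old_path = None
        self._expression = None

    def eval(self, xml, path):
        if not xml or not path:
            return None
        if path != self._old_path:
            try:
                # compile once per distinct path, as UDFXPathUtil does
                self._expression = etree.XPath(path)
            except etree.XPathSyntaxError:
                self._expression = None
            self._old_path = path
        if self._expression is None:
            return None
        return self._expression(etree.fromstring(xml))


util = XPathUtil()
print(util.eval("<a><b>b1</b><b>b2</b></a>", "//b/text()"))  # ['b1', 'b2']
print(util.eval("<a/>", ""))                                 # None
```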
spark git commit: [SPARK-16245][ML] model loading backward compatibility for ml.feature.PCA
Repository: spark
Updated Branches: refs/heads/branch-2.0 dd70a115c -> 22b4072e7

[SPARK-16245][ML] model loading backward compatibility for ml.feature.PCA

## What changes were proposed in this pull request?

Model loading backward compatibility for ml.feature.PCA.

## How was this patch tested?

Existing unit tests and a manual test for loading models saved by Spark 1.6.

Author: Yanbo Liang

Closes #13937 from yanboliang/spark-16245.

(cherry picked from commit 0df5ce1bc1387a58b33cd185008f4022bd3dcc69)
Signed-off-by: Xiangrui Meng

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22b4072e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22b4072e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22b4072e
Branch: refs/heads/branch-2.0
Commit: 22b4072e704f9a68a605e9a4cebf54d2122fe448
Parents: dd70a11
Author: Yanbo Liang
Authored: Tue Jun 28 19:53:07 2016 -0700
Committer: Xiangrui Meng
Committed: Tue Jun 28 19:53:16 2016 -0700
--
 .../scala/org/apache/spark/ml/feature/PCA.scala | 18 --
 1 file changed, 8 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/22b4072e/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 72167b5..ef8b085 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -206,24 +206,22 @@ object PCAModel extends MLReadable[PCAModel] {

     override def load(path: String): PCAModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
-      // explainedVariance field is not present in Spark <= 1.6
-      val versionRegex = "([0-9]+)\\.([0-9]+).*".r
-      val hasExplainedVariance = metadata.sparkVersion match {
-        case versionRegex(major, minor) =>
-          major.toInt >= 2 || (major.toInt == 1 && minor.toInt > 6)
-        case _ => false
-      }
+      val versionRegex = "([0-9]+)\\.(.+)".r
+      val versionRegex(major, _) = metadata.sparkVersion

       val dataPath = new Path(path, "data").toString
-      val model = if (hasExplainedVariance) {
+      val model = if (major.toInt >= 2) {
         val Row(pc: DenseMatrix, explainedVariance: DenseVector) =
           sparkSession.read.parquet(dataPath)
             .select("pc", "explainedVariance")
             .head()
         new PCAModel(metadata.uid, pc, explainedVariance)
       } else {
-        val Row(pc: DenseMatrix) = sparkSession.read.parquet(dataPath).select("pc").head()
-        new PCAModel(metadata.uid, pc, Vectors.dense(Array.empty[Double]).asInstanceOf[DenseVector])
+        // pc field is the old matrix format in Spark <= 1.6
+        // explainedVariance field is not present in Spark <= 1.6
+        val Row(pc: OldDenseMatrix) = sparkSession.read.parquet(dataPath).select("pc").head()
+        new PCAModel(metadata.uid, pc.asML,
+          Vectors.dense(Array.empty[Double]).asInstanceOf[DenseVector])
       }
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
spark git commit: [SPARK-16245][ML] model loading backward compatibility for ml.feature.PCA
Repository: spark
Updated Branches: refs/heads/master 363bcedee -> 0df5ce1bc

[SPARK-16245][ML] model loading backward compatibility for ml.feature.PCA

## What changes were proposed in this pull request?

Model loading backward compatibility for ml.feature.PCA.

## How was this patch tested?

Existing unit tests and a manual test for loading models saved by Spark 1.6.

Author: Yanbo Liang

Closes #13937 from yanboliang/spark-16245.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df5ce1b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df5ce1b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df5ce1b
Branch: refs/heads/master
Commit: 0df5ce1bc1387a58b33cd185008f4022bd3dcc69
Parents: 363bced
Author: Yanbo Liang
Authored: Tue Jun 28 19:53:07 2016 -0700
Committer: Xiangrui Meng
Committed: Tue Jun 28 19:53:07 2016 -0700
--
 .../scala/org/apache/spark/ml/feature/PCA.scala | 18 --
 1 file changed, 8 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0df5ce1b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 72167b5..ef8b085 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -206,24 +206,22 @@ object PCAModel extends MLReadable[PCAModel] {

     override def load(path: String): PCAModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
-      // explainedVariance field is not present in Spark <= 1.6
-      val versionRegex = "([0-9]+)\\.([0-9]+).*".r
-      val hasExplainedVariance = metadata.sparkVersion match {
-        case versionRegex(major, minor) =>
-          major.toInt >= 2 || (major.toInt == 1 && minor.toInt > 6)
-        case _ => false
-      }
+      val versionRegex = "([0-9]+)\\.(.+)".r
+      val versionRegex(major, _) = metadata.sparkVersion

       val dataPath = new Path(path, "data").toString
-      val model = if (hasExplainedVariance) {
+      val model = if (major.toInt >= 2) {
         val Row(pc: DenseMatrix, explainedVariance: DenseVector) =
           sparkSession.read.parquet(dataPath)
             .select("pc", "explainedVariance")
             .head()
         new PCAModel(metadata.uid, pc, explainedVariance)
       } else {
-        val Row(pc: DenseMatrix) = sparkSession.read.parquet(dataPath).select("pc").head()
-        new PCAModel(metadata.uid, pc, Vectors.dense(Array.empty[Double]).asInstanceOf[DenseVector])
+        // pc field is the old matrix format in Spark <= 1.6
+        // explainedVariance field is not present in Spark <= 1.6
+        val Row(pc: OldDenseMatrix) = sparkSession.read.parquet(dataPath).select("pc").head()
+        new PCAModel(metadata.uid, pc.asML,
+          Vectors.dense(Array.empty[Double]).asInstanceOf[DenseVector])
       }
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
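The user-facing round trip this fix protects looks roughly like the following pyspark sketch (paths and data are placeholders; only the load path is what the patch changes):

```python
# Hedged sketch of the persistence round trip guarded by SPARK-16245.
# Under Spark 2.0, PCAModel.load branches on the Spark version recorded in
# the model metadata, so models written by Spark <= 1.6 (old matrix format,
# no explainedVariance field) still load.
from pyspark.ml.feature import PCA, PCAModel
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pca-compat").getOrCreate()
df = spark.createDataFrame(
    [(Vectors.dense([2.0, 0.0, 3.0]),),
     (Vectors.dense([4.0, 6.0, 7.0]),)],
    ["features"])

model = PCA(k=2, inputCol="features", outputCol="pca_features").fit(df)
model.save("/tmp/pca_model")                 # placeholder path
reloaded = PCAModel.load("/tmp/pca_model")   # version check happens here
print(reloaded.explainedVariance)            # empty vector for 1.6-era models
```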
spark git commit: [SPARK-16248][SQL] Whitelist the list of Hive fallback functions
Repository: spark
Updated Branches: refs/heads/branch-2.0 835c5a3bd -> dd70a115c

[SPARK-16248][SQL] Whitelist the list of Hive fallback functions

## What changes were proposed in this pull request?

This patch removes the blind fallback into Hive for functions. Instead, it creates a whitelist and adds only a small number of functions to it, i.e. the ones we intend to support in the long run in Spark.

## How was this patch tested?

Updated tests to reflect the change.

Author: Reynold Xin

Closes #13939 from rxin/hive-whitelist.

(cherry picked from commit 363bcedeea40fe3f1a92271b96af2acba63e058c)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd70a115
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd70a115
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd70a115
Branch: refs/heads/branch-2.0
Commit: dd70a115cd562223e97f0b5e6172a9ea758be95d
Parents: 835c5a3
Author: Reynold Xin
Authored: Tue Jun 28 19:36:53 2016 -0700
Committer: Reynold Xin
Committed: Tue Jun 28 19:36:59 2016 -0700
--
 .../catalyst/analysis/FunctionRegistry.scala    |  1 +
 .../hive/execution/HiveCompatibilitySuite.scala | 22 +-
 .../HiveWindowFunctionQuerySuite.scala          | 25
 .../spark/sql/hive/HiveSessionCatalog.scala     | 42 +---
 4 files changed, 40 insertions(+), 50 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/dd70a115/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 42a8faa..0bde48c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -248,6 +248,7 @@ object FunctionRegistry {
     expression[Average]("mean"),
     expression[Min]("min"),
     expression[Skewness]("skewness"),
+    expression[StddevSamp]("std"),
     expression[StddevSamp]("stddev"),
     expression[StddevPop]("stddev_pop"),
     expression[StddevSamp]("stddev_samp"),

http://git-wip-us.apache.org/repos/asf/spark/blob/dd70a115/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
--
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 2d5a970..13d18fd 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -517,6 +517,18 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // This test uses CREATE EXTERNAL TABLE without specifying LOCATION
     "alter2",

+    // [SPARK-16248][SQL] Whitelist the list of Hive fallback functions
+    "udf_field",
+    "udf_reflect2",
+    "udf_xpath",
+    "udf_xpath_boolean",
+    "udf_xpath_double",
+    "udf_xpath_float",
+    "udf_xpath_int",
+    "udf_xpath_long",
+    "udf_xpath_short",
+    "udf_xpath_string",
+
     // These tests DROP TABLE that don't exist (but do not specify IF EXISTS)
     "alter_rename_partition1",
     "date_1",
@@ -1004,7 +1016,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_elt",
     "udf_equal",
     "udf_exp",
-    "udf_field",
     "udf_find_in_set",
     "udf_float",
     "udf_floor",
@@ -1049,7 +1060,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_power",
     "udf_radians",
     "udf_rand",
-    "udf_reflect2",
     "udf_regexp",
     "udf_regexp_extract",
     "udf_regexp_replace",
@@ -1090,14 +1100,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_variance",
     "udf_weekofyear",
     "udf_when",
-    "udf_xpath",
-    "udf_xpath_boolean",
-    "udf_xpath_double",
-    "udf_xpath_float",
-    "udf_xpath_int",
-    "udf_xpath_long",
-    "udf_xpath_short",
-    "udf_xpath_string",
     "union10",
     "union11",
     "union13",

http://git-wip-us.apache.org/repos/asf/spark/blob/dd70a115/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
spark git commit: [SPARK-16248][SQL] Whitelist the list of Hive fallback functions
Repository: spark
Updated Branches: refs/heads/master 5bf8881b3 -> 363bcedee

[SPARK-16248][SQL] Whitelist the list of Hive fallback functions

## What changes were proposed in this pull request?

This patch removes the blind fallback into Hive for functions. Instead, it creates a whitelist and adds only a small number of functions to it, i.e. the ones we intend to support in the long run in Spark.

## How was this patch tested?

Updated tests to reflect the change.

Author: Reynold Xin

Closes #13939 from rxin/hive-whitelist.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/363bcede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/363bcede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/363bcede
Branch: refs/heads/master
Commit: 363bcedeea40fe3f1a92271b96af2acba63e058c
Parents: 5bf8881
Author: Reynold Xin
Authored: Tue Jun 28 19:36:53 2016 -0700
Committer: Reynold Xin
Committed: Tue Jun 28 19:36:53 2016 -0700
--
 .../catalyst/analysis/FunctionRegistry.scala    |  1 +
 .../hive/execution/HiveCompatibilitySuite.scala | 22 +-
 .../HiveWindowFunctionQuerySuite.scala          | 25
 .../spark/sql/hive/HiveSessionCatalog.scala     | 42 +---
 4 files changed, 40 insertions(+), 50 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/363bcede/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 42a8faa..0bde48c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -248,6 +248,7 @@ object FunctionRegistry {
     expression[Average]("mean"),
     expression[Min]("min"),
     expression[Skewness]("skewness"),
+    expression[StddevSamp]("std"),
     expression[StddevSamp]("stddev"),
     expression[StddevPop]("stddev_pop"),
     expression[StddevSamp]("stddev_samp"),

http://git-wip-us.apache.org/repos/asf/spark/blob/363bcede/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
--
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 2d5a970..13d18fd 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -517,6 +517,18 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // This test uses CREATE EXTERNAL TABLE without specifying LOCATION
     "alter2",

+    // [SPARK-16248][SQL] Whitelist the list of Hive fallback functions
+    "udf_field",
+    "udf_reflect2",
+    "udf_xpath",
+    "udf_xpath_boolean",
+    "udf_xpath_double",
+    "udf_xpath_float",
+    "udf_xpath_int",
+    "udf_xpath_long",
+    "udf_xpath_short",
+    "udf_xpath_string",
+
     // These tests DROP TABLE that don't exist (but do not specify IF EXISTS)
     "alter_rename_partition1",
     "date_1",
@@ -1004,7 +1016,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_elt",
     "udf_equal",
     "udf_exp",
-    "udf_field",
     "udf_find_in_set",
     "udf_float",
     "udf_floor",
@@ -1049,7 +1060,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_power",
     "udf_radians",
     "udf_rand",
-    "udf_reflect2",
     "udf_regexp",
     "udf_regexp_extract",
     "udf_regexp_replace",
@@ -1090,14 +1100,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_variance",
     "udf_weekofyear",
     "udf_when",
-    "udf_xpath",
-    "udf_xpath_boolean",
-    "udf_xpath_double",
-    "udf_xpath_float",
-    "udf_xpath_int",
-    "udf_xpath_long",
-    "udf_xpath_short",
-    "udf_xpath_string",
     "union10",
     "union11",
     "union13",

http://git-wip-us.apache.org/repos/asf/spark/blob/363bcede/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
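One directly observable effect of the FunctionRegistry change above is that `std` now resolves as a native alias for the sample standard deviation instead of falling back to Hive; a minimal sketch:

```python
# Minimal sketch: after SPARK-16248, `std` is registered natively as an
# alias of stddev_samp, so this query needs no Hive fallback resolution.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("std-alias").getOrCreate()
spark.range(1, 5).createOrReplaceTempView("t")
spark.sql("SELECT std(id), stddev(id), stddev_samp(id) FROM t").show()
# all three columns report the same sample standard deviation
```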
spark git commit: [SPARK-16268][PYSPARK] SQLContext should import DataStreamReader
Repository: spark
Updated Branches: refs/heads/branch-2.0 d7a59f1f4 -> 835c5a3bd

[SPARK-16268][PYSPARK] SQLContext should import DataStreamReader

## What changes were proposed in this pull request?

Fixed the following error:

```
>>> sqlContext.readStream
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "...", line 442, in readStream
    return DataStreamReader(self._wrapped)
NameError: global name 'DataStreamReader' is not defined
```

## How was this patch tested?

The added test.

Author: Shixiong Zhu

Closes #13958 from zsxwing/fix-import.

(cherry picked from commit 5bf8881b34a18f25acc10aeb28a06af4c44a6ac8)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/835c5a3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/835c5a3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/835c5a3b
Branch: refs/heads/branch-2.0
Commit: 835c5a3bd549811178f5b455dc127401c5551866
Parents: d7a59f1
Author: Shixiong Zhu
Authored: Tue Jun 28 18:33:37 2016 -0700
Committer: Tathagata Das
Committed: Tue Jun 28 18:33:52 2016 -0700
--
 python/pyspark/sql/context.py | 11 +--
 1 file changed, 9 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/835c5a3b/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 8a1a874..b5dde13 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -26,7 +26,7 @@ from pyspark import since
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.session import _monkey_patch_RDD, SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
 from pyspark.sql.types import Row, StringType
 from pyspark.sql.utils import install_exception_handler

@@ -438,8 +438,12 @@ class SQLContext(object):
         .. note:: Experimental.

         :return: :class:`DataStreamReader`
+
+        >>> text_sdf = sqlContext.readStream.text(os.path.join(tempfile.mkdtemp(), 'data'))
+        >>> text_sdf.isStreaming
+        True
         """
-        return DataStreamReader(self._wrapped)
+        return DataStreamReader(self)

     @property
     @since(2.0)
@@ -515,6 +519,7 @@ class UDFRegistration(object):
 def _test():
     import os
     import doctest
+    import tempfile
     from pyspark.context import SparkContext
     from pyspark.sql import Row, SQLContext
     import pyspark.sql.context
@@ -523,6 +528,8 @@ def _test():
     globs = pyspark.sql.context.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
+    globs['tempfile'] = tempfile
+    globs['os'] = os
     globs['sc'] = sc
     globs['sqlContext'] = SQLContext(sc)
     globs['rdd'] = rdd = sc.parallelize(
spark git commit: [SPARK-16268][PYSPARK] SQLContext should import DataStreamReader
Repository: spark
Updated Branches: refs/heads/master 823518c2b -> 5bf8881b3

[SPARK-16268][PYSPARK] SQLContext should import DataStreamReader

## What changes were proposed in this pull request?

Fixed the following error:

```
>>> sqlContext.readStream
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "...", line 442, in readStream
    return DataStreamReader(self._wrapped)
NameError: global name 'DataStreamReader' is not defined
```

## How was this patch tested?

The added test.

Author: Shixiong Zhu

Closes #13958 from zsxwing/fix-import.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bf8881b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bf8881b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bf8881b
Branch: refs/heads/master
Commit: 5bf8881b34a18f25acc10aeb28a06af4c44a6ac8
Parents: 823518c
Author: Shixiong Zhu
Authored: Tue Jun 28 18:33:37 2016 -0700
Committer: Tathagata Das
Committed: Tue Jun 28 18:33:37 2016 -0700
--
 python/pyspark/sql/context.py | 11 +--
 1 file changed, 9 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5bf8881b/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 8a1a874..b5dde13 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -26,7 +26,7 @@ from pyspark import since
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.session import _monkey_patch_RDD, SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
 from pyspark.sql.types import Row, StringType
 from pyspark.sql.utils import install_exception_handler

@@ -438,8 +438,12 @@ class SQLContext(object):
         .. note:: Experimental.

         :return: :class:`DataStreamReader`
+
+        >>> text_sdf = sqlContext.readStream.text(os.path.join(tempfile.mkdtemp(), 'data'))
+        >>> text_sdf.isStreaming
+        True
         """
-        return DataStreamReader(self._wrapped)
+        return DataStreamReader(self)

     @property
     @since(2.0)
@@ -515,6 +519,7 @@ class UDFRegistration(object):
 def _test():
     import os
     import doctest
+    import tempfile
     from pyspark.context import SparkContext
     from pyspark.sql import Row, SQLContext
     import pyspark.sql.context
@@ -523,6 +528,8 @@ def _test():
     globs = pyspark.sql.context.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
+    globs['tempfile'] = tempfile
+    globs['os'] = os
     globs['sc'] = sc
     globs['sqlContext'] = SQLContext(sc)
     globs['rdd'] = rdd = sc.parallelize(
spark git commit: [SPARKR] add csv tests
Repository: spark
Updated Branches: refs/heads/branch-2.0 52c9d69f7 -> d7a59f1f4

[SPARKR] add csv tests

## What changes were proposed in this pull request?

Add unit tests for csv data for SparkR.

## How was this patch tested?

Unit tests.

Author: Felix Cheung

Closes #13904 from felixcheung/rcsv.

(cherry picked from commit 823518c2b5259c8a954431467639198c808c9198)
Signed-off-by: Shivaram Venkataraman

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7a59f1f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7a59f1f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7a59f1f
Branch: refs/heads/branch-2.0
Commit: d7a59f1f450aae06baac96867a26042bd1ccd1d5
Parents: 52c9d69
Author: Felix Cheung
Authored: Tue Jun 28 17:08:28 2016 -0700
Committer: Shivaram Venkataraman
Committed: Tue Jun 28 17:08:36 2016 -0700
--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 18 ++
 1 file changed, 18 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d7a59f1f/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 74def5c..deda1b6 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -208,6 +208,24 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })

+test_that("read csv as DataFrame", {
+  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
+  mockLinesCsv <- c("year,make,model,comment,blank",
+                    "\"2012\",\"Tesla\",\"S\",\"No comment\",",
+                    "1997,Ford,E350,\"Go get one now they are going fast\",",
+                    "2015,Chevy,Volt")
+  writeLines(mockLinesCsv, csvPath)
+
+  # default "header" is false
+  df <- read.df(csvPath, "csv", header = "true")
+  expect_equal(count(df), 3)
+  expect_equal(columns(df), c("year", "make", "model", "comment", "blank"))
+  expect_equal(sort(unlist(collect(where(df, df$year == "2015")))),
+               sort(unlist(list(year = "2015", make = "Chevy", model = "Volt"))))
+
+  unlink(csvPath)
+})
+
 test_that("convert NAs to null type in DataFrames", {
   rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
   df <- createDataFrame(rdd, list("a", "b"))
spark git commit: [SPARKR] add csv tests
Repository: spark
Updated Branches: refs/heads/master 5545b7910 -> 823518c2b

[SPARKR] add csv tests

## What changes were proposed in this pull request?

Add unit tests for csv data for SparkR.

## How was this patch tested?

Unit tests.

Author: Felix Cheung

Closes #13904 from felixcheung/rcsv.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/823518c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/823518c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/823518c2
Branch: refs/heads/master
Commit: 823518c2b5259c8a954431467639198c808c9198
Parents: 5545b79
Author: Felix Cheung
Authored: Tue Jun 28 17:08:28 2016 -0700
Committer: Shivaram Venkataraman
Committed: Tue Jun 28 17:08:28 2016 -0700
--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 18 ++
 1 file changed, 18 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/823518c2/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 7562fa9..d4662ad 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -208,6 +208,24 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })

+test_that("read csv as DataFrame", {
+  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
+  mockLinesCsv <- c("year,make,model,comment,blank",
+                    "\"2012\",\"Tesla\",\"S\",\"No comment\",",
+                    "1997,Ford,E350,\"Go get one now they are going fast\",",
+                    "2015,Chevy,Volt")
+  writeLines(mockLinesCsv, csvPath)
+
+  # default "header" is false
+  df <- read.df(csvPath, "csv", header = "true")
+  expect_equal(count(df), 3)
+  expect_equal(columns(df), c("year", "make", "model", "comment", "blank"))
+  expect_equal(sort(unlist(collect(where(df, df$year == "2015")))),
+               sort(unlist(list(year = "2015", make = "Chevy", model = "Volt"))))
+
+  unlink(csvPath)
+})
+
 test_that("convert NAs to null type in DataFrames", {
   rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
   df <- createDataFrame(rdd, list("a", "b"))
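The pyspark counterpart of the SparkR behavior exercised above looks roughly like this (same mock data; `header` likewise defaults to false):

```python
# Hedged pyspark analogue of the SparkR test above: write a small CSV with
# a header row, read it back with the csv source, and filter on a column.
import os
import tempfile

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-demo").getOrCreate()

path = os.path.join(tempfile.mkdtemp(), "cars.csv")
with open(path, "w") as f:
    f.write('year,make,model,comment,blank\n'
            '"2012","Tesla","S","No comment",\n'
            '1997,Ford,E350,"Go get one now they are going fast",\n'
            '2015,Chevy,Volt\n')

df = spark.read.csv(path, header=True)  # header defaults to False
df.filter(df.year == "2015").show()     # one row: the 2015 Chevy Volt
```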
spark git commit: [MINOR][DOCS][STRUCTURED STREAMING] Minor doc fixes around `DataFrameWriter` and `DataStreamWriter`
Repository: spark
Updated Branches: refs/heads/branch-2.0 5fb7804e5 -> 52c9d69f7

[MINOR][DOCS][STRUCTURED STREAMING] Minor doc fixes around `DataFrameWriter` and `DataStreamWriter`

## What changes were proposed in this pull request?

Fixes a couple of old references from `DataFrameWriter.startStream` to `DataStreamWriter.start`.

Author: Burak Yavuz

Closes #13952 from brkyvz/minor-doc-fix.

(cherry picked from commit 5545b791096756b07b3207fb3de13b68b9a37b00)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52c9d69f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52c9d69f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52c9d69f
Branch: refs/heads/branch-2.0
Commit: 52c9d69f7da05c45cb191fef8f7ce54c8f40b1bb
Parents: 5fb7804
Author: Burak Yavuz
Authored: Tue Jun 28 17:02:16 2016 -0700
Committer: Shixiong Zhu
Committed: Tue Jun 28 17:02:23 2016 -0700
--
 python/pyspark/sql/dataframe.py                                 | 4 ++--
 .../sql/catalyst/analysis/UnsupportedOperationChecker.scala     | 6 +++---
 .../sql/catalyst/analysis/UnsupportedOperationsSuite.scala      | 6 +++---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala      | 4 ++--
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala       | 2 +-
 .../scala/org/apache/spark/sql/streaming/StreamingQuery.scala   | 4 ++--
 .../apache/spark/sql/streaming/StreamingQueryListener.scala     | 4 ++--
 .../scala/org/apache/spark/sql/streaming/StreamSuite.scala      | 4 ++--
 8 files changed, 17 insertions(+), 17 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/52c9d69f/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index acf9d08..c8c8e7d 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -257,8 +257,8 @@ class DataFrame(object):
     def isStreaming(self):
         """Returns true if this :class:`Dataset` contains one or more sources that continuously
         return data as it arrives. A :class:`Dataset` that reads data from a streaming source
-        must be executed as a :class:`StreamingQuery` using the :func:`startStream` method in
-        :class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or
+        must be executed as a :class:`StreamingQuery` using the :func:`start` method in
+        :class:`DataStreamWriter`. Methods that return a single answer, (e.g., :func:`count` or
         :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming
         source present.

http://git-wip-us.apache.org/repos/asf/spark/blob/52c9d69f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 689e016..f6e32e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -30,7 +30,7 @@ object UnsupportedOperationChecker {
   def checkForBatch(plan: LogicalPlan): Unit = {
     plan.foreachUp {
       case p if p.isStreaming =>
-        throwError("Queries with streaming sources must be executed with write.startStream()")(p)
+        throwError("Queries with streaming sources must be executed with writeStream.start()")(p)

       case _ =>
     }
@@ -40,7 +40,7 @@ object UnsupportedOperationChecker {

     if (!plan.isStreaming) {
       throwError(
-        "Queries without streaming sources cannot be executed with write.startStream()")(plan)
+        "Queries without streaming sources cannot be executed with writeStream.start()")(plan)
     }

     // Disallow multiple streaming aggregations
@@ -154,7 +154,7 @@ object UnsupportedOperationChecker {

       case ReturnAnswer(child) if child.isStreaming =>
         throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
-          "with streaming DataFrames/Datasets must be executed with write.startStream().")
+          "with streaming DataFrames/Datasets must be executed with writeStream.start().")

       case _ =>
     }
spark git commit: [MINOR][DOCS][STRUCTURED STREAMING] Minor doc fixes around `DataFrameWriter` and `DataStreamWriter`
Repository: spark
Updated Branches: refs/heads/master 3554713a1 -> 5545b7910

[MINOR][DOCS][STRUCTURED STREAMING] Minor doc fixes around `DataFrameWriter` and `DataStreamWriter`

## What changes were proposed in this pull request?

Fixes a couple of old references from `DataFrameWriter.startStream` to `DataStreamWriter.start`.

Author: Burak Yavuz

Closes #13952 from brkyvz/minor-doc-fix.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5545b791
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5545b791
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5545b791
Branch: refs/heads/master
Commit: 5545b791096756b07b3207fb3de13b68b9a37b00
Parents: 3554713
Author: Burak Yavuz
Authored: Tue Jun 28 17:02:16 2016 -0700
Committer: Shixiong Zhu
Committed: Tue Jun 28 17:02:16 2016 -0700
--
 python/pyspark/sql/dataframe.py                                 | 4 ++--
 .../sql/catalyst/analysis/UnsupportedOperationChecker.scala     | 6 +++---
 .../sql/catalyst/analysis/UnsupportedOperationsSuite.scala      | 6 +++---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala      | 4 ++--
 .../org/apache/spark/sql/streaming/DataStreamWriter.scala       | 2 +-
 .../scala/org/apache/spark/sql/streaming/StreamingQuery.scala   | 4 ++--
 .../apache/spark/sql/streaming/StreamingQueryListener.scala     | 4 ++--
 .../scala/org/apache/spark/sql/streaming/StreamSuite.scala      | 4 ++--
 8 files changed, 17 insertions(+), 17 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5545b791/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a2443ed..4f13307 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -257,8 +257,8 @@ class DataFrame(object):
     def isStreaming(self):
         """Returns true if this :class:`Dataset` contains one or more sources that continuously
         return data as it arrives. A :class:`Dataset` that reads data from a streaming source
-        must be executed as a :class:`StreamingQuery` using the :func:`startStream` method in
-        :class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or
+        must be executed as a :class:`StreamingQuery` using the :func:`start` method in
+        :class:`DataStreamWriter`. Methods that return a single answer, (e.g., :func:`count` or
         :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming
         source present.

http://git-wip-us.apache.org/repos/asf/spark/blob/5545b791/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 689e016..f6e32e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -30,7 +30,7 @@ object UnsupportedOperationChecker {
   def checkForBatch(plan: LogicalPlan): Unit = {
     plan.foreachUp {
       case p if p.isStreaming =>
-        throwError("Queries with streaming sources must be executed with write.startStream()")(p)
+        throwError("Queries with streaming sources must be executed with writeStream.start()")(p)

       case _ =>
     }
@@ -40,7 +40,7 @@ object UnsupportedOperationChecker {

     if (!plan.isStreaming) {
       throwError(
-        "Queries without streaming sources cannot be executed with write.startStream()")(plan)
+        "Queries without streaming sources cannot be executed with writeStream.start()")(plan)
     }

     // Disallow multiple streaming aggregations
@@ -154,7 +154,7 @@ object UnsupportedOperationChecker {

       case ReturnAnswer(child) if child.isStreaming =>
         throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
-          "with streaming DataFrames/Datasets must be executed with write.startStream().")
+          "with streaming DataFrames/Datasets must be executed with writeStream.start().")

       case _ =>
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/5545b791/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
spark git commit: [SPARK-16114][SQL] structured streaming network word count examples
Repository: spark
Updated Branches: refs/heads/branch-2.0 d73c38ed0 -> 5fb7804e5

[SPARK-16114][SQL] structured streaming network word count examples

## What changes were proposed in this pull request?

Network word count example for Structured Streaming.

## How was this patch tested?

Run locally.

Author: James Thomas
Author: James Thomas

Closes #13816 from jjthomas/master.

(cherry picked from commit 3554713a163c58ca176ffde87d2c6e4a91bacb50)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fb7804e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fb7804e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fb7804e
Branch: refs/heads/branch-2.0
Commit: 5fb7804e55e50ba61c3a780b771d9b20b0bf2424
Parents: d73c38e
Author: James Thomas
Authored: Tue Jun 28 16:12:48 2016 -0700
Committer: Tathagata Das
Committed: Tue Jun 28 16:13:00 2016 -0700
--
 .../JavaStructuredNetworkWordCount.java         | 82
 .../streaming/structured_network_wordcount.py  | 76 ++
 .../streaming/StructuredNetworkWordCount.scala | 76 ++
 3 files changed, 234 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5fb7804e/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
--
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
new file mode 100644
index 0000000..a2cf938
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
+ * Usage: JavaStructuredNetworkWordCount <host> <port>
+ * <host> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCount
+ *    localhost 9999`
+ */
+public final class JavaStructuredNetworkWordCount {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: JavaNetworkWordCount <host> <port>");
+      System.exit(1);
+    }
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+
+    SparkSession spark = SparkSession
+      .builder()
+      .appName("JavaStructuredNetworkWordCount")
+      .getOrCreate();
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    Dataset<String> lines = spark
+      .readStream()
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .load().as(Encoders.STRING());
+
+    // Split the lines into words
+    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
+      }
+    }, Encoders.STRING());
+
+    // Generate running word count
+    Dataset<Row> wordCounts = words.groupBy("value").count();
+
+    // Start running the query that prints the running counts to the console
+    StreamingQuery query = wordCounts.writeStream()
+      .outputMode("complete")
+      .format("console")
+      .start();
+
spark git commit: [SPARK-16114][SQL] structured streaming network word count examples
Repository: spark
Updated Branches: refs/heads/master 8a977b065 -> 3554713a1

[SPARK-16114][SQL] structured streaming network word count examples

## What changes were proposed in this pull request?

Network word count example for Structured Streaming.

## How was this patch tested?

Run locally.

Author: James Thomas
Author: James Thomas

Closes #13816 from jjthomas/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3554713a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3554713a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3554713a
Branch: refs/heads/master
Commit: 3554713a163c58ca176ffde87d2c6e4a91bacb50
Parents: 8a977b0
Author: James Thomas
Authored: Tue Jun 28 16:12:48 2016 -0700
Committer: Tathagata Das
Committed: Tue Jun 28 16:12:48 2016 -0700
--
 .../JavaStructuredNetworkWordCount.java         | 82
 .../streaming/structured_network_wordcount.py  | 76 ++
 .../streaming/StructuredNetworkWordCount.scala | 76 ++
 3 files changed, 234 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3554713a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
--
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
new file mode 100644
index 0000000..a2cf938
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
+ * Usage: JavaStructuredNetworkWordCount <host> <port>
+ * <host> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCount
+ *    localhost 9999`
+ */
+public final class JavaStructuredNetworkWordCount {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: JavaNetworkWordCount <host> <port>");
+      System.exit(1);
+    }
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+
+    SparkSession spark = SparkSession
+      .builder()
+      .appName("JavaStructuredNetworkWordCount")
+      .getOrCreate();
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    Dataset<String> lines = spark
+      .readStream()
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .load().as(Encoders.STRING());
+
+    // Split the lines into words
+    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
+      }
+    }, Encoders.STRING());
+
+    // Generate running word count
+    Dataset<Row> wordCounts = words.groupBy("value").count();
+
+    // Start running the query that prints the running counts to the console
+    StreamingQuery query = wordCounts.writeStream()
+      .outputMode("complete")
+      .format("console")
+      .start();
+
+    query.awaitTermination();
+  }
+}
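The Python counterpart added by this patch (`structured_network_wordcount.py`) is elided from the diff above; the following is a hedged reconstruction of its core logic, not the verbatim file:

```python
# Hedged reconstruction of the pyspark structured network word count added
# by this patch: read lines from a socket, split them into words, keep a
# running count, and print complete results to the console.
from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: structured_network_wordcount.py <host> <port>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

    # DataFrame representing the stream of input lines from host:port
    lines = spark.readStream.format("socket") \
        .option("host", sys.argv[1]) \
        .option("port", int(sys.argv[2])) \
        .load()

    # Split the lines into words, one word per row
    words = lines.select(explode(split(lines.value, " ")).alias("word"))

    # Running word count, printed in full after every trigger
    query = words.groupBy("word").count() \
        .writeStream.outputMode("complete").format("console").start()

    query.awaitTermination()
```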
spark git commit: [SPARK-16100][SQL] fix bug when use Map as the buffer type of Aggregator
Repository: spark
Updated Branches: refs/heads/branch-2.0 5626a0af5 -> d73c38ed0

[SPARK-16100][SQL] fix bug when use Map as the buffer type of Aggregator

## What changes were proposed in this pull request?

The root cause is in `MapObjects`. Its parameter `loopVar` is not declared as a child, but it can sometimes be the same as `lambdaFunction` (e.g., the function that takes `loopVar` and produces `lambdaFunction` may be `identity`), which is a child. This causes trouble when calling `withNewChildren`: it may mistakenly treat `loopVar` as a child and cause an `IndexOutOfBoundsException: 0` later.

This PR fixes the bug by simply pulling the parameters out of `LambdaVariable` and passing them to `MapObjects` directly.

## How was this patch tested?

New test in `DatasetAggregatorSuite`.

Author: Wenchen Fan

Closes #13835 from cloud-fan/map-objects.

(cherry picked from commit 8a977b065418f07d2bf4fe1607a5534c32d04c47)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d73c38ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d73c38ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d73c38ed
Branch: refs/heads/branch-2.0
Commit: d73c38ed0e129bdcb634000153516fca4b31b9d0
Parents: 5626a0a
Author: Wenchen Fan
Authored: Wed Jun 29 06:39:28 2016 +0800
Committer: Cheng Lian
Committed: Wed Jun 29 06:40:05 2016 +0800
--
 .../catalyst/expressions/objects/objects.scala | 28
 .../spark/sql/DatasetAggregatorSuite.scala     | 15 +++
 2 files changed, 32 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d73c38ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index c597a2a..ea4dee1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -353,7 +353,7 @@ object MapObjects {
     val loopValue = "MapObjects_loopValue" + curId.getAndIncrement()
     val loopIsNull = "MapObjects_loopIsNull" + curId.getAndIncrement()
     val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
-    MapObjects(loopVar, function(loopVar), inputData)
+    MapObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData)
   }
 }

@@ -365,14 +365,20 @@ object MapObjects {
  * The following collection ObjectTypes are currently supported:
  *   Seq, Array, ArrayData, java.util.List
  *
- * @param loopVar A place holder that used as the loop variable when iterate the collection, and
- *                used as input for the `lambdaFunction`. It also carries the element type info.
+ * @param loopValue the name of the loop variable that used when iterate the collection, and used
+ *                  as input for the `lambdaFunction`
+ * @param loopIsNull the nullity of the loop variable that used when iterate the collection, and
+ *                   used as input for the `lambdaFunction`
+ * @param loopVarDataType the data type of the loop variable that used when iterate the collection,
+ *                        and used as input for the `lambdaFunction`
  * @param lambdaFunction A function that take the `loopVar` as input, and used as lambda function
  *                       to handle collection elements.
  * @param inputData An expression that when evaluated returns a collection object.
  */
 case class MapObjects private(
-    loopVar: LambdaVariable,
+    loopValue: String,
+    loopIsNull: String,
+    loopVarDataType: DataType,
     lambdaFunction: Expression,
     inputData: Expression) extends Expression with NonSQLExpression {

@@ -386,9 +392,9 @@ case class MapObjects private(
   override def dataType: DataType = ArrayType(lambdaFunction.dataType)

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    val elementJavaType = ctx.javaType(loopVar.dataType)
-    ctx.addMutableState("boolean", loopVar.isNull, "")
-    ctx.addMutableState(elementJavaType, loopVar.value, "")
+    val elementJavaType = ctx.javaType(loopVarDataType)
+    ctx.addMutableState("boolean", loopIsNull, "")
+    ctx.addMutableState(elementJavaType, loopValue, "")
     val genInputData = inputData.genCode(ctx)
     val genFunction = lambdaFunction.genCode(ctx)
     val dataLength = ctx.freshName("dataLength")
@@ -443,11 +449,11 @@ case class MapObjects private(
spark git commit: [SPARK-16100][SQL] fix bug when use Map as the buffer type of Aggregator
Repository: spark Updated Branches: refs/heads/master 25520e976 -> 8a977b065 [SPARK-16100][SQL] fix bug when use Map as the buffer type of Aggregator ## What changes were proposed in this pull request? The root cause is in `MapObjects`. Its parameter `loopVar` is not declared as child, but sometimes can be same with `lambdaFunction`(e.g. the function that takes `loopVar` and produces `lambdaFunction` may be `identity`), which is a child. This brings trouble when call `withNewChildren`, it may mistakenly treat `loopVar` as a child and cause `IndexOutOfBoundsException: 0` later. This PR fixes this bug by simply pulling out the paremters from `LambdaVariable` and pass them to `MapObjects` directly. ## How was this patch tested? new test in `DatasetAggregatorSuite` Author: Wenchen FanCloses #13835 from cloud-fan/map-objects. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a977b06 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a977b06 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a977b06 Branch: refs/heads/master Commit: 8a977b065418f07d2bf4fe1607a5534c32d04c47 Parents: 25520e9 Author: Wenchen Fan Authored: Wed Jun 29 06:39:28 2016 +0800 Committer: Cheng Lian Committed: Wed Jun 29 06:39:28 2016 +0800 -- .../catalyst/expressions/objects/objects.scala | 28 .../spark/sql/DatasetAggregatorSuite.scala | 15 +++ 2 files changed, 32 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8a977b06/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index c597a2a..ea4dee1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -353,7 +353,7 @@ object MapObjects { val loopValue = "MapObjects_loopValue" + curId.getAndIncrement() val loopIsNull = "MapObjects_loopIsNull" + curId.getAndIncrement() val loopVar = LambdaVariable(loopValue, loopIsNull, elementType) -MapObjects(loopVar, function(loopVar), inputData) +MapObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData) } } @@ -365,14 +365,20 @@ object MapObjects { * The following collection ObjectTypes are currently supported: * Seq, Array, ArrayData, java.util.List * - * @param loopVar A place holder that used as the loop variable when iterate the collection, and - *used as input for the `lambdaFunction`. It also carries the element type info. + * @param loopValue the name of the loop variable that used when iterate the collection, and used + * as input for the `lambdaFunction` + * @param loopIsNull the nullity of the loop variable that used when iterate the collection, and + * used as input for the `lambdaFunction` + * @param loopVarDataType the data type of the loop variable that used when iterate the collection, + *and used as input for the `lambdaFunction` * @param lambdaFunction A function that take the `loopVar` as input, and used as lambda function * to handle collection elements. * @param inputData An expression that when evaluated returns a collection object. 
*/ case class MapObjects private( -loopVar: LambdaVariable, +loopValue: String, +loopIsNull: String, +loopVarDataType: DataType, lambdaFunction: Expression, inputData: Expression) extends Expression with NonSQLExpression { @@ -386,9 +392,9 @@ case class MapObjects private( override def dataType: DataType = ArrayType(lambdaFunction.dataType) override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -val elementJavaType = ctx.javaType(loopVar.dataType) -ctx.addMutableState("boolean", loopVar.isNull, "") -ctx.addMutableState(elementJavaType, loopVar.value, "") +val elementJavaType = ctx.javaType(loopVarDataType) +ctx.addMutableState("boolean", loopIsNull, "") +ctx.addMutableState(elementJavaType, loopValue, "") val genInputData = inputData.genCode(ctx) val genFunction = lambdaFunction.genCode(ctx) val dataLength = ctx.freshName("dataLength") @@ -443,11 +449,11 @@ case class MapObjects private( } val loopNullCheck = inputData.dataType match { - case _: ArrayType => s"${loopVar.isNull} =
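For context, the user-visible failure is easiest to reproduce with an `Aggregator` whose buffer type is a `Map`: deserializing such a buffer builds a `MapObjects` whose per-element function is the identity, so `lambdaFunction` is the very same `LambdaVariable` instance as `loopVar` and `withNewChildren` miscounts the node's children. A minimal Scala sketch in the spirit of the regression test added to `DatasetAggregatorSuite` (the exact test body is assumed, not quoted from this diff):

```scala
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// An Aggregator whose buffer type is a Map. Before this fix, using it as a
// typed column failed with "IndexOutOfBoundsException: 0".
object MapTypeBufferAgg extends Aggregator[Int, Map[Int, Int], Int] {
  def zero: Map[Int, Int] = Map.empty
  def reduce(b: Map[Int, Int], a: Int): Map[Int, Int] = b
  def merge(b1: Map[Int, Int], b2: Map[Int, Int]): Map[Int, Int] = b1
  def finish(reduction: Map[Int, Int]): Int = 1
  def bufferEncoder: Encoder[Map[Int, Int]] = ExpressionEncoder()
  def outputEncoder: Encoder[Int] = ExpressionEncoder()
}

val spark = SparkSession.builder().master("local").appName("repro").getOrCreate()
import spark.implicits._
Seq(1, 2, 3).toDS().select(MapTypeBufferAgg.toColumn).show()
```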
spark git commit: [SPARK-16236][SQL] Add Path Option back to Load API in DataFrameReader
Repository: spark Updated Branches: refs/heads/branch-2.0 43bd612f3 -> 5626a0af5 [SPARK-16236][SQL] Add Path Option back to Load API in DataFrameReader ## What changes were proposed in this pull request? koertkuipers identified that the PR https://github.com/apache/spark/pull/13727/ changed the behavior of the `load` API. After the change, the `load` API does not add the value of `path` into the `options`. Thank you! This PR adds the option `path` back to the `load()` API in `DataFrameReader`, if and only if users specify exactly one `path` in the `load` call. For example, users can see the `path` option after the following API call, ```Scala spark.read .format("parquet") .load("/test") ``` ## How was this patch tested? Added test cases. Author: gatorsmile Closes #13933 from gatorsmile/optionPath. (cherry picked from commit 25520e976275e0d1e3bf9c73128ef4dec4618568) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5626a0af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5626a0af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5626a0af Branch: refs/heads/branch-2.0 Commit: 5626a0af598168a15d68a8817d1dec2a0e3dec7e Parents: 43bd612 Author: gatorsmile Authored: Tue Jun 28 15:32:45 2016 -0700 Committer: Reynold Xin Committed: Tue Jun 28 15:32:52 2016 -0700 -- .../org/apache/spark/sql/DataFrameReader.scala | 2 +- .../sql/test/DataFrameReaderWriterSuite.scala | 28 2 files changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5626a0af/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 35ba9c5..35ba522 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -129,7 +129,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(path: String): DataFrame = { -load(Seq(path): _*) // force invocation of `load(...varargs...)` +option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)` } /** http://git-wip-us.apache.org/repos/asf/spark/blob/5626a0af/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 3fa3864..ebbcc1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -170,6 +170,34 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be assert(LastOptions.saveMode === SaveMode.ErrorIfExists) } + test("test path option in load") { +spark.read + .format("org.apache.spark.sql.test") + .option("intOpt", 56) + .load("/test") + +assert(LastOptions.parameters("intOpt") == "56") +assert(LastOptions.parameters("path") == "/test") + +LastOptions.clear() +spark.read + .format("org.apache.spark.sql.test") + .option("intOpt", 55) + .load() + +assert(LastOptions.parameters("intOpt") == "55") +assert(!LastOptions.parameters.contains("path")) + +LastOptions.clear() +spark.read + .format("org.apache.spark.sql.test") + .option("intOpt", 54) + .load("/test",
"/test1", "/test2") + +assert(LastOptions.parameters("intOpt") == "54") +assert(!LastOptions.parameters.contains("path")) + } + test("test different data types for options") { val df = spark.read .format("org.apache.spark.sql.test") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16236][SQL] Add Path Option back to Load API in DataFrameReader
Repository: spark Updated Branches: refs/heads/master 35438fb0a -> 25520e976 [SPARK-16236][SQL] Add Path Option back to Load API in DataFrameReader ## What changes were proposed in this pull request? koertkuipers identified that the PR https://github.com/apache/spark/pull/13727/ changed the behavior of the `load` API. After the change, the `load` API does not add the value of `path` into the `options`. Thank you! This PR adds the option `path` back to the `load()` API in `DataFrameReader`, if and only if users specify exactly one `path` in the `load` call. For example, users can see the `path` option after the following API call, ```Scala spark.read .format("parquet") .load("/test") ``` ## How was this patch tested? Added test cases. Author: gatorsmile Closes #13933 from gatorsmile/optionPath. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25520e97 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25520e97 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25520e97 Branch: refs/heads/master Commit: 25520e976275e0d1e3bf9c73128ef4dec4618568 Parents: 35438fb Author: gatorsmile Authored: Tue Jun 28 15:32:45 2016 -0700 Committer: Reynold Xin Committed: Tue Jun 28 15:32:45 2016 -0700 -- .../org/apache/spark/sql/DataFrameReader.scala | 2 +- .../sql/test/DataFrameReaderWriterSuite.scala | 28 2 files changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/25520e97/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 35ba9c5..35ba522 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -129,7 +129,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(path: String): DataFrame = { -load(Seq(path): _*) // force invocation of `load(...varargs...)` +option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)` } /** http://git-wip-us.apache.org/repos/asf/spark/blob/25520e97/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 3fa3864..ebbcc1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -170,6 +170,34 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be assert(LastOptions.saveMode === SaveMode.ErrorIfExists) } + test("test path option in load") { +spark.read + .format("org.apache.spark.sql.test") + .option("intOpt", 56) + .load("/test") + +assert(LastOptions.parameters("intOpt") == "56") +assert(LastOptions.parameters("path") == "/test") + +LastOptions.clear() +spark.read + .format("org.apache.spark.sql.test") + .option("intOpt", 55) + .load() + +assert(LastOptions.parameters("intOpt") == "55") +assert(!LastOptions.parameters.contains("path")) + +LastOptions.clear() +spark.read + .format("org.apache.spark.sql.test") + .option("intOpt", 54) + .load("/test", "/test1", "/test2") + +assert(LastOptions.parameters("intOpt") == "54")
+assert(!LastOptions.parameters.contains("path")) + } + test("test different data types for options") { val df = spark.read .format("org.apache.spark.sql.test")
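In short, with this patch the single-argument `load` records its argument as the `path` option, while zero or multiple paths leave that option unset, exactly as the new test cases assert. A Scala sketch mirroring those tests (`org.apache.spark.sql.test` and `LastOptions` are helpers from Spark's own test suite, assumed to be on the classpath):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.test.LastOptions

val spark = SparkSession.builder().master("local").getOrCreate()

// Exactly one path: it is recorded as the "path" option.
spark.read.format("org.apache.spark.sql.test").load("/test")
assert(LastOptions.parameters("path") == "/test")

// Zero or multiple paths: no single "path" option is set.
LastOptions.clear()
spark.read.format("org.apache.spark.sql.test").load("/test", "/test1", "/test2")
assert(!LastOptions.parameters.contains("path"))
```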
spark git commit: [SPARK-16175] [PYSPARK] handle None for UDT
Repository: spark Updated Branches: refs/heads/master 1aad8c6e5 -> 35438fb0a [SPARK-16175] [PYSPARK] handle None for UDT ## What changes were proposed in this pull request? The Scala UDT implementation bypasses all nulls and never passes them into a UDT's serialize() and deserialize(); this PR updates the Python UDT to do the same. ## How was this patch tested? Added tests. Author: Davies Liu Closes #13878 from davies/udt_null. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35438fb0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35438fb0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35438fb0 Branch: refs/heads/master Commit: 35438fb0ad3bcda5c5a3a0ccde1a620699d012db Parents: 1aad8c6 Author: Davies Liu Authored: Tue Jun 28 14:09:38 2016 -0700 Committer: Davies Liu Committed: Tue Jun 28 14:09:38 2016 -0700 -- python/pyspark/sql/tests.py | 11 +++ python/pyspark/sql/types.py | 7 +-- 2 files changed, 16 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/35438fb0/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index f863485..a8ca386 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -575,6 +575,17 @@ class SQLTests(ReusedPySparkTestCase): _verify_type(PythonOnlyPoint(1.0, 2.0), PythonOnlyUDT()) self.assertRaises(ValueError, lambda: _verify_type([1.0, 2.0], PythonOnlyUDT())) +def test_udt_with_none(self): +df = self.spark.range(0, 10, 1, 1) + +def myudf(x): +if x > 0: +return PythonOnlyPoint(float(x), float(x)) + +self.spark.catalog.registerFunction("udf", myudf, PythonOnlyUDT()) +rows = [r[0] for r in df.selectExpr("udf(id)").take(2)] +self.assertEqual(rows, [None, PythonOnlyPoint(1, 1)]) + def test_infer_schema_with_udt(self): from pyspark.sql.tests import ExamplePoint, ExamplePointUDT row = Row(label=1.0, point=ExamplePoint(1.0, 2.0)) http://git-wip-us.apache.org/repos/asf/spark/blob/35438fb0/python/pyspark/sql/types.py -- diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index f0b56be..a367987 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -648,10 +648,13 @@ class UserDefinedType(DataType): return cls._cached_sql_type def toInternal(self, obj): -return self._cachedSqlType().toInternal(self.serialize(obj)) +if obj is not None: +return self._cachedSqlType().toInternal(self.serialize(obj)) def fromInternal(self, obj): -return self.deserialize(self._cachedSqlType().fromInternal(obj)) +v = self._cachedSqlType().fromInternal(obj) +if v is not None: +return self.deserialize(v) def serialize(self, obj): """
spark git commit: [SPARK-16175] [PYSPARK] handle None for UDT
Repository: spark Updated Branches: refs/heads/branch-2.0 5c9555e11 -> 43bd612f3 [SPARK-16175] [PYSPARK] handle None for UDT ## What changes were proposed in this pull request? The Scala UDT implementation bypasses all nulls and never passes them into a UDT's serialize() and deserialize(); this PR updates the Python UDT to do the same. ## How was this patch tested? Added tests. Author: Davies Liu Closes #13878 from davies/udt_null. (cherry picked from commit 35438fb0ad3bcda5c5a3a0ccde1a620699d012db) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43bd612f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43bd612f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43bd612f Branch: refs/heads/branch-2.0 Commit: 43bd612f35490c11a76d5379d723ba65f7afbefd Parents: 5c9555e Author: Davies Liu Authored: Tue Jun 28 14:09:38 2016 -0700 Committer: Davies Liu Committed: Tue Jun 28 14:09:58 2016 -0700 -- python/pyspark/sql/tests.py | 11 +++ python/pyspark/sql/types.py | 7 +-- 2 files changed, 16 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/43bd612f/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index f863485..a8ca386 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -575,6 +575,17 @@ class SQLTests(ReusedPySparkTestCase): _verify_type(PythonOnlyPoint(1.0, 2.0), PythonOnlyUDT()) self.assertRaises(ValueError, lambda: _verify_type([1.0, 2.0], PythonOnlyUDT())) +def test_udt_with_none(self): +df = self.spark.range(0, 10, 1, 1) + +def myudf(x): +if x > 0: +return PythonOnlyPoint(float(x), float(x)) + +self.spark.catalog.registerFunction("udf", myudf, PythonOnlyUDT()) +rows = [r[0] for r in df.selectExpr("udf(id)").take(2)] +self.assertEqual(rows, [None, PythonOnlyPoint(1, 1)]) + def test_infer_schema_with_udt(self): from pyspark.sql.tests import ExamplePoint, ExamplePointUDT row = Row(label=1.0, point=ExamplePoint(1.0, 2.0)) http://git-wip-us.apache.org/repos/asf/spark/blob/43bd612f/python/pyspark/sql/types.py -- diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index f0b56be..a367987 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -648,10 +648,13 @@ class UserDefinedType(DataType): return cls._cached_sql_type def toInternal(self, obj): -return self._cachedSqlType().toInternal(self.serialize(obj)) +if obj is not None: +return self._cachedSqlType().toInternal(self.serialize(obj)) def fromInternal(self, obj): -return self.deserialize(self._cachedSqlType().fromInternal(obj)) +v = self._cachedSqlType().fromInternal(obj) +if v is not None: +return self.deserialize(v) def serialize(self, obj): """
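The contract being mirrored is simple: null short-circuits around the user's converters in both directions, so serialize() and deserialize() only ever see real values. An illustrative Scala sketch of that pattern (a hypothetical class, not Spark's actual `UserDefinedType` API):

```scala
// Null-bypass wrapper: nulls pass straight through, and the user-supplied
// serialize/deserialize are only invoked on non-null values.
abstract class NullSafeUDT[T >: Null <: AnyRef] {
  def serialize(obj: T): AnyRef
  def deserialize(datum: AnyRef): T

  final def toInternal(obj: T): AnyRef =
    if (obj == null) null else serialize(obj)

  final def fromInternal(datum: AnyRef): T =
    if (datum == null) null else deserialize(datum)
}
```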
spark git commit: [SPARK-16259][PYSPARK] cleanup options in DataFrame read/write API
Repository: spark Updated Branches: refs/heads/master ae14f3623 -> 1aad8c6e5 [SPARK-16259][PYSPARK] cleanup options in DataFrame read/write API ## What changes were proposed in this pull request? There is some duplicated code for options in the DataFrame reader/writer API; this PR cleans it up, and it also fixes a bug with `escapeQuotes` in csv(). ## How was this patch tested? Existing tests. Author: Davies Liu Closes #13948 from davies/csv_options. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1aad8c6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1aad8c6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1aad8c6e Branch: refs/heads/master Commit: 1aad8c6e59c1e8b18a3eaa8ded93ff6ad05d83df Parents: ae14f36 Author: Davies Liu Authored: Tue Jun 28 13:43:59 2016 -0700 Committer: Reynold Xin Committed: Tue Jun 28 13:43:59 2016 -0700 -- python/pyspark/sql/readwriter.py | 119 ++ 1 file changed, 20 insertions(+), 99 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1aad8c6e/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index ccbf895..3f28d7a 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -44,84 +44,20 @@ def to_str(value): return str(value) -class ReaderUtils(object): +class OptionUtils(object): -def _set_json_opts(self, schema, primitivesAsString, prefersDecimal, - allowComments, allowUnquotedFieldNames, allowSingleQuotes, - allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, - mode, columnNameOfCorruptRecord): +def _set_opts(self, schema=None, **options): """ -Set options based on the Json optional parameters +Set named options (filter out those the value is None) """ if schema is not None: self.schema(schema) -if primitivesAsString is not None: -self.option("primitivesAsString", primitivesAsString) -if prefersDecimal is not None: -self.option("prefersDecimal", prefersDecimal) -if allowComments is not None: -self.option("allowComments", allowComments) -if allowUnquotedFieldNames is not None: -self.option("allowUnquotedFieldNames", allowUnquotedFieldNames) -if allowSingleQuotes is not None: -self.option("allowSingleQuotes", allowSingleQuotes) -if allowNumericLeadingZero is not None: -self.option("allowNumericLeadingZero", allowNumericLeadingZero) -if allowBackslashEscapingAnyCharacter is not None: -self.option("allowBackslashEscapingAnyCharacter", allowBackslashEscapingAnyCharacter) -if mode is not None: -self.option("mode", mode) -if columnNameOfCorruptRecord is not None: -self.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) - -def _set_csv_opts(self, schema, sep, encoding, quote, escape, - comment, header, inferSchema, ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, - dateFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode): -""" -Set options based on the CSV optional parameters -""" -if schema is not None: -self.schema(schema) -if sep is not None: -self.option("sep", sep) -if encoding is not None: -self.option("encoding", encoding) -if quote is not None: -self.option("quote", quote) -if escape is not None: -self.option("escape", escape) -if comment is not None: -self.option("comment", comment) -if header is not None: -self.option("header", header) -if inferSchema is not None: -self.option("inferSchema", inferSchema) -if ignoreLeadingWhiteSpace is not None: -self.option("ignoreLeadingWhiteSpace",
ignoreLeadingWhiteSpace) -if ignoreTrailingWhiteSpace is not None: -self.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace) -if nullValue is not None: -self.option("nullValue", nullValue) -if nanValue is not None: -self.option("nanValue", nanValue) -if positiveInf is not None: -self.option("positiveInf", positiveInf) -if negativeInf is not None: -self.option("negativeInf", negativeInf) -if
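The consolidation replaces long chains of `if x is not None: self.option(...)` checks with a single helper that skips unset options. The same pattern in Scala, as a hypothetical helper over `DataFrameReader` (not part of Spark's API):

```scala
import org.apache.spark.sql.DataFrameReader

// Apply only the options that are actually set; None means "keep the
// data source's default".
def setOpts(reader: DataFrameReader, opts: (String, Option[Any])*): DataFrameReader =
  opts.foldLeft(reader) {
    case (r, (key, Some(value))) => r.option(key, value.toString)
    case (r, _)                  => r
  }

// Usage sketch: setOpts(spark.read, "header" -> Some(true), "sep" -> None).csv("/path")
```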
spark git commit: [SPARK-16148][SCHEDULER] Allow for underscores in TaskLocation in the Executor ID
Repository: spark Updated Branches: refs/heads/branch-2.0 c86d29b2e -> 5c9555e11 [SPARK-16148][SCHEDULER] Allow for underscores in TaskLocation in the Executor ID ## What changes were proposed in this pull request? Previously, the TaskLocation implementation would not allow for executor ids which include underscores. This tweaks the string split used to get the hostname and executor id, allowing for underscores in the executor id. This addresses the JIRA found here: https://issues.apache.org/jira/browse/SPARK-16148 This is moved over from a previous PR against branch-1.6: https://github.com/apache/spark/pull/13857 ## How was this patch tested? Ran existing unit tests for core and streaming. Manually ran a simple streaming job with an executor whose id contained underscores and confirmed that the job ran successfully. This is my original work and I license the work to the project under the project's open source license. Author: Tom Magrino Closes #13858 from tmagrino/fixtasklocation. (cherry picked from commit ae14f362355b131fcb3e3633da7bb14bdd2b6893) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c9555e1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c9555e1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c9555e1 Branch: refs/heads/branch-2.0 Commit: 5c9555e1115ce52954db2a1b18f78cd77ec8c15f Parents: c86d29b Author: Tom Magrino Authored: Tue Jun 28 13:36:41 2016 -0700 Committer: Shixiong Zhu Committed: Tue Jun 28 13:38:27 2016 -0700 -- .../org/apache/spark/scheduler/TaskLocation.scala | 14 +++--- .../apache/spark/scheduler/TaskSetManagerSuite.scala | 2 ++ 2 files changed, 9 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5c9555e1/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 1eb6c16..06b5293 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -64,18 +64,18 @@ private[spark] object TaskLocation { /** * Create a TaskLocation from a string returned by getPreferredLocations. - * These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the - * location is cached. + * These strings have the form executor_[hostname]_[executorid], [hostname], or + * hdfs_cache_[hostname], depending on whether the location is cached.
*/ def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { if (str.startsWith(executorLocationTag)) { -val splits = str.split("_") -if (splits.length != 3) { - throw new IllegalArgumentException("Illegal executor location format: " + str) -} -new ExecutorCacheTaskLocation(splits(1), splits(2)) +val hostAndExecutorId = str.stripPrefix(executorLocationTag) +val splits = hostAndExecutorId.split("_", 2) +require(splits.length == 2, "Illegal executor location format: " + str) +val Array(host, executorId) = splits +new ExecutorCacheTaskLocation(host, executorId) } else { new HostTaskLocation(str) } http://git-wip-us.apache.org/repos/asf/spark/blob/5c9555e1/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 9b7b945..8623133 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -787,6 +787,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(TaskLocation("host1") === HostTaskLocation("host1")) assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1")) assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3")) +assert(TaskLocation("executor_some.host1_executor_task_3") === + ExecutorCacheTaskLocation("some.host1", "executor_task_3")) } private def createTaskResult(
spark git commit: [SPARK-16148][SCHEDULER] Allow for underscores in TaskLocation in the Executor ID
Repository: spark Updated Branches: refs/heads/master d59ba8e30 -> ae14f3623 [SPARK-16148][SCHEDULER] Allow for underscores in TaskLocation in the Executor ID ## What changes were proposed in this pull request? Previously, the TaskLocation implementation would not allow for executor ids which include underscores. This tweaks the string split used to get the hostname and executor id, allowing for underscores in the executor id. This addresses the JIRA found here: https://issues.apache.org/jira/browse/SPARK-16148 This is moved over from a previous PR against branch-1.6: https://github.com/apache/spark/pull/13857 ## How was this patch tested? Ran existing unit tests for core and streaming. Manually ran a simple streaming job with an executor whose id contained underscores and confirmed that the job ran successfully. This is my original work and I license the work to the project under the project's open source license. Author: Tom Magrino Closes #13858 from tmagrino/fixtasklocation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae14f362 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae14f362 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae14f362 Branch: refs/heads/master Commit: ae14f362355b131fcb3e3633da7bb14bdd2b6893 Parents: d59ba8e Author: Tom Magrino Authored: Tue Jun 28 13:36:41 2016 -0700 Committer: Shixiong Zhu Committed: Tue Jun 28 13:36:41 2016 -0700 -- .../org/apache/spark/scheduler/TaskLocation.scala | 14 +++--- .../apache/spark/scheduler/TaskSetManagerSuite.scala | 2 ++ 2 files changed, 9 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ae14f362/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 1eb6c16..06b5293 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -64,18 +64,18 @@ private[spark] object TaskLocation { /** * Create a TaskLocation from a string returned by getPreferredLocations. - * These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the - * location is cached. + * These strings have the form executor_[hostname]_[executorid], [hostname], or + * hdfs_cache_[hostname], depending on whether the location is cached.
*/ def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { if (str.startsWith(executorLocationTag)) { -val splits = str.split("_") -if (splits.length != 3) { - throw new IllegalArgumentException("Illegal executor location format: " + str) -} -new ExecutorCacheTaskLocation(splits(1), splits(2)) +val hostAndExecutorId = str.stripPrefix(executorLocationTag) +val splits = hostAndExecutorId.split("_", 2) +require(splits.length == 2, "Illegal executor location format: " + str) +val Array(host, executorId) = splits +new ExecutorCacheTaskLocation(host, executorId) } else { new HostTaskLocation(str) } http://git-wip-us.apache.org/repos/asf/spark/blob/ae14f362/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index cfbabd8..36d1c56 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -802,6 +802,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(TaskLocation("host1") === HostTaskLocation("host1")) assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1")) assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3")) +assert(TaskLocation("executor_some.host1_executor_task_3") === + ExecutorCacheTaskLocation("some.host1", "executor_task_3")) } test("Kill other task attempts when one attempt belonging to the same task succeeds") {
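The key change is splitting with a limit of 2 after stripping the `executor_` prefix, so only the first underscore separates the hostname from the executor id (hostnames cannot contain underscores, which is what makes this split point unambiguous). A quick Scala illustration:

```scala
val executorLocationTag = "executor_"
val str = "executor_some.host1_executor_task_3"

// Strip the tag, then split at the first underscore only; everything
// after it, underscores included, belongs to the executor id.
val hostAndExecutorId = str.stripPrefix(executorLocationTag)
val Array(host, executorId) = hostAndExecutorId.split("_", 2)

assert(host == "some.host1")
assert(executorId == "executor_task_3")
```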
spark git commit: [MINOR][SPARKR] update sparkR DataFrame.R comment
Repository: spark Updated Branches: refs/heads/branch-2.0 a1d04cc03 -> c86d29b2e [MINOR][SPARKR] update sparkR DataFrame.R comment ## What changes were proposed in this pull request? Update the SparkR DataFrame.R comments: SQLContext ==> SparkSession. ## How was this patch tested? N/A Author: WeichenXu Closes #13946 from WeichenXu123/sparkR_comment_update_sparkSession. (cherry picked from commit d59ba8e30751bbf91d49f5530b8242a12bbfb569) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c86d29b2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c86d29b2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c86d29b2 Branch: refs/heads/branch-2.0 Commit: c86d29b2e6bfda05124c20ba3c6db9275c24faa8 Parents: a1d04cc Author: WeichenXu Authored: Tue Jun 28 12:12:20 2016 -0700 Committer: Shivaram Venkataraman Committed: Tue Jun 28 12:12:28 2016 -0700 -- R/pkg/R/DataFrame.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c86d29b2/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index f856979..567758d 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -466,7 +466,7 @@ setMethod("createOrReplaceTempView", #' (Deprecated) Register Temporary Table #' -#' Registers a SparkDataFrame as a Temporary Table in the SQLContext +#' Registers a SparkDataFrame as a Temporary Table in the SparkSession #' @param x A SparkDataFrame #' @param tableName A character vector containing the name of the table #' @@ -493,7 +493,7 @@ setMethod("registerTempTable", #' insertInto #' -#' Insert the contents of a SparkDataFrame into a table registered in the current SQL Context. +#' Insert the contents of a SparkDataFrame into a table registered in the current SparkSession. #' #' @param x A SparkDataFrame #' @param tableName A character vector containing the name of the table
spark git commit: [MINOR][SPARKR] update sparkR DataFrame.R comment
Repository: spark Updated Branches: refs/heads/master 26252f706 -> d59ba8e30 [MINOR][SPARKR] update sparkR DataFrame.R comment ## What changes were proposed in this pull request? Update the SparkR DataFrame.R comments: SQLContext ==> SparkSession. ## How was this patch tested? N/A Author: WeichenXu Closes #13946 from WeichenXu123/sparkR_comment_update_sparkSession. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d59ba8e3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d59ba8e3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d59ba8e3 Branch: refs/heads/master Commit: d59ba8e30751bbf91d49f5530b8242a12bbfb569 Parents: 26252f7 Author: WeichenXu Authored: Tue Jun 28 12:12:20 2016 -0700 Committer: Shivaram Venkataraman Committed: Tue Jun 28 12:12:20 2016 -0700 -- R/pkg/R/DataFrame.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d59ba8e3/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 61d47a8..25327be 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -471,7 +471,7 @@ setMethod("createOrReplaceTempView", #' (Deprecated) Register Temporary Table #' -#' Registers a SparkDataFrame as a Temporary Table in the SQLContext +#' Registers a SparkDataFrame as a Temporary Table in the SparkSession #' @param x A SparkDataFrame #' @param tableName A character vector containing the name of the table #' @@ -498,7 +498,7 @@ setMethod("registerTempTable", #' insertInto #' -#' Insert the contents of a SparkDataFrame into a table registered in the current SQL Context. +#' Insert the contents of a SparkDataFrame into a table registered in the current SparkSession. #' #' @param x A SparkDataFrame #' @param tableName A character vector containing the name of the table
spark git commit: [SPARK-15643][DOC][ML] Update spark.ml and spark.mllib migration guide from 1.6 to 2.0
Repository: spark Updated Branches: refs/heads/master 1f2776df6 -> 26252f706 [SPARK-15643][DOC][ML] Update spark.ml and spark.mllib migration guide from 1.6 to 2.0 ## What changes were proposed in this pull request? Update ```spark.ml``` and ```spark.mllib``` migration guide from 1.6 to 2.0. ## How was this patch tested? Docs update, no tests. Author: Yanbo Liang Closes #13378 from yanboliang/spark-13448. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26252f70 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26252f70 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26252f70 Branch: refs/heads/master Commit: 26252f7064ba852e1bce6d8233a95aeb395f826a Parents: 1f2776d Author: Yanbo Liang Authored: Tue Jun 28 11:54:25 2016 -0700 Committer: Joseph K. Bradley Committed: Tue Jun 28 11:54:25 2016 -0700 -- docs/mllib-guide.md| 60 + docs/mllib-migration-guides.md | 27 + 2 files changed, 68 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26252f70/docs/mllib-guide.md -- diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index fa5e906..c28d137 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -102,32 +102,54 @@ MLlib is under active development. The APIs marked `Experimental`/`DeveloperApi` may change in future releases, and the migration guide below will explain all changes between releases. -## From 1.5 to 1.6 +## From 1.6 to 2.0 -There are no breaking API changes in the `spark.mllib` or `spark.ml` packages, but there are -deprecations and changes of behavior. +The deprecations and changes of behavior in the `spark.mllib` or `spark.ml` packages include: Deprecations: -* [SPARK-11358](https://issues.apache.org/jira/browse/SPARK-11358): - In `spark.mllib.clustering.KMeans`, the `runs` parameter has been deprecated. -* [SPARK-10592](https://issues.apache.org/jira/browse/SPARK-10592): - In `spark.ml.classification.LogisticRegressionModel` and - `spark.ml.regression.LinearRegressionModel`, the `weights` field has been deprecated in favor of - the new name `coefficients`. This helps disambiguate from instance (row) "weights" given to - algorithms. +* [SPARK-14984](https://issues.apache.org/jira/browse/SPARK-14984): + In `spark.ml.regression.LinearRegressionSummary`, the `model` field has been deprecated. +* [SPARK-13784](https://issues.apache.org/jira/browse/SPARK-13784): + In `spark.ml.regression.RandomForestRegressionModel` and `spark.ml.classification.RandomForestClassificationModel`, + the `numTrees` parameter has been deprecated in favor of `getNumTrees` method. +* [SPARK-13761](https://issues.apache.org/jira/browse/SPARK-13761): + In `spark.ml.param.Params`, the `validateParams` method has been deprecated. + We move all functionality in overridden methods to the corresponding `transformSchema`. +* [SPARK-14829](https://issues.apache.org/jira/browse/SPARK-14829): + In `spark.mllib` package, `LinearRegressionWithSGD`, `LassoWithSGD`, `RidgeRegressionWithSGD` and `LogisticRegressionWithSGD` have been deprecated. + We encourage users to use `spark.ml.regression.LinearRegresson` and `spark.ml.classification.LogisticRegresson`. +* [SPARK-14900](https://issues.apache.org/jira/browse/SPARK-14900): + In `spark.mllib.evaluation.MulticlassMetrics`, the parameters `precision`, `recall` and `fMeasure` have been deprecated in favor of `accuracy`.
+* [SPARK-15644](https://issues.apache.org/jira/browse/SPARK-15644): + In `spark.ml.util.MLReader` and `spark.ml.util.MLWriter`, the `context` method has been deprecated in favor of `session`. +* In `spark.ml.feature.ChiSqSelectorModel`, the `setLabelCol` method has been deprecated since it was not used by `ChiSqSelectorModel`. Changes of behavior: -* [SPARK-7770](https://issues.apache.org/jira/browse/SPARK-7770): - `spark.mllib.tree.GradientBoostedTrees`: `validationTol` has changed semantics in 1.6. - Previously, it was a threshold for absolute change in error. Now, it resembles the behavior of - `GradientDescent`'s `convergenceTol`: For large errors, it uses relative error (relative to the - previous error); for small errors (`< 0.01`), it uses absolute error. -* [SPARK-11069](https://issues.apache.org/jira/browse/SPARK-11069): - `spark.ml.feature.RegexTokenizer`: Previously, it did not convert strings to lowercase before - tokenizing. Now, it converts to lowercase by default, with an option not to. This matches the - behavior of the simpler `Tokenizer` transformer. +* [SPARK-7780](https://issues.apache.org/jira/browse/SPARK-7780): +
spark git commit: [SPARK-15643][DOC][ML] Update spark.ml and spark.mllib migration guide from 1.6 to 2.0
Repository: spark Updated Branches: refs/heads/branch-2.0 e68872f2e -> a1d04cc03 [SPARK-15643][DOC][ML] Update spark.ml and spark.mllib migration guide from 1.6 to 2.0 ## What changes were proposed in this pull request? Update ```spark.ml``` and ```spark.mllib``` migration guide from 1.6 to 2.0. ## How was this patch tested? Docs update, no tests. Author: Yanbo Liang Closes #13378 from yanboliang/spark-13448. (cherry picked from commit 26252f7064ba852e1bce6d8233a95aeb395f826a) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1d04cc0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1d04cc0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1d04cc0 Branch: refs/heads/branch-2.0 Commit: a1d04cc03edac729488e9eb2b9e34e6c951ddbae Parents: e68872f Author: Yanbo Liang Authored: Tue Jun 28 11:54:25 2016 -0700 Committer: Joseph K. Bradley Committed: Tue Jun 28 11:54:34 2016 -0700 -- docs/mllib-guide.md| 60 + docs/mllib-migration-guides.md | 27 + 2 files changed, 68 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a1d04cc0/docs/mllib-guide.md -- diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index fa5e906..c28d137 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -102,32 +102,54 @@ MLlib is under active development. The APIs marked `Experimental`/`DeveloperApi` may change in future releases, and the migration guide below will explain all changes between releases. -## From 1.5 to 1.6 +## From 1.6 to 2.0 -There are no breaking API changes in the `spark.mllib` or `spark.ml` packages, but there are -deprecations and changes of behavior. +The deprecations and changes of behavior in the `spark.mllib` or `spark.ml` packages include: Deprecations: -* [SPARK-11358](https://issues.apache.org/jira/browse/SPARK-11358): - In `spark.mllib.clustering.KMeans`, the `runs` parameter has been deprecated. -* [SPARK-10592](https://issues.apache.org/jira/browse/SPARK-10592): - In `spark.ml.classification.LogisticRegressionModel` and - `spark.ml.regression.LinearRegressionModel`, the `weights` field has been deprecated in favor of - the new name `coefficients`. This helps disambiguate from instance (row) "weights" given to - algorithms. +* [SPARK-14984](https://issues.apache.org/jira/browse/SPARK-14984): + In `spark.ml.regression.LinearRegressionSummary`, the `model` field has been deprecated. +* [SPARK-13784](https://issues.apache.org/jira/browse/SPARK-13784): + In `spark.ml.regression.RandomForestRegressionModel` and `spark.ml.classification.RandomForestClassificationModel`, + the `numTrees` parameter has been deprecated in favor of `getNumTrees` method. +* [SPARK-13761](https://issues.apache.org/jira/browse/SPARK-13761): + In `spark.ml.param.Params`, the `validateParams` method has been deprecated. + We move all functionality in overridden methods to the corresponding `transformSchema`. +* [SPARK-14829](https://issues.apache.org/jira/browse/SPARK-14829): + In `spark.mllib` package, `LinearRegressionWithSGD`, `LassoWithSGD`, `RidgeRegressionWithSGD` and `LogisticRegressionWithSGD` have been deprecated. + We encourage users to use `spark.ml.regression.LinearRegresson` and `spark.ml.classification.LogisticRegresson`. +* [SPARK-14900](https://issues.apache.org/jira/browse/SPARK-14900): + In `spark.mllib.evaluation.MulticlassMetrics`, the parameters `precision`, `recall` and `fMeasure` have been deprecated in favor of `accuracy`.
+* [SPARK-15644](https://issues.apache.org/jira/browse/SPARK-15644): + In `spark.ml.util.MLReader` and `spark.ml.util.MLWriter`, the `context` method has been deprecated in favor of `session`. +* In `spark.ml.feature.ChiSqSelectorModel`, the `setLabelCol` method has been deprecated since it was not used by `ChiSqSelectorModel`. Changes of behavior: -* [SPARK-7770](https://issues.apache.org/jira/browse/SPARK-7770): - `spark.mllib.tree.GradientBoostedTrees`: `validationTol` has changed semantics in 1.6. - Previously, it was a threshold for absolute change in error. Now, it resembles the behavior of - `GradientDescent`'s `convergenceTol`: For large errors, it uses relative error (relative to the - previous error); for small errors (`< 0.01`), it uses absolute error. -* [SPARK-11069](https://issues.apache.org/jira/browse/SPARK-11069): - `spark.ml.feature.RegexTokenizer`: Previously, it did not convert strings to lowercase before - tokenizing. Now, it converts to lowercase by default, with an option not to. This matches the - behavior of the
spark git commit: [SPARK-16181][SQL] outer join with isNull filter may return wrong result
Repository: spark Updated Branches: refs/heads/branch-2.0 4c5e16f58 -> e68872f2e [SPARK-16181][SQL] outer join with isNull filter may return wrong result ## What changes were proposed in this pull request? The root cause is that the output attributes of an outer join are derived from its children, while they are actually different attributes (an outer join can return null). We have already added some special logic to handle this, e.g. `PushPredicateThroughJoin` won't push predicates down through the outer-join side, and `FixNullability`. This PR adds one more piece of special logic, in `FoldablePropagation`. ## How was this patch tested? A new test in `DataFrameSuite`. Author: Wenchen Fan Closes #13884 from cloud-fan/bug. (cherry picked from commit 1f2776df6e87a84991537ac20e4b8829472d3462) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e68872f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e68872f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e68872f2 Branch: refs/heads/branch-2.0 Commit: e68872f2ef89e85ab7a856bee82d16c938de6db0 Parents: 4c5e16f Author: Wenchen Fan Authored: Tue Jun 28 10:26:01 2016 -0700 Committer: Yin Huai Committed: Tue Jun 28 10:28:57 2016 -0700 -- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 8 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala| 9 + 2 files changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e68872f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6b10484..f24f8b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -687,6 +687,14 @@ object FoldablePropagation extends Rule[LogicalPlan] { case c: Command => stop = true c +// For outer join, although its output attributes are derived from its children, they are +// actually different attributes: the output of outer join is not always picked from its +// children, but can also be null. +// TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes +// of outer join.
+case j @ Join(_, _, LeftOuter | RightOuter | FullOuter, _) => + stop = true + j case p: LogicalPlan if !stop => p.transformExpressions { case a: AttributeReference if foldableMap.contains(a) => foldableMap(a) http://git-wip-us.apache.org/repos/asf/spark/blob/e68872f2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 1afee9f..5151532 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1541,4 +1541,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val df = Seq(1, 1, 2).toDF("column.with.dot") checkAnswer(df.distinct(), Row(1) :: Row(2) :: Nil) } + + test("SPARK-16181: outer join with isNull filter") { +val left = Seq("x").toDF("col") +val right = Seq("y").toDF("col").withColumn("new", lit(true)) +val joined = left.join(right, left("col") === right("col"), "left_outer") + +checkAnswer(joined, Row("x", null, null)) +checkAnswer(joined.filter($"new".isNull), Row("x", null, null)) + } }
spark git commit: [SPARK-16181][SQL] outer join with isNull filter may return wrong result
Repository: spark Updated Branches: refs/heads/master 0923c4f56 -> 1f2776df6 [SPARK-16181][SQL] outer join with isNull filter may return wrong result ## What changes were proposed in this pull request? The root cause is that the output attributes of an outer join are derived from its children, while they are actually different attributes (an outer join can return null). We have already added some special logic to handle this, e.g. `PushPredicateThroughJoin` won't push predicates down through the outer-join side, and `FixNullability`. This PR adds one more piece of special logic, in `FoldablePropagation`. ## How was this patch tested? A new test in `DataFrameSuite`. Author: Wenchen Fan Closes #13884 from cloud-fan/bug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f2776df Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f2776df Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f2776df Branch: refs/heads/master Commit: 1f2776df6e87a84991537ac20e4b8829472d3462 Parents: 0923c4f Author: Wenchen Fan Authored: Tue Jun 28 10:26:01 2016 -0700 Committer: Yin Huai Committed: Tue Jun 28 10:26:01 2016 -0700 -- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 8 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala| 9 + 2 files changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f2776df/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2bca31d..9bc8cea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -688,6 +688,14 @@ object FoldablePropagation extends Rule[LogicalPlan] { case c: Command => stop = true c +// For outer join, although its output attributes are derived from its children, they are +// actually different attributes: the output of outer join is not always picked from its +// children, but can also be null. +// TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes +// of outer join.
+case j @ Join(_, _, LeftOuter | RightOuter | FullOuter, _) => + stop = true + j case p: LogicalPlan if !stop => p.transformExpressions { case a: AttributeReference if foldableMap.contains(a) => foldableMap(a) http://git-wip-us.apache.org/repos/asf/spark/blob/1f2776df/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 6a0a7df..9d53be8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1562,4 +1562,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val df = Seq(1, 1, 2).toDF("column.with.dot") checkAnswer(df.distinct(), Row(1) :: Row(2) :: Nil) } + + test("SPARK-16181: outer join with isNull filter") { +val left = Seq("x").toDF("col") +val right = Seq("y").toDF("col").withColumn("new", lit(true)) +val joined = left.join(right, left("col") === right("col"), "left_outer") + +checkAnswer(joined, Row("x", null, null)) +checkAnswer(joined.filter($"new".isNull), Row("x", null, null)) + } }
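The failure mode is easy to reproduce: the constant column from the join's right side was constant-folded even above the outer join, where it is no longer constant. A Scala sketch mirroring the new `DataFrameSuite` test:

```scala
import org.apache.spark.sql.functions.lit
import spark.implicits._  // assumes a SparkSession named `spark`

val left = Seq("x").toDF("col")
val right = Seq("y").toDF("col").withColumn("new", lit(true))
val joined = left.join(right, left("col") === right("col"), "left_outer")

// "x" has no match on the right, so `new` is null in the joined row.
// Before this fix, FoldablePropagation substituted the literal `true`
// for `new` above the join, so the isNull filter wrongly dropped the row.
joined.filter($"new".isNull).show()  // now correctly keeps ("x", null, null)
```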
spark git commit: [SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf
Repository: spark Updated Branches: refs/heads/branch-2.0 b349237e4 -> 4c5e16f58 [SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf ## What changes were proposed in this pull request? When we create a SparkSession on the Python side, a SparkContext may already have been created. In that case, we need to set the SparkSession builder's configs on the Scala SparkContext's SparkConf (we need to do so because conf changes on an active Python SparkContext are not propagated to the JVM side). Otherwise, we may create a wrong SparkSession (e.g. one where Hive support is not enabled even though enableHiveSupport was called). ## How was this patch tested? New tests and manual tests. Author: Yin Huai Closes #13931 from yhuai/SPARK-16224. (cherry picked from commit 0923c4f5676691e28e70ecb05890e123540b91f0) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c5e16f5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c5e16f5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c5e16f5 Branch: refs/heads/branch-2.0 Commit: 4c5e16f58043b3103bbd59c5fa8fec4c411e5e11 Parents: b349237 Author: Yin Huai Authored: Tue Jun 28 07:54:44 2016 -0700 Committer: Davies Liu Committed: Tue Jun 28 07:54:58 2016 -0700 -- python/pyspark/context.py | 2 ++ python/pyspark/sql/session.py | 7 +++ python/pyspark/sql/tests.py | 43 +- python/pyspark/tests.py | 8 +++ 4 files changed, 59 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 7217a99..6e9f24e 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -166,6 +166,8 @@ class SparkContext(object): # Create the Java SparkContext through Py4J self._jsc = jsc or self._initialize_context(self._conf._jconf) +# Reset the SparkConf to the one actually used by the SparkContext in JVM. +self._conf = SparkConf(_jconf=self._jsc.sc().conf()) # Create a single Accumulator in Java that we'll send all our updates through; # they will be passed back to us through a TCP server http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/sql/session.py -- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 0c8024e..b4152a3 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -165,6 +165,13 @@ class SparkSession(object): for key, value in self._options.items(): sparkConf.set(key, value) sc = SparkContext.getOrCreate(sparkConf) +# This SparkContext may be an existing one. +for key, value in self._options.items(): +# we need to propagate the confs +# before we create the SparkSession. Otherwise, confs like +# warehouse path and metastore url will not be set correctly ( +# these confs cannot be changed once the SparkSession is created). +sc._conf.set(key, value) session = SparkSession(sc) for key, value in self._options.items(): session.conf.set(key, value) http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3f56411..f863485 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -22,6 +22,7 @@ individual modules.
""" import os import sys +import subprocess import pydoc import shutil import tempfile @@ -48,7 +49,7 @@ else: from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type -from pyspark.tests import ReusedPySparkTestCase +from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests from pyspark.sql.functions import UserDefinedFunction, sha2 from pyspark.sql.window import Window from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException @@ -1619,6 +1620,46 @@ class SQLTests(ReusedPySparkTestCase): lambda: spark.catalog.uncacheTable("does_not_exist")) +class
spark git commit: [SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf
Repository: spark Updated Branches: refs/heads/master e158478a9 -> 0923c4f56 [SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf ## What changes were proposed in this pull request? When we create a SparkSession on the Python side, a SparkContext may already have been created. In that case, we need to set the SparkSession builder's configs on the Scala SparkContext's SparkConf (we need to do so because conf changes on an active Python SparkContext are not propagated to the JVM side). Otherwise, we may create a wrong SparkSession (e.g. one where Hive support is not enabled even though enableHiveSupport was called). ## How was this patch tested? New tests and manual tests. Author: Yin Huai Closes #13931 from yhuai/SPARK-16224. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0923c4f5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0923c4f5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0923c4f5 Branch: refs/heads/master Commit: 0923c4f5676691e28e70ecb05890e123540b91f0 Parents: e158478 Author: Yin Huai Authored: Tue Jun 28 07:54:44 2016 -0700 Committer: Davies Liu Committed: Tue Jun 28 07:54:44 2016 -0700 -- python/pyspark/context.py | 2 ++ python/pyspark/sql/session.py | 7 +++ python/pyspark/sql/tests.py | 43 +- python/pyspark/tests.py | 8 +++ 4 files changed, 59 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0923c4f5/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 7217a99..6e9f24e 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -166,6 +166,8 @@ class SparkContext(object): # Create the Java SparkContext through Py4J self._jsc = jsc or self._initialize_context(self._conf._jconf) +# Reset the SparkConf to the one actually used by the SparkContext in JVM. +self._conf = SparkConf(_jconf=self._jsc.sc().conf()) # Create a single Accumulator in Java that we'll send all our updates through; # they will be passed back to us through a TCP server http://git-wip-us.apache.org/repos/asf/spark/blob/0923c4f5/python/pyspark/sql/session.py -- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 0c8024e..b4152a3 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -165,6 +165,13 @@ class SparkSession(object): for key, value in self._options.items(): sparkConf.set(key, value) sc = SparkContext.getOrCreate(sparkConf) +# This SparkContext may be an existing one. +for key, value in self._options.items(): +# we need to propagate the confs +# before we create the SparkSession. Otherwise, confs like +# warehouse path and metastore url will not be set correctly ( +# these confs cannot be changed once the SparkSession is created). +sc._conf.set(key, value) session = SparkSession(sc) for key, value in self._options.items(): session.conf.set(key, value) http://git-wip-us.apache.org/repos/asf/spark/blob/0923c4f5/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3f56411..f863485 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -22,6 +22,7 @@ individual modules.
""" import os import sys +import subprocess import pydoc import shutil import tempfile @@ -48,7 +49,7 @@ else: from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type -from pyspark.tests import ReusedPySparkTestCase +from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests from pyspark.sql.functions import UserDefinedFunction, sha2 from pyspark.sql.window import Window from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException @@ -1619,6 +1620,46 @@ class SQLTests(ReusedPySparkTestCase): lambda: spark.catalog.uncacheTable("does_not_exist")) +class HiveSparkSubmitTests(SparkSubmitTests): + +def test_hivecontext(self): +# This test checks that HiveContext is using Hive
spark git commit: [SPARK-16242][MLLIB][PYSPARK] Conversion between old/new matrix columns in a DataFrame (Python)
Repository: spark Updated Branches: refs/heads/branch-2.0 af70ad028 -> b349237e4 [SPARK-16242][MLLIB][PYSPARK] Conversion between old/new matrix columns in a DataFrame (Python) ## What changes were proposed in this pull request? This PR implements Python wrappers for #13888 to convert between old and new matrix columns in a DataFrame. ## How was this patch tested? Doctests in Python. Author: Yanbo Liang. Closes #13935 from yanboliang/spark-16242. (cherry picked from commit e158478a9fff5e63ae0336a54b3f360d0cd38921) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b349237e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b349237e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b349237e Branch: refs/heads/branch-2.0 Commit: b349237e4b4cf60fccf9bfcf76deca78f1224bf1 Parents: af70ad0 Author: Yanbo Liang Authored: Tue Jun 28 06:28:22 2016 -0700 Committer: Yanbo Liang Committed: Tue Jun 28 06:28:58 2016 -0700 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 14 python/pyspark/mllib/util.py| 80 2 files changed, 94 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b349237e/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f2c70ba..f4819f7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1215,6 +1215,20 @@ private[python] class PythonMLLibAPI extends Serializable { def convertVectorColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { MLUtils.convertVectorColumnsFromML(dataset, cols.asScala: _*) } + + /** + * Python-friendly version of [[MLUtils.convertMatrixColumnsToML()]]. + */ + def convertMatrixColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { +MLUtils.convertMatrixColumnsToML(dataset, cols.asScala: _*) + } + + /** + * Python-friendly version of [[MLUtils.convertMatrixColumnsFromML()]]. + */ + def convertMatrixColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { +MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala: _*) + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/b349237e/python/pyspark/mllib/util.py -- diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index a7e6bcc..48867a0 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -281,6 +281,86 @@ class MLUtils(object): raise TypeError("Input dataset must be a DataFrame but got {}.".format(type(dataset))) return callMLlibFunc("convertVectorColumnsFromML", dataset, list(cols)) +@staticmethod +@since("2.0.0") +def convertMatrixColumnsToML(dataset, *cols): +""" +Converts matrix columns in an input DataFrame from the +:py:class:`pyspark.mllib.linalg.Matrix` type to the new +:py:class:`pyspark.ml.linalg.Matrix` type under the `spark.ml` +package. + +:param dataset: + input dataset +:param cols: + a list of matrix columns to be converted. + New matrix columns will be ignored. If unspecified, all old + matrix columns will be converted except nested ones. 
+:return: + the input dataset with old matrix columns converted to the + new matrix type + +>>> import pyspark +>>> from pyspark.mllib.linalg import Matrices +>>> from pyspark.mllib.util import MLUtils +>>> df = spark.createDataFrame( +... [(0, Matrices.sparse(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4]), +... Matrices.dense(2, 2, range(4)))], ["id", "x", "y"]) +>>> r1 = MLUtils.convertMatrixColumnsToML(df).first() +>>> isinstance(r1.x, pyspark.ml.linalg.SparseMatrix) +True +>>> isinstance(r1.y, pyspark.ml.linalg.DenseMatrix) +True +>>> r2 = MLUtils.convertMatrixColumnsToML(df, "x").first() +>>> isinstance(r2.x, pyspark.ml.linalg.SparseMatrix) +True +>>> isinstance(r2.y, pyspark.mllib.linalg.DenseMatrix) +True +""" +if not isinstance(dataset, DataFrame): +raise TypeError("Input dataset must be a DataFrame but got {}.".format(type(dataset))) +return callMLlibFunc("convertMatrixColumnsToML", dataset, list(cols))
spark git commit: [SPARK-16242][MLLIB][PYSPARK] Conversion between old/new matrix columns in a DataFrame (Python)
Repository: spark Updated Branches: refs/heads/master f6b497fcd -> e158478a9 [SPARK-16242][MLLIB][PYSPARK] Conversion between old/new matrix columns in a DataFrame (Python) ## What changes were proposed in this pull request? This PR implements Python wrappers for #13888 to convert between old and new matrix columns in a DataFrame. ## How was this patch tested? Doctests in Python. Author: Yanbo Liang. Closes #13935 from yanboliang/spark-16242. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e158478a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e158478a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e158478a Branch: refs/heads/master Commit: e158478a9fff5e63ae0336a54b3f360d0cd38921 Parents: f6b497f Author: Yanbo Liang Authored: Tue Jun 28 06:28:22 2016 -0700 Committer: Yanbo Liang Committed: Tue Jun 28 06:28:22 2016 -0700 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 14 python/pyspark/mllib/util.py| 80 2 files changed, 94 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e158478a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f2c70ba..f4819f7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1215,6 +1215,20 @@ private[python] class PythonMLLibAPI extends Serializable { def convertVectorColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { MLUtils.convertVectorColumnsFromML(dataset, cols.asScala: _*) } + + /** + * Python-friendly version of [[MLUtils.convertMatrixColumnsToML()]]. + */ + def convertMatrixColumnsToML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { +MLUtils.convertMatrixColumnsToML(dataset, cols.asScala: _*) + } + + /** + * Python-friendly version of [[MLUtils.convertMatrixColumnsFromML()]]. + */ + def convertMatrixColumnsFromML(dataset: DataFrame, cols: JArrayList[String]): DataFrame = { +MLUtils.convertMatrixColumnsFromML(dataset, cols.asScala: _*) + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/e158478a/python/pyspark/mllib/util.py -- diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index a7e6bcc..48867a0 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -281,6 +281,86 @@ class MLUtils(object): raise TypeError("Input dataset must be a DataFrame but got {}.".format(type(dataset))) return callMLlibFunc("convertVectorColumnsFromML", dataset, list(cols)) +@staticmethod +@since("2.0.0") +def convertMatrixColumnsToML(dataset, *cols): +""" +Converts matrix columns in an input DataFrame from the +:py:class:`pyspark.mllib.linalg.Matrix` type to the new +:py:class:`pyspark.ml.linalg.Matrix` type under the `spark.ml` +package. + +:param dataset: + input dataset +:param cols: + a list of matrix columns to be converted. + New matrix columns will be ignored. If unspecified, all old + matrix columns will be converted except nested ones. +:return: + the input dataset with old matrix columns converted to the + new matrix type + +>>> import pyspark +>>> from pyspark.mllib.linalg import Matrices +>>> from pyspark.mllib.util import MLUtils +>>> df = spark.createDataFrame( +... [(0, Matrices.sparse(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4]), +... 
Matrices.dense(2, 2, range(4)))], ["id", "x", "y"]) +>>> r1 = MLUtils.convertMatrixColumnsToML(df).first() +>>> isinstance(r1.x, pyspark.ml.linalg.SparseMatrix) +True +>>> isinstance(r1.y, pyspark.ml.linalg.DenseMatrix) +True +>>> r2 = MLUtils.convertMatrixColumnsToML(df, "x").first() +>>> isinstance(r2.x, pyspark.ml.linalg.SparseMatrix) +True +>>> isinstance(r2.y, pyspark.mllib.linalg.DenseMatrix) +True +""" +if not isinstance(dataset, DataFrame): +raise TypeError("Input dataset must be a DataFrame but got {}.".format(type(dataset))) +return callMLlibFunc("convertMatrixColumnsToML", dataset, list(cols)) + +@staticmethod +@since("2.0.0") +def convertMatrixColumnsFromML(dataset,
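The two commits above land the same change; the branch-2.0 entry is a cherry-pick of this master commit. A short round-trip sketch of the new helpers, assuming an active SparkSession bound to `spark` as in the doctests:

```python
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.util import MLUtils

# A DataFrame with one old-style (pyspark.mllib) dense matrix column.
df = spark.createDataFrame([(0, Matrices.dense(2, 2, range(4)))], ["id", "m"])

# Old mllib matrix column -> new ml matrix column ...
ml_df = MLUtils.convertMatrixColumnsToML(df, "m")
# ... and back again with the symmetric helper.
mllib_df = MLUtils.convertMatrixColumnsFromML(ml_df, "m")

print(type(ml_df.first().m))     # pyspark.ml.linalg.DenseMatrix
print(type(mllib_df.first().m))  # pyspark.mllib.linalg.DenseMatrix
```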
spark git commit: [SPARK-16128][SQL] Allow setting length of characters to be truncated to, in Dataset.show function.
Repository: spark Updated Branches: refs/heads/master 4cbf611c1 -> f6b497fcd [SPARK-16128][SQL] Allow setting length of characters to be truncated to, in Dataset.show function. ## What changes were proposed in this pull request? Allowing truncation to a specific number of characters is convenient at times, especially while operating from the REPL. Sometimes those last few characters make all the difference, and showing everything brings in a whole lot of noise. ## How was this patch tested? Existing tests, plus 1 new test in DataFrameSuite. For SparkR and PySpark, existing tests and manual testing. Author: Prashant Sharma. Closes #13839 from ScrapCodes/add_truncateTo_DF.show. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6b497fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6b497fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6b497fc Branch: refs/heads/master Commit: f6b497fcdddc705a9e1022e20b0dbc15da1b5a5a Parents: 4cbf611 Author: Prashant Sharma Authored: Tue Jun 28 17:11:06 2016 +0530 Committer: Prashant Sharma Committed: Tue Jun 28 17:11:06 2016 +0530 -- .gitignore | 1 + R/pkg/R/DataFrame.R | 11 +++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 python/pyspark/sql/dataframe.py | 18 ++-- .../scala/org/apache/spark/sql/Dataset.scala| 47 +--- .../org/apache/spark/sql/DataFrameSuite.scala | 27 +-- .../org/apache/spark/sql/DatasetSuite.scala | 2 +- 7 files changed, 97 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6b497fc/.gitignore -- diff --git a/.gitignore b/.gitignore index 9f8cd0b..b4dd1d0 100644 --- a/.gitignore +++ b/.gitignore @@ -77,3 +77,4 @@ spark-warehouse/ # For R session data .RData .RHistory +.Rhistory http://git-wip-us.apache.org/repos/asf/spark/blob/f6b497fc/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index f856979..61d47a8 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -176,8 +176,8 @@ setMethod("isLocal", #' @param x A SparkDataFrame #' @param numRows The number of rows to print. Defaults to 20. #' @param truncate Whether truncate long strings. If true, strings more than 20 characters will be -#' truncated and all cells will be aligned right -#' +#'truncated. However, if set greater than zero, truncates strings longer than `truncate` +#'characters and all cells will be aligned right. 
#' @family SparkDataFrame functions #' @rdname showDF #' @name showDF @@ -193,7 +193,12 @@ setMethod("isLocal", setMethod("showDF", signature(x = "SparkDataFrame"), function(x, numRows = 20, truncate = TRUE) { -s <- callJMethod(x@sdf, "showString", numToInt(numRows), truncate) +if (is.logical(truncate) && truncate) { + s <- callJMethod(x@sdf, "showString", numToInt(numRows), numToInt(20)) +} else { + truncate2 <- as.numeric(truncate) + s <- callJMethod(x@sdf, "showString", numToInt(numRows), numToInt(truncate2)) +} cat(s) }) http://git-wip-us.apache.org/repos/asf/spark/blob/f6b497fc/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 74def5c..7562fa9 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1582,7 +1582,15 @@ test_that("showDF()", { "| 30| Andy|\n", "| 19| Justin|\n", "++---+\n", sep = "") + expected2 <- paste("+---++\n", + "|age|name|\n", + "+---++\n", + "|nul| Mic|\n", + "| 30| And|\n", + "| 19| Jus|\n", + "+---++\n", sep = "") expect_output(showDF(df), expected) + expect_output(showDF(df, truncate = 3), expected2) }) test_that("isLocal()", { http://git-wip-us.apache.org/repos/asf/spark/blob/f6b497fc/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index acf9d08..a2443ed 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -271,7 +271,9 @@
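A quick PySpark sketch of the new truncate argument (the column values are illustrative; `spark` is an active SparkSession with this patch applied):

```python
df = spark.createDataFrame([("a-rather-long-string-value", 42)], ["s", "n"])

df.show()                # default: strings longer than 20 characters are truncated
df.show(truncate=False)  # show full cell contents
df.show(truncate=3)      # new with this patch: truncate cells to 3 characters
```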
spark git commit: [SPARK-16202][SQL][DOC] Correct The Description of CreatableRelationProvider's createRelation
Repository: spark Updated Branches: refs/heads/master dd6b7dbe7 -> 4cbf611c1 [SPARK-16202][SQL][DOC] Correct The Description of CreatableRelationProvider's createRelation ## What changes were proposed in this pull request? The API description of `createRelation` in `CreatableRelationProvider` is misleading. The current description only tells users to return the relation.

```Scala
trait CreatableRelationProvider {
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}
```

However, the major goal of this API should also include saving the `DataFrame`. Since this API is critical for Data Source API developers, this PR corrects the description. ## How was this patch tested? N/A Author: gatorsmile. Closes #13903 from gatorsmile/readUnderscoreFiles. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cbf611c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cbf611c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cbf611c Branch: refs/heads/master Commit: 4cbf611c1dc88111ff49d005e902ad5864799ede Parents: dd6b7db Author: gatorsmile Authored: Mon Jun 27 23:12:17 2016 -0700 Committer: Reynold Xin Committed: Mon Jun 27 23:12:17 2016 -0700 -- .../main/scala/org/apache/spark/sql/sources/interfaces.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4cbf611c/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index d2077a0..a16d7ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -148,9 +148,9 @@ trait StreamSinkProvider { @DeveloperApi trait CreatableRelationProvider { /** - * Creates a relation with the given parameters based on the contents of the given - * DataFrame. The mode specifies the expected behavior of createRelation when - * data already exists. + * Save the DataFrame to the destination and return a relation with the given parameters based on + * the contents of the given DataFrame. The mode specifies the expected behavior of createRelation + * when data already exists. * Right now, there are three modes, Append, Overwrite, and ErrorIfExists. * Append mode means that when saving a DataFrame to a data source, if data already exists, * contents of the DataFrame are expected to be appended to existing data.
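Seen from the Python write path, the three modes enumerated in the corrected description behave as follows (the format, path, and demo DataFrame are illustrative; `spark` is an active SparkSession):

```python
path = "/tmp/spark-16202-demo"
df = spark.range(10)

df.write.format("parquet").mode("error").save(path)      # ErrorIfExists: fail if data already exists at path
df.write.format("parquet").mode("append").save(path)     # Append: add these rows to the existing data
df.write.format("parquet").mode("overwrite").save(path)  # Overwrite: replace whatever is at path
```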