spark git commit: [SPARK-15031][SPARK-15134][EXAMPLE][DOC] Use SparkSession and update indent in examples
Repository: spark Updated Branches: refs/heads/master ba5487c06 -> 9e266d07a [SPARK-15031][SPARK-15134][EXAMPLE][DOC] Use SparkSession and update indent in examples ## What changes were proposed in this pull request? 1, Use `SparkSession` according to [SPARK-15031](https://issues.apache.org/jira/browse/SPARK-15031) 2, Update indent for `SparkContext` according to [SPARK-15134](https://issues.apache.org/jira/browse/SPARK-15134) 3, BTW, remove some duplicate space and add missing '.' ## How was this patch tested? manual tests Author: Zheng RuiFengCloses #13050 from zhengruifeng/use_sparksession. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e266d07 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e266d07 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e266d07 Branch: refs/heads/master Commit: 9e266d07a444fd465fe178cdd5c4894cd09cbda3 Parents: ba5487c Author: Zheng RuiFeng Authored: Wed May 11 22:45:30 2016 -0700 Committer: Andrew Or Committed: Wed May 11 22:45:30 2016 -0700 -- .../JavaDecisionTreeClassificationExample.java | 14 --- .../ml/JavaDecisionTreeRegressionExample.java | 12 +++--- .../examples/ml/JavaDeveloperApiExample.java| 6 +-- .../JavaEstimatorTransformerParamExample.java | 4 +- ...avaGradientBoostedTreeClassifierExample.java | 6 +-- ...JavaGradientBoostedTreeRegressorExample.java | 12 +++--- ...vaLinearRegressionWithElasticNetExample.java | 12 +++--- .../JavaLogisticRegressionSummaryExample.java | 4 +- ...LogisticRegressionWithElasticNetExample.java | 4 +- ...ModelSelectionViaCrossValidationExample.java | 4 +- ...SelectionViaTrainValidationSplitExample.java | 4 +- ...vaMultilayerPerceptronClassifierExample.java | 4 +- .../ml/JavaQuantileDiscretizerExample.java | 4 +- .../ml/JavaRandomForestClassifierExample.java | 4 +- .../ml/JavaRandomForestRegressorExample.java| 6 ++- .../examples/ml/JavaSimpleParamsExample.java| 8 ++-- 
.../JavaSimpleTextClassificationPipeline.java | 4 +- .../ml/DecisionTreeClassificationExample.scala | 10 ++--- .../spark/examples/ml/DecisionTreeExample.scala | 39 ++-- .../ml/DecisionTreeRegressionExample.scala | 8 ++-- .../spark/examples/ml/DeveloperApiExample.scala | 14 +++ .../ml/EstimatorTransformerParamExample.scala | 8 ++-- .../apache/spark/examples/ml/GBTExample.scala | 30 --- .../GradientBoostedTreeClassifierExample.scala | 8 ++-- .../GradientBoostedTreeRegressorExample.scala | 8 ++-- .../examples/ml/LinearRegressionExample.scala | 17 + .../examples/ml/LogisticRegressionExample.scala | 21 ++- ...ogisticRegressionWithElasticNetExample.scala | 4 +- ...odelSelectionViaCrossValidationExample.scala | 4 +- ...electionViaTrainValidationSplitExample.scala | 4 +- .../ml/RandomForestClassifierExample.scala | 8 ++-- .../spark/examples/ml/RandomForestExample.scala | 32 .../ml/RandomForestRegressorExample.scala | 8 ++-- .../spark/examples/ml/SimpleParamsExample.scala | 8 ++-- 34 files changed, 192 insertions(+), 151 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9e266d07/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java index 733bc41..bdb76f0 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java @@ -32,7 +32,9 @@ import org.apache.spark.sql.SparkSession; public class JavaDecisionTreeClassificationExample { public static void main(String[] args) { SparkSession spark = SparkSession - .builder().appName("JavaDecisionTreeClassificationExample").getOrCreate(); + .builder() + .appName("JavaDecisionTreeClassificationExample") + .getOrCreate(); // $example 
on$ // Load the data stored in LIBSVM format as a DataFrame. @@ -52,10 +54,10 @@ public class JavaDecisionTreeClassificationExample { VectorIndexerModel featureIndexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexedFeatures") - .setMaxCategories(4) // features with > 4 distinct values are treated as continuous + .setMaxCategories(4) // features with > 4 distinct values are treated as continuous. .fit(data); -// Split the data into training and test sets
spark git commit: [SPARK-15031][SPARK-15134][EXAMPLE][DOC] Use SparkSession and update indent in examples
Repository: spark Updated Branches: refs/heads/branch-2.0 7d187539e -> 86acb5efd [SPARK-15031][SPARK-15134][EXAMPLE][DOC] Use SparkSession and update indent in examples ## What changes were proposed in this pull request? 1, Use `SparkSession` according to [SPARK-15031](https://issues.apache.org/jira/browse/SPARK-15031) 2, Update indent for `SparkContext` according to [SPARK-15134](https://issues.apache.org/jira/browse/SPARK-15134) 3, BTW, remove some duplicate space and add missing '.' ## How was this patch tested? manual tests Author: Zheng RuiFengCloses #13050 from zhengruifeng/use_sparksession. (cherry picked from commit 9e266d07a444fd465fe178cdd5c4894cd09cbda3) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86acb5ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86acb5ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86acb5ef Branch: refs/heads/branch-2.0 Commit: 86acb5efdbc52820f89c039edac61c0454709f4c Parents: 7d18753 Author: Zheng RuiFeng Authored: Wed May 11 22:45:30 2016 -0700 Committer: Andrew Or Committed: Wed May 11 22:45:41 2016 -0700 -- .../JavaDecisionTreeClassificationExample.java | 14 --- .../ml/JavaDecisionTreeRegressionExample.java | 12 +++--- .../examples/ml/JavaDeveloperApiExample.java| 6 +-- .../JavaEstimatorTransformerParamExample.java | 4 +- ...avaGradientBoostedTreeClassifierExample.java | 6 +-- ...JavaGradientBoostedTreeRegressorExample.java | 12 +++--- ...vaLinearRegressionWithElasticNetExample.java | 12 +++--- .../JavaLogisticRegressionSummaryExample.java | 4 +- ...LogisticRegressionWithElasticNetExample.java | 4 +- ...ModelSelectionViaCrossValidationExample.java | 4 +- ...SelectionViaTrainValidationSplitExample.java | 4 +- ...vaMultilayerPerceptronClassifierExample.java | 4 +- .../ml/JavaQuantileDiscretizerExample.java | 4 +- .../ml/JavaRandomForestClassifierExample.java | 4 +- .../ml/JavaRandomForestRegressorExample.java| 
6 ++- .../examples/ml/JavaSimpleParamsExample.java| 8 ++-- .../JavaSimpleTextClassificationPipeline.java | 4 +- .../ml/DecisionTreeClassificationExample.scala | 10 ++--- .../spark/examples/ml/DecisionTreeExample.scala | 39 ++-- .../ml/DecisionTreeRegressionExample.scala | 8 ++-- .../spark/examples/ml/DeveloperApiExample.scala | 14 +++ .../ml/EstimatorTransformerParamExample.scala | 8 ++-- .../apache/spark/examples/ml/GBTExample.scala | 30 --- .../GradientBoostedTreeClassifierExample.scala | 8 ++-- .../GradientBoostedTreeRegressorExample.scala | 8 ++-- .../examples/ml/LinearRegressionExample.scala | 17 + .../examples/ml/LogisticRegressionExample.scala | 21 ++- ...ogisticRegressionWithElasticNetExample.scala | 4 +- ...odelSelectionViaCrossValidationExample.scala | 4 +- ...electionViaTrainValidationSplitExample.scala | 4 +- .../ml/RandomForestClassifierExample.scala | 8 ++-- .../spark/examples/ml/RandomForestExample.scala | 32 .../ml/RandomForestRegressorExample.scala | 8 ++-- .../spark/examples/ml/SimpleParamsExample.scala | 8 ++-- 34 files changed, 192 insertions(+), 151 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/86acb5ef/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java index 733bc41..bdb76f0 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java @@ -32,7 +32,9 @@ import org.apache.spark.sql.SparkSession; public class JavaDecisionTreeClassificationExample { public static void main(String[] args) { SparkSession spark = SparkSession - .builder().appName("JavaDecisionTreeClassificationExample").getOrCreate(); + .builder() + 
.appName("JavaDecisionTreeClassificationExample") + .getOrCreate(); // $example on$ // Load the data stored in LIBSVM format as a DataFrame. @@ -52,10 +54,10 @@ public class JavaDecisionTreeClassificationExample { VectorIndexerModel featureIndexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexedFeatures") - .setMaxCategories(4) // features with > 4 distinct values are treated as continuous + .setMaxCategories(4) //
spark git commit: [SPARK-15072][SQL][PYSPARK][HOT-FIX] Remove SparkSession.withHiveSupport from readwrite.py
Repository: spark Updated Branches: refs/heads/branch-2.0 0b14b3f13 -> 7d187539e [SPARK-15072][SQL][PYSPARK][HOT-FIX] Remove SparkSession.withHiveSupport from readwrite.py ## What changes were proposed in this pull request? Seems https://github.com/apache/spark/commit/db573fc743d12446dd0421fb45d00c2f541eaf9a did not remove withHiveSupport from readwrite.py Author: Yin Huai. Closes #13069 from yhuai/fixPython. (cherry picked from commit ba5487c061168627b27af2fa9610d53791fcc90d) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d187539 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d187539 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d187539 Branch: refs/heads/branch-2.0 Commit: 7d187539edc3b44f555b2d85a45add9f044cad8b Parents: 0b14b3f Author: Yin Huai Authored: Wed May 11 21:43:56 2016 -0700 Committer: Yin Huai Committed: Wed May 11 21:44:28 2016 -0700 -- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7d187539/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 5cb1860..c98aef1 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -875,7 +875,7 @@ def _test(): globs = pyspark.sql.readwriter.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') try: -spark = SparkSession.withHiveSupport(sc) +spark = SparkSession.builder.enableHiveSupport().getOrCreate() except py4j.protocol.Py4JError: spark = SparkSession(sc) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15072][SQL][PYSPARK][HOT-FIX] Remove SparkSession.withHiveSupport from readwrite.py
Repository: spark Updated Branches: refs/heads/master f036dd7ce -> ba5487c06 [SPARK-15072][SQL][PYSPARK][HOT-FIX] Remove SparkSession.withHiveSupport from readwrite.py ## What changes were proposed in this pull request? Seems https://github.com/apache/spark/commit/db573fc743d12446dd0421fb45d00c2f541eaf9a did not remove withHiveSupport from readwrite.py Author: Yin Huai. Closes #13069 from yhuai/fixPython. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba5487c0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba5487c0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba5487c0 Branch: refs/heads/master Commit: ba5487c061168627b27af2fa9610d53791fcc90d Parents: f036dd7 Author: Yin Huai Authored: Wed May 11 21:43:56 2016 -0700 Committer: Yin Huai Committed: Wed May 11 21:43:56 2016 -0700 -- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ba5487c0/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 5cb1860..c98aef1 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -875,7 +875,7 @@ def _test(): globs = pyspark.sql.readwriter.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') try: -spark = SparkSession.withHiveSupport(sc) +spark = SparkSession.builder.enableHiveSupport().getOrCreate() except py4j.protocol.Py4JError: spark = SparkSession(sc) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-14346] SHOW CREATE TABLE for data source tables
Repository: spark Updated Branches: refs/heads/branch-2.0 b2b04c6da -> 0b14b3f13 [SPARK-14346] SHOW CREATE TABLE for data source tables ## What changes were proposed in this pull request? This PR adds native `SHOW CREATE TABLE` DDL command for data source tables. Support for Hive tables will be added in follow-up PR(s). To show table creation DDL for data source tables created by CTAS statements, this PR also added partitioning and bucketing support for normal `CREATE TABLE ... USING ...` syntax. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) A new test suite `ShowCreateTableSuite` is added in sql/hive package to test the new feature. Author: Cheng LianCloses #12781 from liancheng/spark-14346-show-create-table. (cherry picked from commit f036dd7ce727b40877337da66d687214786c4f14) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b14b3f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b14b3f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b14b3f1 Branch: refs/heads/branch-2.0 Commit: 0b14b3f13c2575837ca47dc13adf3e15d88438b9 Parents: b2b04c6 Author: Cheng Lian Authored: Wed May 11 20:44:04 2016 -0700 Committer: Yin Huai Committed: Wed May 11 20:44:17 2016 -0700 -- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 5 +- .../apache/spark/sql/catalyst/identifiers.scala | 2 +- .../spark/sql/execution/SparkSqlParser.scala| 28 ++- .../spark/sql/execution/SparkStrategies.scala | 6 +- .../spark/sql/execution/command/commands.scala | 107 +--- .../command/createDataSourceTables.scala| 6 +- .../spark/sql/execution/command/tables.scala| 243 ++- .../spark/sql/execution/datasources/ddl.scala | 2 + .../apache/spark/sql/internal/CatalogImpl.scala | 6 +- .../sql/execution/command/DDLCommandSuite.scala | 1 - .../spark/sql/hive/HiveMetastoreCatalog.scala | 7 +- 
.../apache/spark/sql/hive/test/TestHive.scala | 2 - .../spark/sql/hive/ShowCreateTableSuite.scala | 169 + .../sql/hive/execution/HiveQuerySuite.scala | 1 - 14 files changed, 458 insertions(+), 127 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index ffb7a09..06ac37b 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -45,7 +45,9 @@ statement | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase | createTableHeader ('(' colTypeList ')')? tableProvider -(OPTIONS tablePropertyList)? #createTableUsing +(OPTIONS tablePropertyList)? +(PARTITIONED BY partitionColumnNames=identifierList)? +bucketSpec? #createTableUsing | createTableHeader tableProvider (OPTIONS tablePropertyList)? (PARTITIONED BY partitionColumnNames=identifierList)? @@ -102,6 +104,7 @@ statement ((FROM | IN) db=identifier)? #showColumns | SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions | SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions +| SHOW CREATE TABLE tableIdentifier #showCreateTable | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)? tableIdentifier partitionSpec? describeColName? 
#describeTable http://git-wip-us.apache.org/repos/asf/spark/blob/0b14b3f1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala index 7d05845..d7b48ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala @@ -44,7 +44,7
spark git commit: [SPARK-15080][CORE] Break copyAndReset into copy and reset
Repository: spark Updated Branches: refs/heads/master db573fc74 -> ff92eb2e8 [SPARK-15080][CORE] Break copyAndReset into copy and reset ## What changes were proposed in this pull request? Break copyAndReset into two methods copy and reset instead of just one. ## How was this patch tested? Existing Tests Author: Sandeep SinghCloses #12936 from techaddict/SPARK-15080. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff92eb2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff92eb2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff92eb2e Branch: refs/heads/master Commit: ff92eb2e80f2f38d10ac524ced82bb3f94b5b2bf Parents: db573fc Author: Sandeep Singh Authored: Thu May 12 11:12:09 2016 +0800 Committer: Wenchen Fan Committed: Thu May 12 11:12:09 2016 +0800 -- .../org/apache/spark/executor/TaskMetrics.scala | 10 ++- .../org/apache/spark/util/AccumulatorV2.scala | 75 .../apache/spark/util/AccumulatorV2Suite.scala | 17 + .../spark/sql/execution/metric/SQLMetrics.scala | 13 ++-- 4 files changed, 96 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ff92eb2e/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 1893167..5bb505b 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -291,12 +291,20 @@ private[spark] object TaskMetrics extends Logging { private[spark] class BlockStatusesAccumulator extends AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] { - private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)] + private var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)] override def isZero(): Boolean = _seq.isEmpty override def copyAndReset(): BlockStatusesAccumulator = new 
BlockStatusesAccumulator + override def copy(): BlockStatusesAccumulator = { +val newAcc = new BlockStatusesAccumulator +newAcc._seq = _seq.clone() +newAcc + } + + override def reset(): Unit = _seq.clear() + override def add(v: (BlockId, BlockStatus)): Unit = _seq += v override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]]) http://git-wip-us.apache.org/repos/asf/spark/blob/ff92eb2e/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index c487903..0cf9df0 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -112,7 +112,22 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy * must return true. */ - def copyAndReset(): AccumulatorV2[IN, OUT] + def copyAndReset(): AccumulatorV2[IN, OUT] = { +val copyAcc = copy() +copyAcc.reset() +copyAcc + } + + /** + * Creates a new copy of this accumulator. + */ + def copy(): AccumulatorV2[IN, OUT] + + /** + * Resets this accumulator, which is zero value. i.e. call `isZero` must + * return true. + */ + def reset(): Unit /** * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. 
@@ -137,10 +152,10 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { throw new UnsupportedOperationException( "Accumulator must be registered before send to executor") } - val copy = copyAndReset() - assert(copy.isZero, "copyAndReset must return a zero value copy") - copy.metadata = metadata - copy + val copyAcc = copyAndReset() + assert(copyAcc.isZero, "copyAndReset must return a zero value copy") + copyAcc.metadata = metadata + copyAcc } else { this } @@ -249,8 +264,8 @@ private[spark] object AccumulatorContext { * @since 2.0.0 */ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { - private[this] var _sum = 0L - private[this] var _count = 0L + private var _sum = 0L + private var _count = 0L /** * Adds v to the accumulator, i.e. increment sum by v and count by 1. @@ -258,7 +273,17 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { */ override def isZero: Boolean = _sum == 0L && _count ==
spark git commit: [SPARK-15080][CORE] Break copyAndReset into copy and reset
Repository: spark Updated Branches: refs/heads/branch-2.0 114be703d -> b2b04c6da [SPARK-15080][CORE] Break copyAndReset into copy and reset ## What changes were proposed in this pull request? Break copyAndReset into two methods copy and reset instead of just one. ## How was this patch tested? Existing Tests Author: Sandeep SinghCloses #12936 from techaddict/SPARK-15080. (cherry picked from commit ff92eb2e80f2f38d10ac524ced82bb3f94b5b2bf) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2b04c6d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2b04c6d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2b04c6d Branch: refs/heads/branch-2.0 Commit: b2b04c6da0e37ed3d53e995295a5103273b512a2 Parents: 114be70 Author: Sandeep Singh Authored: Thu May 12 11:12:09 2016 +0800 Committer: Wenchen Fan Committed: Thu May 12 11:12:27 2016 +0800 -- .../org/apache/spark/executor/TaskMetrics.scala | 10 ++- .../org/apache/spark/util/AccumulatorV2.scala | 75 .../apache/spark/util/AccumulatorV2Suite.scala | 17 + .../spark/sql/execution/metric/SQLMetrics.scala | 13 ++-- 4 files changed, 96 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b2b04c6d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 1893167..5bb505b 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -291,12 +291,20 @@ private[spark] object TaskMetrics extends Logging { private[spark] class BlockStatusesAccumulator extends AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]] { - private[this] var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)] + private var _seq = ArrayBuffer.empty[(BlockId, BlockStatus)] 
override def isZero(): Boolean = _seq.isEmpty override def copyAndReset(): BlockStatusesAccumulator = new BlockStatusesAccumulator + override def copy(): BlockStatusesAccumulator = { +val newAcc = new BlockStatusesAccumulator +newAcc._seq = _seq.clone() +newAcc + } + + override def reset(): Unit = _seq.clear() + override def add(v: (BlockId, BlockStatus)): Unit = _seq += v override def merge(other: AccumulatorV2[(BlockId, BlockStatus), Seq[(BlockId, BlockStatus)]]) http://git-wip-us.apache.org/repos/asf/spark/blob/b2b04c6d/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index c487903..0cf9df0 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -112,7 +112,22 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy * must return true. */ - def copyAndReset(): AccumulatorV2[IN, OUT] + def copyAndReset(): AccumulatorV2[IN, OUT] = { +val copyAcc = copy() +copyAcc.reset() +copyAcc + } + + /** + * Creates a new copy of this accumulator. + */ + def copy(): AccumulatorV2[IN, OUT] + + /** + * Resets this accumulator, which is zero value. i.e. call `isZero` must + * return true. + */ + def reset(): Unit /** * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. 
@@ -137,10 +152,10 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { throw new UnsupportedOperationException( "Accumulator must be registered before send to executor") } - val copy = copyAndReset() - assert(copy.isZero, "copyAndReset must return a zero value copy") - copy.metadata = metadata - copy + val copyAcc = copyAndReset() + assert(copyAcc.isZero, "copyAndReset must return a zero value copy") + copyAcc.metadata = metadata + copyAcc } else { this } @@ -249,8 +264,8 @@ private[spark] object AccumulatorContext { * @since 2.0.0 */ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { - private[this] var _sum = 0L - private[this] var _count = 0L + private var _sum = 0L + private var _count = 0L /** * Adds v to the accumulator, i.e. increment sum by v and count by 1. @@ -258,7 +273,17 @@
spark git commit: [SPARK-15072][SQL][PYSPARK] FollowUp: Remove SparkSession.withHiveSupport in PySpark
Repository: spark Updated Branches: refs/heads/branch-2.0 f8804bb10 -> 114be703d [SPARK-15072][SQL][PYSPARK] FollowUp: Remove SparkSession.withHiveSupport in PySpark ## What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/12851 Remove `SparkSession.withHiveSupport` in PySpark and instead use `SparkSession.builder. enableHiveSupport` ## How was this patch tested? Existing tests. Author: Sandeep SinghCloses #13063 from techaddict/SPARK-15072-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/114be703 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/114be703 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/114be703 Branch: refs/heads/branch-2.0 Commit: 114be703d5655b6456955e795e670cd62915b37e Parents: f8804bb Author: Sandeep Singh Authored: Wed May 11 17:44:00 2016 -0700 Committer: Andrew Or Committed: Wed May 11 17:44:37 2016 -0700 -- .../sbt_app_hive/src/main/scala/HiveApp.scala | 8 +--- python/pyspark/shell.py | 4 +++- python/pyspark/sql/session.py | 10 -- .../scala/org/apache/spark/sql/hive/HiveContext.scala | 2 +- 4 files changed, 9 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/114be703/dev/audit-release/sbt_app_hive/src/main/scala/HiveApp.scala -- diff --git a/dev/audit-release/sbt_app_hive/src/main/scala/HiveApp.scala b/dev/audit-release/sbt_app_hive/src/main/scala/HiveApp.scala index f69d46c..8cbfb9c 100644 --- a/dev/audit-release/sbt_app_hive/src/main/scala/HiveApp.scala +++ b/dev/audit-release/sbt_app_hive/src/main/scala/HiveApp.scala @@ -33,7 +33,9 @@ object SparkSqlExample { case None => new SparkConf().setAppName("Simple Sql App") } val sc = new SparkContext(conf) -val sparkSession = SparkSession.withHiveSupport(sc) +val sparkSession = SparkSession.builder + .enableHiveSupport() + .getOrCreate() import sparkSession._ sql("DROP TABLE IF EXISTS src") @@ -41,14 +43,14 @@ 
object SparkSqlExample { sql("LOAD DATA LOCAL INPATH 'data.txt' INTO TABLE src") val results = sql("FROM src SELECT key, value WHERE key >= 0 AND KEY < 5").collect() results.foreach(println) - + def test(f: => Boolean, failureMsg: String) = { if (!f) { println(failureMsg) System.exit(-1) } } - + test(results.size == 5, "Unexpected number of selected elements: " + results) println("Test succeeded") sc.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/114be703/python/pyspark/shell.py -- diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index c6b0eda..adaa3b5 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -41,7 +41,9 @@ atexit.register(lambda: sc.stop()) try: # Try to access HiveConf, it will raise exception if Hive is not added sc._jvm.org.apache.hadoop.hive.conf.HiveConf() -spark = SparkSession.withHiveSupport(sc) +spark = SparkSession.builder\ +.enableHiveSupport()\ +.getOrCreate() except py4j.protocol.Py4JError: spark = SparkSession(sc) except TypeError: http://git-wip-us.apache.org/repos/asf/spark/blob/114be703/python/pyspark/sql/session.py -- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 04842f6..4ee9ab8 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -182,16 +182,6 @@ class SparkSession(object): if SparkSession._instantiatedContext is None: SparkSession._instantiatedContext = self -@classmethod -@since(2.0) -def withHiveSupport(cls, sparkContext): -"""Returns a new SparkSession with a catalog backed by Hive. - -:param sparkContext: The underlying :class:`SparkContext`. 
-""" -jsparkSession = sparkContext._jvm.SparkSession.withHiveSupport(sparkContext._jsc.sc()) -return cls(sparkContext, jsparkSession) - @since(2.0) def newSession(self): """ http://git-wip-us.apache.org/repos/asf/spark/blob/114be703/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala -- diff --git a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index
spark git commit: [SPARK-15264][SPARK-15274][SQL] CSV Reader Error on Blank Column Names
Repository: spark Updated Branches: refs/heads/master f14c4ba00 -> 603f4453a [SPARK-15264][SPARK-15274][SQL] CSV Reader Error on Blank Column Names ## What changes were proposed in this pull request? When a CSV begins with: - `,,` OR - `"","",` meaning that the first column names are either empty or blank strings and `header` is specified to be `true`, then the column name is replaced with `C` + the index number of that given column. For example, if you were to read in the CSV: ``` "","second column" "hello", "there" ``` Then column names would become `"C0", "second column"`. This behavior aligns with what currently happens when `header` is specified to be `false` in recent versions of Spark. ### Current Behavior in Spark <=1.6 In Spark <=1.6, a CSV with a blank column name becomes a blank string, `""`, meaning that this column cannot be accessed. However the CSV reads in without issue. ### Current Behavior in Spark 2.0 Spark throws a NullPointerError and will not read in the file. Reproduction in 2.0 https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/346304/2828750690305044/484361/latest.html ## How was this patch tested? A new test was added to `CSVSuite` to account for this issue. We then have asserts that test for being able to select both the empty column names as well as the regular column names. Author: Bill ChambersAuthor: Bill Chambers Closes #13041 from anabranch/master. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/603f4453 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/603f4453 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/603f4453 Branch: refs/heads/master Commit: 603f4453a16825cc5773cfe24d6ae4cee5ec949a Parents: f14c4ba Author: Bill Chambers Authored: Wed May 11 17:42:13 2016 -0700 Committer: Andrew Or Committed: Wed May 11 17:42:13 2016 -0700 -- python/pyspark/sql/readwriter.py| 2 +- .../execution/datasources/csv/DefaultSource.scala | 6 -- .../src/test/resources/cars-blank-column-name.csv | 3 +++ .../sql/execution/datasources/csv/CSVSuite.scala| 16 ++-- 4 files changed, 22 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 7fd7583..5cb1860 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -358,7 +358,7 @@ class DataFrameReader(object): >>> df = spark.read.csv('python/test_support/sql/ages.csv') >>> df.dtypes -[('C0', 'string'), ('C1', 'string')] +[('_c0', 'string'), ('_c1', 'string')] """ if schema is not None: self.schema(schema) http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index 948fac0..f47ed76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -61,9 +61,11 @@ class DefaultSource extends FileFormat with DataSourceRegister { val firstRow = new 
LineCsvReader(csvOptions).parseLine(firstLine) val header = if (csvOptions.headerFlag) { - firstRow + firstRow.zipWithIndex.map { case (value, index) => +if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value + } } else { - firstRow.zipWithIndex.map { case (value, index) => s"C$index" } + firstRow.zipWithIndex.map { case (value, index) => s"_c$index" } } val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths) http://git-wip-us.apache.org/repos/asf/spark/blob/603f4453/sql/core/src/test/resources/cars-blank-column-name.csv -- diff --git a/sql/core/src/test/resources/cars-blank-column-name.csv b/sql/core/src/test/resources/cars-blank-column-name.csv new file mode 100644 index 000..0b804b1 --- /dev/null +++ b/sql/core/src/test/resources/cars-blank-column-name.csv @@ -0,0 +1,3 @@ +"",,make,customer,comment +2012,"Tesla","S","bill","blank"
spark git commit: [SPARK-15276][SQL] CREATE TABLE with LOCATION should imply EXTERNAL
Repository: spark Updated Branches: refs/heads/branch-2.0 f9ea54575 -> f763c1485 [SPARK-15276][SQL] CREATE TABLE with LOCATION should imply EXTERNAL ## What changes were proposed in this pull request? Before: ```sql -- uses that location but issues a warning CREATE TABLE my_tab LOCATION /some/path -- deletes any existing data in the specified location DROP TABLE my_tab ``` After: ```sql -- uses that location but creates an EXTERNAL table instead CREATE TABLE my_tab LOCATION /some/path -- does not delete the data at /some/path DROP TABLE my_tab ``` This patch essentially makes the `EXTERNAL` field optional. This is related to #13032. ## How was this patch tested? New test in `DDLCommandSuite`. Author: Andrew OrCloses #13060 from andrewor14/location-implies-external. (cherry picked from commit f14c4ba001fbdbcc9faa46896f1f9d08a7d06609) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f763c148 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f763c148 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f763c148 Branch: refs/heads/branch-2.0 Commit: f763c14851f6e55e61de8ef79ae449a7257a547d Parents: f9ea545 Author: Andrew Or Authored: Wed May 11 17:29:58 2016 -0700 Committer: Andrew Or Committed: Wed May 11 17:30:06 2016 -0700 -- .../org/apache/spark/sql/execution/SparkSqlParser.scala | 12 +++- .../spark/sql/execution/command/DDLCommandSuite.scala | 12 .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 8 +++- .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 5 + 4 files changed, 23 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f763c148/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a51665f..53aba1f 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -745,11 +745,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (ctx.bucketSpec != null) { throw operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx) } -val tableType = if (external) { - CatalogTableType.EXTERNAL -} else { - CatalogTableType.MANAGED -} val comment = Option(ctx.STRING).map(string) val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns) val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns) @@ -791,6 +786,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), compressed = false, serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties) +// If location is defined, we'll assume this is an external table. +// Otherwise, we may accidentally delete existing data. +val tableType = if (external || location.isDefined) { + CatalogTableType.EXTERNAL +} else { + CatalogTableType.MANAGED +} // TODO support the sql text - have a proper location for this! 
val tableDesc = CatalogTable( http://git-wip-us.apache.org/repos/asf/spark/blob/f763c148/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index fa8dabf..aeb613a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -227,6 +227,18 @@ class DDLCommandSuite extends PlanTest { } } + test("create table - location implies external") { +val query = "CREATE TABLE my_tab LOCATION '/something/anything'" +parser.parsePlan(query) match { + case ct: CreateTable => +assert(ct.table.tableType == CatalogTableType.EXTERNAL) +assert(ct.table.storage.locationUri == Some("/something/anything")) + case other => +fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + +s"got ${other.getClass.getName}: $query") +} + } + // ALTER TABLE table_name RENAME TO new_table_name; // ALTER VIEW
spark git commit: [SPARK-15276][SQL] CREATE TABLE with LOCATION should imply EXTERNAL
Repository: spark Updated Branches: refs/heads/master b9cf617a6 -> f14c4ba00 [SPARK-15276][SQL] CREATE TABLE with LOCATION should imply EXTERNAL ## What changes were proposed in this pull request? Before: ```sql -- uses that location but issues a warning CREATE TABLE my_tab LOCATION /some/path -- deletes any existing data in the specified location DROP TABLE my_tab ``` After: ```sql -- uses that location but creates an EXTERNAL table instead CREATE TABLE my_tab LOCATION /some/path -- does not delete the data at /some/path DROP TABLE my_tab ``` This patch essentially makes the `EXTERNAL` field optional. This is related to #13032. ## How was this patch tested? New test in `DDLCommandSuite`. Author: Andrew OrCloses #13060 from andrewor14/location-implies-external. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f14c4ba0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f14c4ba0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f14c4ba0 Branch: refs/heads/master Commit: f14c4ba001fbdbcc9faa46896f1f9d08a7d06609 Parents: b9cf617 Author: Andrew Or Authored: Wed May 11 17:29:58 2016 -0700 Committer: Andrew Or Committed: Wed May 11 17:29:58 2016 -0700 -- .../org/apache/spark/sql/execution/SparkSqlParser.scala | 12 +++- .../spark/sql/execution/command/DDLCommandSuite.scala | 12 .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 8 +++- .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 5 + 4 files changed, 23 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f14c4ba0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a51665f..53aba1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -745,11 +745,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { if (ctx.bucketSpec != null) { throw operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx) } -val tableType = if (external) { - CatalogTableType.EXTERNAL -} else { - CatalogTableType.MANAGED -} val comment = Option(ctx.STRING).map(string) val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns) val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns) @@ -791,6 +786,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), compressed = false, serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties) +// If location is defined, we'll assume this is an external table. +// Otherwise, we may accidentally delete existing data. +val tableType = if (external || location.isDefined) { + CatalogTableType.EXTERNAL +} else { + CatalogTableType.MANAGED +} // TODO support the sql text - have a proper location for this! 
val tableDesc = CatalogTable( http://git-wip-us.apache.org/repos/asf/spark/blob/f14c4ba0/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index fa8dabf..aeb613a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -227,6 +227,18 @@ class DDLCommandSuite extends PlanTest { } } + test("create table - location implies external") { +val query = "CREATE TABLE my_tab LOCATION '/something/anything'" +parser.parsePlan(query) match { + case ct: CreateTable => +assert(ct.table.tableType == CatalogTableType.EXTERNAL) +assert(ct.table.storage.locationUri == Some("/something/anything")) + case other => +fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," + +s"got ${other.getClass.getName}: $query") +} + } + // ALTER TABLE table_name RENAME TO new_table_name; // ALTER VIEW view_name RENAME TO new_view_name; test("alter table/view: rename table/view") {
spark git commit: [SPARK-15260] Atomically resize memory pools (branch 1.6)
Repository: spark Updated Branches: refs/heads/branch-1.6 e2a43d007 -> fd2da7b91 [SPARK-15260] Atomically resize memory pools (branch 1.6) ## What changes were proposed in this pull request? (This is the branch-1.6 version of #13039) When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call memoryStore.evictBlocksToFreeSpace, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be in a bad state. This patch minimizes the things we do between the two calls to make the resizing more atomic. ## How was this patch tested? Jenkins. Author: Andrew OrCloses #13058 from andrewor14/safer-pool-1.6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd2da7b9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd2da7b9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd2da7b9 Branch: refs/heads/branch-1.6 Commit: fd2da7b91e33e8fc994c4a6a0524831807f1324f Parents: e2a43d0 Author: Andrew Or Authored: Wed May 11 17:25:57 2016 -0700 Committer: Davies Liu Committed: Wed May 11 17:25:57 2016 -0700 -- .../apache/spark/memory/StorageMemoryPool.scala | 11 +- .../spark/memory/UnifiedMemoryManager.scala | 15 - .../spark/memory/MemoryManagerSuite.scala | 15 + .../memory/UnifiedMemoryManagerSuite.scala | 23 4 files changed, 53 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fd2da7b9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 70af83b..89edaf5 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -119,13 +119,13 @@ private[memory] 
class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w } /** - * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number - * of bytes removed from the pool's capacity. + * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes. + * Note: this method doesn't actually reduce the pool size but relies on the caller to do so. + * + * @return number of bytes to be removed from the pool's capacity. */ - def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { -// First, shrink the pool by reclaiming free memory: + def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) -decrementPoolSize(spaceFreedByReleasingUnusedMemory) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: @@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. 
- decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory http://git-wip-us.apache.org/repos/asf/spark/blob/fd2da7b9/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 829f054..802087c 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] ( storageRegionSize, maxMemory - storageRegionSize) { + assertInvariant() + // We always maintain this invariant: - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + private def assertInvariant(): Unit = { +assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + } override def maxStorageMemory: Long =
spark git commit: [SPARK-15256] [SQL] [PySpark] Clarify DataFrameReader.jdbc() docstring
Repository: spark Updated Branches: refs/heads/branch-2.0 4e56857ca -> f9ea54575 [SPARK-15256] [SQL] [PySpark] Clarify DataFrameReader.jdbc() docstring This PR: * Corrects the documentation for the `properties` parameter, which is supposed to be a dictionary and not a list. * Generally clarifies the Python docstring for DataFrameReader.jdbc() by pulling from the [Scala docstrings](https://github.com/apache/spark/blob/b28137764716f56fa1a923c4278624a56364a505/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L201-L251) and rephrasing things. * Corrects minor Sphinx typos. Author: Nicholas ChammasCloses #13034 from nchammas/SPARK-15256. (cherry picked from commit b9cf617a6fa8812b45ff33acd109757a59f91dfa) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9ea5457 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9ea5457 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9ea5457 Branch: refs/heads/branch-2.0 Commit: f9ea54575659f489750a37e9a8c1105a825c94f8 Parents: 4e56857 Author: Nicholas Chammas Authored: Wed May 11 15:31:16 2016 -0700 Committer: Davies Liu Committed: Wed May 11 15:31:25 2016 -0700 -- python/pyspark/sql/readwriter.py | 67 ++- 1 file changed, 35 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f9ea5457/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index bd728c9..7fd7583 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -402,10 +402,9 @@ class DataFrameReader(object): def orc(self, path): """Loads an ORC file, returning the result as a :class:`DataFrame`. -::Note: Currently ORC support is only available together with -:class:`HiveContext`. +.. note:: Currently ORC support is only available together with Hive support. 
->>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned') +>>> df = spark.read.orc('python/test_support/sql/orc_partitioned') >>> df.dtypes [('a', 'bigint'), ('b', 'int'), ('c', 'int')] """ @@ -415,28 +414,31 @@ class DataFrameReader(object): def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None): """ -Construct a :class:`DataFrame` representing the database table accessible -via JDBC URL `url` named `table` and connection `properties`. +Construct a :class:`DataFrame` representing the database table named ``table`` +accessible via JDBC URL ``url`` and connection ``properties``. -The `column` parameter could be used to partition the table, then it will -be retrieved in parallel based on the parameters passed to this function. +Partitions of the table will be retrieved in parallel if either ``column`` or +``predicates`` is specified. -The `predicates` parameter gives a list expressions suitable for inclusion -in WHERE clauses; each one defines one partition of the :class:`DataFrame`. +If both ``column`` and ``predicates`` are specified, ``column`` will be used. -::Note: Don't create too many partitions in parallel on a large cluster; +.. note:: Don't create too many partitions in parallel on a large cluster; \ otherwise Spark might crash your external database systems. 
-:param url: a JDBC URL -:param table: name of table -:param column: the column used to partition -:param lowerBound: the lower bound of partition column -:param upperBound: the upper bound of the partition column +:param url: a JDBC URL of the form ``jdbc:subprotocol:subname`` +:param table: the name of the table +:param column: the name of an integer column that will be used for partitioning; + if this parameter is specified, then ``numPartitions``, ``lowerBound`` + (inclusive), and ``upperBound`` (exclusive) will form partition strides + for generated WHERE clause expressions used to split the column + ``column`` evenly +:param lowerBound: the minimum value of ``column`` used to decide partition stride +:param upperBound: the maximum value of ``column`` used to decide partition stride :param numPartitions: the number of partitions -:param predicates: a list of expressions -
spark git commit: [SPARK-15256] [SQL] [PySpark] Clarify DataFrameReader.jdbc() docstring
Repository: spark Updated Branches: refs/heads/master 8881765ac -> b9cf617a6 [SPARK-15256] [SQL] [PySpark] Clarify DataFrameReader.jdbc() docstring This PR: * Corrects the documentation for the `properties` parameter, which is supposed to be a dictionary and not a list. * Generally clarifies the Python docstring for DataFrameReader.jdbc() by pulling from the [Scala docstrings](https://github.com/apache/spark/blob/b28137764716f56fa1a923c4278624a56364a505/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L201-L251) and rephrasing things. * Corrects minor Sphinx typos. Author: Nicholas ChammasCloses #13034 from nchammas/SPARK-15256. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9cf617a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9cf617a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9cf617a Branch: refs/heads/master Commit: b9cf617a6fa8812b45ff33acd109757a59f91dfa Parents: 8881765 Author: Nicholas Chammas Authored: Wed May 11 15:31:16 2016 -0700 Committer: Davies Liu Committed: Wed May 11 15:31:16 2016 -0700 -- python/pyspark/sql/readwriter.py | 67 ++- 1 file changed, 35 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b9cf617a/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index bd728c9..7fd7583 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -402,10 +402,9 @@ class DataFrameReader(object): def orc(self, path): """Loads an ORC file, returning the result as a :class:`DataFrame`. -::Note: Currently ORC support is only available together with -:class:`HiveContext`. +.. note:: Currently ORC support is only available together with Hive support. 
->>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned') +>>> df = spark.read.orc('python/test_support/sql/orc_partitioned') >>> df.dtypes [('a', 'bigint'), ('b', 'int'), ('c', 'int')] """ @@ -415,28 +414,31 @@ class DataFrameReader(object): def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None): """ -Construct a :class:`DataFrame` representing the database table accessible -via JDBC URL `url` named `table` and connection `properties`. +Construct a :class:`DataFrame` representing the database table named ``table`` +accessible via JDBC URL ``url`` and connection ``properties``. -The `column` parameter could be used to partition the table, then it will -be retrieved in parallel based on the parameters passed to this function. +Partitions of the table will be retrieved in parallel if either ``column`` or +``predicates`` is specified. -The `predicates` parameter gives a list expressions suitable for inclusion -in WHERE clauses; each one defines one partition of the :class:`DataFrame`. +If both ``column`` and ``predicates`` are specified, ``column`` will be used. -::Note: Don't create too many partitions in parallel on a large cluster; +.. note:: Don't create too many partitions in parallel on a large cluster; \ otherwise Spark might crash your external database systems. 
-:param url: a JDBC URL -:param table: name of table -:param column: the column used to partition -:param lowerBound: the lower bound of partition column -:param upperBound: the upper bound of the partition column +:param url: a JDBC URL of the form ``jdbc:subprotocol:subname`` +:param table: the name of the table +:param column: the name of an integer column that will be used for partitioning; + if this parameter is specified, then ``numPartitions``, ``lowerBound`` + (inclusive), and ``upperBound`` (exclusive) will form partition strides + for generated WHERE clause expressions used to split the column + ``column`` evenly +:param lowerBound: the minimum value of ``column`` used to decide partition stride +:param upperBound: the maximum value of ``column`` used to decide partition stride :param numPartitions: the number of partitions -:param predicates: a list of expressions -:param properties: JDBC database connection arguments, a list of arbitrary string - tag/value.
spark git commit: [SPARK-15257][SQL] Require CREATE EXTERNAL TABLE to specify LOCATION
Repository: spark Updated Branches: refs/heads/branch-2.0 b1e14d9bf -> 4e56857ca [SPARK-15257][SQL] Require CREATE EXTERNAL TABLE to specify LOCATION ## What changes were proposed in this pull request? Before: ```sql -- uses warehouse dir anyway CREATE EXTERNAL TABLE my_tab -- doesn't actually delete the data DROP TABLE my_tab ``` After: ```sql -- no location is provided, throws exception CREATE EXTERNAL TABLE my_tab -- creates an external table using that location CREATE EXTERNAL TABLE my_tab LOCATION '/path/to/something' -- doesn't delete the data, which is expected DROP TABLE my_tab ``` ## How was this patch tested? New test in `DDLCommandSuite` Author: Andrew OrCloses #13032 from andrewor14/create-external-table-location. (cherry picked from commit 8881765ac7ac6ba6fe9ef0d0a669c08cca58ed93) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e56857c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e56857c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e56857c Branch: refs/heads/branch-2.0 Commit: 4e56857ca0c3d61cc257ffd3bd1c59ed44c862d7 Parents: b1e14d9 Author: Andrew Or Authored: Wed May 11 15:30:53 2016 -0700 Committer: Yin Huai Committed: Wed May 11 15:31:05 2016 -0700 -- .../spark/sql/execution/SparkSqlParser.scala| 4 .../sql/execution/command/DDLCommandSuite.scala | 20 ++-- .../hive/execution/HiveCompatibilitySuite.scala | 6 -- .../spark/sql/hive/HiveDDLCommandSuite.scala| 9 ++--- .../sql/hive/execution/HiveCommandSuite.scala | 2 +- .../sql/hive/execution/HiveTableScanSuite.scala | 2 +- .../spark/sql/hive/execution/HiveUDFSuite.scala | 2 +- 7 files changed, 31 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4e56857c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 87e6f90..a51665f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -780,6 +780,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(EmptyStorageFormat) val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) val location = Option(ctx.locationSpec).map(visitLocationSpec) +// If we are creating an EXTERNAL table, then the LOCATION field is required +if (external && location.isEmpty) { + throw operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx) +} val storage = CatalogStorageFormat( locationUri = location, inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), http://git-wip-us.apache.org/repos/asf/spark/blob/4e56857c/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index a728ac3..fa8dabf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{FunctionResource, FunctionResourceType} +import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource} +import org.apache.spark.sql.catalyst.catalog.FunctionResourceType import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.Project @@ -35,7 +36,7 @@ class DDLCommandSuite extends 
PlanTest { parser.parsePlan(sql) } assert(e.getMessage.toLowerCase.contains("operation not allowed")) -containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p)) } +containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) } } test("create database") { @@ -211,6 +212,21 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed4, expected4) } + test("create external table - location must be specified") { +
spark git commit: [SPARK-15257][SQL] Require CREATE EXTERNAL TABLE to specify LOCATION
Repository: spark Updated Branches: refs/heads/master 40ba87f76 -> 8881765ac [SPARK-15257][SQL] Require CREATE EXTERNAL TABLE to specify LOCATION ## What changes were proposed in this pull request? Before: ```sql -- uses warehouse dir anyway CREATE EXTERNAL TABLE my_tab -- doesn't actually delete the data DROP TABLE my_tab ``` After: ```sql -- no location is provided, throws exception CREATE EXTERNAL TABLE my_tab -- creates an external table using that location CREATE EXTERNAL TABLE my_tab LOCATION '/path/to/something' -- doesn't delete the data, which is expected DROP TABLE my_tab ``` ## How was this patch tested? New test in `DDLCommandSuite` Author: Andrew OrCloses #13032 from andrewor14/create-external-table-location. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8881765a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8881765a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8881765a Branch: refs/heads/master Commit: 8881765ac7ac6ba6fe9ef0d0a669c08cca58ed93 Parents: 40ba87f Author: Andrew Or Authored: Wed May 11 15:30:53 2016 -0700 Committer: Yin Huai Committed: Wed May 11 15:30:53 2016 -0700 -- .../spark/sql/execution/SparkSqlParser.scala| 4 .../sql/execution/command/DDLCommandSuite.scala | 20 ++-- .../hive/execution/HiveCompatibilitySuite.scala | 6 -- .../spark/sql/hive/HiveDDLCommandSuite.scala| 9 ++--- .../sql/hive/execution/HiveCommandSuite.scala | 2 +- .../sql/hive/execution/HiveTableScanSuite.scala | 2 +- .../spark/sql/hive/execution/HiveUDFSuite.scala | 2 +- 7 files changed, 31 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8881765a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 87e6f90..a51665f 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -780,6 +780,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(EmptyStorageFormat) val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) val location = Option(ctx.locationSpec).map(visitLocationSpec) +// If we are creating an EXTERNAL table, then the LOCATION field is required +if (external && location.isEmpty) { + throw operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx) +} val storage = CatalogStorageFormat( locationUri = location, inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), http://git-wip-us.apache.org/repos/asf/spark/blob/8881765a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index a728ac3..fa8dabf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{FunctionResource, FunctionResourceType} +import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, FunctionResource} +import org.apache.spark.sql.catalyst.catalog.FunctionResourceType import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.Project @@ -35,7 +36,7 @@ class DDLCommandSuite extends PlanTest { parser.parsePlan(sql) } assert(e.getMessage.toLowerCase.contains("operation not allowed")) 
-containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p)) } +containsThesePhrases.foreach { p => assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) } } test("create database") { @@ -211,6 +212,21 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed4, expected4) } + test("create external table - location must be specified") { +assertUnsupported( + sql = "CREATE EXTERNAL TABLE my_tab", + containsThesePhrases = Seq("create external table",
spark git commit: [SPARK-15278] [SQL] Remove experimental tag from Python DataFrame
Repository: spark Updated Branches: refs/heads/master de9c85cca -> 40ba87f76 [SPARK-15278] [SQL] Remove experimental tag from Python DataFrame ## What changes were proposed in this pull request? Earlier we removed experimental tag for Scala/Java DataFrames, but haven't done so for Python. This patch removes the experimental flag for Python and declares them stable. ## How was this patch tested? N/A. Author: Reynold XinCloses #13062 from rxin/SPARK-15278. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40ba87f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40ba87f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40ba87f7 Branch: refs/heads/master Commit: 40ba87f769ab03721d01c7960b89a8c414fcfbca Parents: de9c85c Author: Reynold Xin Authored: Wed May 11 15:12:27 2016 -0700 Committer: Davies Liu Committed: Wed May 11 15:12:27 2016 -0700 -- python/pyspark/sql/column.py| 2 -- python/pyspark/sql/dataframe.py | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40ba87f7/python/pyspark/sql/column.py -- diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index 90fb76f..5b26e94 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -139,8 +139,6 @@ class Column(object): df.colName + 1 1 / df.colName -.. note:: Experimental - .. versionadded:: 1.3 """ http://git-wip-us.apache.org/repos/asf/spark/blob/40ba87f7/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5378c32..49b4818 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -63,8 +63,6 @@ class DataFrame(object): people.filter(people.age > 30).join(department, people.deptId == department.id)\ .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"}) -.. note:: Experimental - .. 
versionadded:: 1.3 """ @@ -206,6 +204,8 @@ class DataFrame(object): :class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming source present. + +.. note:: Experimental """ return self._jdf.isStreaming() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15278] [SQL] Remove experimental tag from Python DataFrame
Repository: spark Updated Branches: refs/heads/branch-2.0 0699acc1b -> b1e14d9bf [SPARK-15278] [SQL] Remove experimental tag from Python DataFrame ## What changes were proposed in this pull request? Earlier we removed experimental tag for Scala/Java DataFrames, but haven't done so for Python. This patch removes the experimental flag for Python and declares them stable. ## How was this patch tested? N/A. Author: Reynold XinCloses #13062 from rxin/SPARK-15278. (cherry picked from commit 40ba87f769ab03721d01c7960b89a8c414fcfbca) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1e14d9b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1e14d9b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1e14d9b Branch: refs/heads/branch-2.0 Commit: b1e14d9bfb4df836b827b572e8dd07548a41bc3f Parents: 0699acc Author: Reynold Xin Authored: Wed May 11 15:12:27 2016 -0700 Committer: Davies Liu Committed: Wed May 11 15:12:39 2016 -0700 -- python/pyspark/sql/column.py| 2 -- python/pyspark/sql/dataframe.py | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b1e14d9b/python/pyspark/sql/column.py -- diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index 90fb76f..5b26e94 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -139,8 +139,6 @@ class Column(object): df.colName + 1 1 / df.colName -.. note:: Experimental - .. 
versionadded:: 1.3 """ http://git-wip-us.apache.org/repos/asf/spark/blob/b1e14d9b/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5378c32..49b4818 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -63,8 +63,6 @@ class DataFrame(object): people.filter(people.age > 30).join(department, people.deptId == department.id)\ .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"}) -.. note:: Experimental - .. versionadded:: 1.3 """ @@ -206,6 +204,8 @@ class DataFrame(object): :class:`DataFrameWriter`. Methods that return a single answer, (e.g., :func:`count` or :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming source present. + +.. note:: Experimental """ return self._jdf.isStreaming() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15270] [SQL] Use SparkSession Builder to build a session with HiveSupport
Repository: spark Updated Branches: refs/heads/branch-2.0 2454f6abf -> 0699acc1b [SPARK-15270] [SQL] Use SparkSession Builder to build a session with HiveSupport ## What changes were proposed in this pull request? Before: Creating a hiveContext was failing ```python from pyspark.sql import HiveContext hc = HiveContext(sc) ``` with ``` Traceback (most recent call last): File "", line 1, in File "spark-2.0/python/pyspark/sql/context.py", line 458, in __init__ sparkSession = SparkSession.withHiveSupport(sparkContext) File "spark-2.0/python/pyspark/sql/session.py", line 192, in withHiveSupport jsparkSession = sparkContext._jvm.SparkSession.withHiveSupport(sparkContext._jsc.sc()) File "spark-2.0/python/lib/py4j-0.9.2-src.zip/py4j/java_gateway.py", line 1048, in __getattr__ py4j.protocol.Py4JError: org.apache.spark.sql.SparkSession.withHiveSupport does not exist in the JVM ``` Now: ```python >>> from pyspark.sql import HiveContext >>> hc = HiveContext(sc) >>> hc.range(0, 100) DataFrame[id: bigint] >>> hc.range(0, 100).count() 100 ``` ## How was this patch tested? Existing Tests, tested manually in python shell Author: Sandeep Singh Closes #13056 from techaddict/SPARK-15270. 
(cherry picked from commit de9c85ccaacd12de9837eb88eae0a7e7ededd679) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0699acc1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0699acc1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0699acc1 Branch: refs/heads/branch-2.0 Commit: 0699acc1bafcc2ce3f2c4eb795b0e4c6b3448185 Parents: 2454f6a Author: Sandeep Singh Authored: Wed May 11 14:15:18 2016 -0700 Committer: Davies Liu Committed: Wed May 11 14:15:35 2016 -0700 -- python/pyspark/sql/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0699acc1/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 78ab2e8..02e742c 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -455,7 +455,7 @@ class HiveContext(SQLContext): def __init__(self, sparkContext, jhiveContext=None): if jhiveContext is None: -sparkSession = SparkSession.withHiveSupport(sparkContext) +sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate() else: sparkSession = SparkSession(sparkContext, jhiveContext.sparkSession()) SQLContext.__init__(self, sparkContext, sparkSession, jhiveContext) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15270] [SQL] Use SparkSession Builder to build a session with HiveSupport
Repository: spark Updated Branches: refs/heads/master 40a949aae -> de9c85cca [SPARK-15270] [SQL] Use SparkSession Builder to build a session with HiveSupport ## What changes were proposed in this pull request? Before: Creating a hiveContext was failing ```python from pyspark.sql import HiveContext hc = HiveContext(sc) ``` with ``` Traceback (most recent call last): File "", line 1, in File "spark-2.0/python/pyspark/sql/context.py", line 458, in __init__ sparkSession = SparkSession.withHiveSupport(sparkContext) File "spark-2.0/python/pyspark/sql/session.py", line 192, in withHiveSupport jsparkSession = sparkContext._jvm.SparkSession.withHiveSupport(sparkContext._jsc.sc()) File "spark-2.0/python/lib/py4j-0.9.2-src.zip/py4j/java_gateway.py", line 1048, in __getattr__ py4j.protocol.Py4JError: org.apache.spark.sql.SparkSession.withHiveSupport does not exist in the JVM ``` Now: ```python >>> from pyspark.sql import HiveContext >>> hc = HiveContext(sc) >>> hc.range(0, 100) DataFrame[id: bigint] >>> hc.range(0, 100).count() 100 ``` ## How was this patch tested? Existing Tests, tested manually in python shell Author: Sandeep SinghCloses #13056 from techaddict/SPARK-15270. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de9c85cc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de9c85cc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de9c85cc Branch: refs/heads/master Commit: de9c85ccaacd12de9837eb88eae0a7e7ededd679 Parents: 40a949a Author: Sandeep Singh Authored: Wed May 11 14:15:18 2016 -0700 Committer: Davies Liu Committed: Wed May 11 14:15:18 2016 -0700 -- python/pyspark/sql/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de9c85cc/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 78ab2e8..02e742c 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -455,7 +455,7 @@ class HiveContext(SQLContext): def __init__(self, sparkContext, jhiveContext=None): if jhiveContext is None: -sparkSession = SparkSession.withHiveSupport(sparkContext) +sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate() else: sparkSession = SparkSession(sparkContext, jhiveContext.sparkSession()) SQLContext.__init__(self, sparkContext, sparkSession, jhiveContext) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15262] Synchronize block manager / scheduler executor state
Repository: spark Updated Branches: refs/heads/master 7ecd49688 -> 40a949aae [SPARK-15262] Synchronize block manager / scheduler executor state ## What changes were proposed in this pull request? If an executor is still alive even after the scheduler has removed its metadata, we may receive a heartbeat from that executor and tell its block manager to reregister itself. If that happens, the block manager master will know about the executor, but the scheduler will not. That is a dangerous situation, because when the executor does get disconnected later, the scheduler will not ask the block manager to also remove metadata for that executor. Later, when we try to clean up an RDD or a broadcast variable, we may try to send a message to that executor, triggering an exception. ## How was this patch tested? Jenkins. Author: Andrew Or Closes #13055 from andrewor14/block-manager-remove. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40a949aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40a949aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40a949aa Branch: refs/heads/master Commit: 40a949aae9c3040019a52482d091912a85b0f4d4 Parents: 7ecd496 Author: Andrew Or Authored: Wed May 11 13:36:58 2016 -0700 Committer: Shixiong Zhu Committed: Wed May 11 13:36:58 2016 -0700 -- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40a949aa/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8896391..0fea9c1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -289,7 +289,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason) listenerBus.post( SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString)) -case None => logInfo(s"Asked to remove non-existent executor $executorId") +case None => + // SPARK-15262: If an executor is still alive even after the scheduler has removed + // its metadata, we may receive a heartbeat from that executor and tell its block + // manager to reregister itself. If that happens, the block manager master will know + // about the executor, but the scheduler will not. Therefore, we should remove the + // executor from the block manager when we hit this case. + scheduler.sc.env.blockManager.master.removeExecutor(executorId) + logInfo(s"Asked to remove non-existent executor $executorId") } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15262] Synchronize block manager / scheduler executor state
Repository: spark Updated Branches: refs/heads/branch-1.6 ced71d353 -> e2a43d007 [SPARK-15262] Synchronize block manager / scheduler executor state ## What changes were proposed in this pull request? If an executor is still alive even after the scheduler has removed its metadata, we may receive a heartbeat from that executor and tell its block manager to reregister itself. If that happens, the block manager master will know about the executor, but the scheduler will not. That is a dangerous situation, because when the executor does get disconnected later, the scheduler will not ask the block manager to also remove metadata for that executor. Later, when we try to clean up an RDD or a broadcast variable, we may try to send a message to that executor, triggering an exception. ## How was this patch tested? Jenkins. Author: Andrew OrCloses #13055 from andrewor14/block-manager-remove. (cherry picked from commit 40a949aae9c3040019a52482d091912a85b0f4d4) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2a43d00 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2a43d00 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2a43d00 Branch: refs/heads/branch-1.6 Commit: e2a43d0070b7204b1c6ed1c9292f1d215e0df30d Parents: ced71d3 Author: Andrew Or Authored: Wed May 11 13:36:58 2016 -0700 Committer: Shixiong Zhu Committed: Wed May 11 13:37:16 2016 -0700 -- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e2a43d00/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7189685..87f2dbf 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -267,7 +267,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason) listenerBus.post( SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString)) -case None => logInfo(s"Asked to remove non-existent executor $executorId") +case None => + // SPARK-15262: If an executor is still alive even after the scheduler has removed + // its metadata, we may receive a heartbeat from that executor and tell its block + // manager to reregister itself. If that happens, the block manager master will know + // about the executor, but the scheduler will not. Therefore, we should remove the + // executor from the block manager when we hit this case. + scheduler.sc.env.blockManager.master.removeExecutor(executorId) + logInfo(s"Asked to remove non-existent executor $executorId") } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15262] Synchronize block manager / scheduler executor state
Repository: spark Updated Branches: refs/heads/branch-2.0 6e08eb469 -> 2454f6abf [SPARK-15262] Synchronize block manager / scheduler executor state ## What changes were proposed in this pull request? If an executor is still alive even after the scheduler has removed its metadata, we may receive a heartbeat from that executor and tell its block manager to reregister itself. If that happens, the block manager master will know about the executor, but the scheduler will not. That is a dangerous situation, because when the executor does get disconnected later, the scheduler will not ask the block manager to also remove metadata for that executor. Later, when we try to clean up an RDD or a broadcast variable, we may try to send a message to that executor, triggering an exception. ## How was this patch tested? Jenkins. Author: Andrew OrCloses #13055 from andrewor14/block-manager-remove. (cherry picked from commit 40a949aae9c3040019a52482d091912a85b0f4d4) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2454f6ab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2454f6ab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2454f6ab Branch: refs/heads/branch-2.0 Commit: 2454f6abf29c938420dda8319a4e4afd758fc4e3 Parents: 6e08eb4 Author: Andrew Or Authored: Wed May 11 13:36:58 2016 -0700 Committer: Shixiong Zhu Committed: Wed May 11 13:37:05 2016 -0700 -- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2454f6ab/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8896391..0fea9c1 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -289,7 +289,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason) listenerBus.post( SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString)) -case None => logInfo(s"Asked to remove non-existent executor $executorId") +case None => + // SPARK-15262: If an executor is still alive even after the scheduler has removed + // its metadata, we may receive a heartbeat from that executor and tell its block + // manager to reregister itself. If that happens, the block manager master will know + // about the executor, but the scheduler will not. Therefore, we should remove the + // executor from the block manager when we hit this case. + scheduler.sc.env.blockManager.master.removeExecutor(executorId) + logInfo(s"Asked to remove non-existent executor $executorId") } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12200][SQL] Add __contains__ implementation to Row
Repository: spark Updated Branches: refs/heads/master bb88ad4e0 -> 7ecd49688 [SPARK-12200][SQL] Add __contains__ implementation to Row https://issues.apache.org/jira/browse/SPARK-12200 Author: Maciej Brynski Author: Maciej Bryński Closes #10194 from maver1ck/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ecd4968 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ecd4968 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ecd4968 Branch: refs/heads/master Commit: 7ecd496884f6f126ab186b9ceaa861a571d6155c Parents: bb88ad4 Author: Maciej Brynski Authored: Wed May 11 13:15:11 2016 -0700 Committer: Reynold Xin Committed: Wed May 11 13:15:11 2016 -0700 -- python/pyspark/sql/types.py | 22 +- 1 file changed, 21 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ecd4968/python/pyspark/sql/types.py -- diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index f7cd4b8..30ab130 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1359,7 +1359,13 @@ def _create_row(fields, values): class Row(tuple): """ -A row in L{DataFrame}. The fields in it can be accessed like attributes. +A row in L{DataFrame}. +The fields in it can be accessed: + +* like attributes (``row.key``) +* like dictionary values (``row[key]``) + +``key in row`` will search through row keys. Row can be used to create a row object by using named arguments, the fields will be sorted by names. 
@@ -1371,6 +1377,10 @@ class Row(tuple): ('Alice', 11) >>> row.name, row.age ('Alice', 11) +>>> 'name' in row +True +>>> 'wrong_key' in row +False Row also can be used to create another Row like class, then it could be used to create Row objects, such as @@ -1378,6 +1388,10 @@ class Row(tuple): >>> Person = Row("name", "age") >>> Person +>>> 'name' in Person +True +>>> 'wrong_key' in Person +False >>> Person("Alice", 11) Row(name='Alice', age=11) """ @@ -1431,6 +1445,12 @@ class Row(tuple): else: return dict(zip(self.__fields__, self)) +def __contains__(self, item): +if hasattr(self, "__fields__"): +return item in self.__fields__ +else: +return super(Row, self).__contains__(item) + # let object acts like class def __call__(self, *args): """create new Row object""" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12200][SQL] Add __contains__ implementation to Row
Repository: spark Updated Branches: refs/heads/branch-2.0 83050ddb8 -> 6e08eb469 [SPARK-12200][SQL] Add __contains__ implementation to Row https://issues.apache.org/jira/browse/SPARK-12200 Author: Maciej BrynskiAuthor: Maciej Bryński Closes #10194 from maver1ck/master. (cherry picked from commit 7ecd496884f6f126ab186b9ceaa861a571d6155c) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e08eb46 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e08eb46 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e08eb46 Branch: refs/heads/branch-2.0 Commit: 6e08eb46951648345e6d21b85a46759422cb3e4b Parents: 83050dd Author: Maciej Brynski Authored: Wed May 11 13:15:11 2016 -0700 Committer: Reynold Xin Committed: Wed May 11 13:15:21 2016 -0700 -- python/pyspark/sql/types.py | 22 +- 1 file changed, 21 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e08eb46/python/pyspark/sql/types.py -- diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index f7cd4b8..30ab130 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1359,7 +1359,13 @@ def _create_row(fields, values): class Row(tuple): """ -A row in L{DataFrame}. The fields in it can be accessed like attributes. +A row in L{DataFrame}. +The fields in it can be accessed: + +* like attributes (``row.key``) +* like dictionary values (``row[key]``) + +``key in row`` will search through row keys. Row can be used to create a row object by using named arguments, the fields will be sorted by names. 
@@ -1371,6 +1377,10 @@ class Row(tuple): ('Alice', 11) >>> row.name, row.age ('Alice', 11) +>>> 'name' in row +True +>>> 'wrong_key' in row +False Row also can be used to create another Row like class, then it could be used to create Row objects, such as @@ -1378,6 +1388,10 @@ class Row(tuple): >>> Person = Row("name", "age") >>> Person +>>> 'name' in Person +True +>>> 'wrong_key' in Person +False >>> Person("Alice", 11) Row(name='Alice', age=11) """ @@ -1431,6 +1445,12 @@ class Row(tuple): else: return dict(zip(self.__fields__, self)) +def __contains__(self, item): +if hasattr(self, "__fields__"): +return item in self.__fields__ +else: +return super(Row, self).__contains__(item) + # let object acts like class def __call__(self, *args): """create new Row object""" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15260] Atomically resize memory pools
Repository: spark Updated Branches: refs/heads/master 81c68eceb -> bb88ad4e0 [SPARK-15260] Atomically resize memory pools ## What changes were proposed in this pull request? When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call `memoryStore.evictBlocksToFreeSpace`, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be in a bad state. This patch minimizes the things we do between the two calls to make the resizing more atomic. ## How was this patch tested? Jenkins. Author: Andrew OrCloses #13039 from andrewor14/safer-pool. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb88ad4e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb88ad4e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb88ad4e Branch: refs/heads/master Commit: bb88ad4e0e870c88d474c71939a19541522a3023 Parents: 81c68ec Author: Andrew Or Authored: Wed May 11 12:58:57 2016 -0700 Committer: Davies Liu Committed: Wed May 11 12:58:57 2016 -0700 -- .../apache/spark/memory/StorageMemoryPool.scala | 11 +- .../spark/memory/UnifiedMemoryManager.scala | 5 +++-- .../spark/memory/MemoryManagerSuite.scala | 15 + .../memory/UnifiedMemoryManagerSuite.scala | 23 4 files changed, 46 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 0b552ca..4c6b639 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -116,13 +116,13 @@ private[memory] class StorageMemoryPool( } /** - * Try to shrink the size of this 
storage memory pool by `spaceToFree` bytes. Return the number - * of bytes removed from the pool's capacity. + * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes. + * Note: this method doesn't actually reduce the pool size but relies on the caller to do so. + * + * @return number of bytes to be removed from the pool's capacity. */ - def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { -// First, shrink the pool by reclaiming free memory: + def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) -decrementPoolSize(spaceFreedByReleasingUnusedMemory) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: @@ -130,7 +130,6 @@ private[memory] class StorageMemoryPool( memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode) // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. 
- decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 82023b5..ae747c1 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -113,9 +113,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( storagePool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: - val spaceReclaimed = storagePool.shrinkPoolToFreeSpace( + val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) - executionPool.incrementPoolSize(spaceReclaimed) + storagePool.decrementPoolSize(spaceToReclaim) +
spark git commit: [SPARK-15260] Atomically resize memory pools
Repository: spark Updated Branches: refs/heads/branch-2.0 6b36185d0 -> 83050ddb8 [SPARK-15260] Atomically resize memory pools ## What changes were proposed in this pull request? When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call `memoryStore.evictBlocksToFreeSpace`, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be in a bad state. This patch minimizes the things we do between the two calls to make the resizing more atomic. ## How was this patch tested? Jenkins. Author: Andrew OrCloses #13039 from andrewor14/safer-pool. (cherry picked from commit bb88ad4e0e870c88d474c71939a19541522a3023) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83050ddb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83050ddb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83050ddb Branch: refs/heads/branch-2.0 Commit: 83050ddb84ce65b646a88d7d365a960f423eee19 Parents: 6b36185 Author: Andrew Or Authored: Wed May 11 12:58:57 2016 -0700 Committer: Davies Liu Committed: Wed May 11 12:59:05 2016 -0700 -- .../apache/spark/memory/StorageMemoryPool.scala | 11 +- .../spark/memory/UnifiedMemoryManager.scala | 5 +++-- .../spark/memory/MemoryManagerSuite.scala | 15 + .../memory/UnifiedMemoryManagerSuite.scala | 23 4 files changed, 46 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/83050ddb/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 0b552ca..4c6b639 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala 
@@ -116,13 +116,13 @@ private[memory] class StorageMemoryPool( } /** - * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number - * of bytes removed from the pool's capacity. + * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes. + * Note: this method doesn't actually reduce the pool size but relies on the caller to do so. + * + * @return number of bytes to be removed from the pool's capacity. */ - def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { -// First, shrink the pool by reclaiming free memory: + def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) -decrementPoolSize(spaceFreedByReleasingUnusedMemory) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: @@ -130,7 +130,6 @@ private[memory] class StorageMemoryPool( memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode) // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. 
- decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory http://git-wip-us.apache.org/repos/asf/spark/blob/83050ddb/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 82023b5..ae747c1 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -113,9 +113,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( storagePool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: - val spaceReclaimed = storagePool.shrinkPoolToFreeSpace( + val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) -
spark git commit: [SPARK-15248][SQL] Make MetastoreFileCatalog consider directories from partition specs of a partitioned metastore table
Repository: spark Updated Branches: refs/heads/branch-2.0 56e1e2f17 -> 6b36185d0 [SPARK-15248][SQL] Make MetastoreFileCatalog consider directories from partition specs of a partitioned metastore table Table partitions can be added with locations different from default warehouse location of a hive table. `CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet ` `ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'` Querying such a table throws error as the MetastoreFileCatalog does not list the added partition directory, it only lists the default base location. ``` [info] - SPARK-15248: explicitly added partitions should be readable *** FAILED *** (1 second, 8 milliseconds) [info] java.util.NoSuchElementException: key not found: file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition [info] at scala.collection.MapLike$class.default(MapLike.scala:228) [info] at scala.collection.AbstractMap.default(Map.scala:59) [info] at scala.collection.MapLike$class.apply(MapLike.scala:141) [info] at scala.collection.AbstractMap.apply(Map.scala:59) [info] at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:59) [info] at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:55) [info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) [info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) [info] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) [info] at scala.collection.AbstractTraversable.map(Traversable.scala:104) [info] at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog.listFiles(PartitioningAwareFileCatalog.scala:55) [info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:93) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59) [info] at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) [info] at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:55) [info] at org.apache.spark.sql.execution.SparkStrategies$SpecialLimits$.apply(SparkStrategies.scala:55) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59) [info] at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) [info] at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60) [info] at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:77) [info] at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75) [info] at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:82) [info] at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:82) [info] at org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:330) [info] at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:146) [info] at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:159) [info] at 
org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:554) [info] at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:535) [info] at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:125) [info] at org.apache.spark.sql.hive.ParquetPartitioningTest.withTempDir(parquetSuites.scala:726) [info] at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7.apply$mcV$sp(parquetSuites.scala:535) [info] at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:166) [info] at org.apache.spark.sql.hive.ParquetPartitioningTest.withTable(parquetSuites.scala:726) [info] at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply$mcV$sp(parquetSuites.scala:534) [info] at
spark git commit: [SPARK-15248][SQL] Make MetastoreFileCatalog consider directories from partition specs of a partitioned metastore table
Repository: spark Updated Branches: refs/heads/master 89e67d666 -> 81c68eceb [SPARK-15248][SQL] Make MetastoreFileCatalog consider directories from partition specs of a partitioned metastore table Table partitions can be added with locations different from default warehouse location of a hive table. `CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet ` `ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'` Querying such a table throws error as the MetastoreFileCatalog does not list the added partition directory, it only lists the default base location. ``` [info] - SPARK-15248: explicitly added partitions should be readable *** FAILED *** (1 second, 8 milliseconds) [info] java.util.NoSuchElementException: key not found: file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition [info] at scala.collection.MapLike$class.default(MapLike.scala:228) [info] at scala.collection.AbstractMap.default(Map.scala:59) [info] at scala.collection.MapLike$class.apply(MapLike.scala:141) [info] at scala.collection.AbstractMap.apply(Map.scala:59) [info] at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:59) [info] at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:55) [info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) [info] at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) [info] at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) [info] at scala.collection.AbstractTraversable.map(Traversable.scala:104) [info] at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog.listFiles(PartitioningAwareFileCatalog.scala:55) [info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:93) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59) [info] at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) [info] at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:55) [info] at org.apache.spark.sql.execution.SparkStrategies$SpecialLimits$.apply(SparkStrategies.scala:55) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59) [info] at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) [info] at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) [info] at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60) [info] at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:77) [info] at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75) [info] at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:82) [info] at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:82) [info] at org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:330) [info] at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:146) [info] at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:159) [info] at 
org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:554) [info] at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:535) [info] at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:125) [info] at org.apache.spark.sql.hive.ParquetPartitioningTest.withTempDir(parquetSuites.scala:726) [info] at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7.apply$mcV$sp(parquetSuites.scala:535) [info] at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:166) [info] at org.apache.spark.sql.hive.ParquetPartitioningTest.withTable(parquetSuites.scala:726) [info] at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply$mcV$sp(parquetSuites.scala:534) [info] at
[3/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala -- diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala new file mode 100644 index 000..cb782d2 --- /dev/null +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.io.File +import java.util.Arrays +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.StringDecoder +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.Eventually + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.scheduler.rate.RateEstimator +import org.apache.spark.util.Utils + +class DirectKafkaStreamSuite + extends SparkFunSuite + with BeforeAndAfter + with BeforeAndAfterAll + with Eventually + with Logging { + val sparkConf = new SparkConf() +.setMaster("local[4]") +.setAppName(this.getClass.getSimpleName) + + private var sc: SparkContext = _ + private var ssc: StreamingContext = _ + private var testDir: File = _ + + private var kafkaTestUtils: KafkaTestUtils = _ + + override def beforeAll { +kafkaTestUtils = new KafkaTestUtils +kafkaTestUtils.setup() + } + + override def afterAll { +if (kafkaTestUtils != null) { + kafkaTestUtils.teardown() + kafkaTestUtils = null +} + } + + after { +if (ssc != null) { + ssc.stop() + sc = null +} +if (sc != null) { + sc.stop() +} +if (testDir != null) { + Utils.deleteRecursively(testDir) +} + } + + + test("basic stream receiving with multiple topics and smallest starting offset") { +val topics = Set("basic1", "basic2", "basic3") +val data = Map("a" -> 7, "b" -> 9) +topics.foreach { t => + 
kafkaTestUtils.createTopic(t) + kafkaTestUtils.sendMessages(t, data) +} +val totalSent = data.values.sum * topics.size +val kafkaParams = Map( + "metadata.broker.list" -> kafkaTestUtils.brokerAddress, + "auto.offset.reset" -> "smallest" +) + +ssc = new StreamingContext(sparkConf, Milliseconds(200)) +val stream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( +ssc, kafkaParams, topics) +} + +val allReceived = new ConcurrentLinkedQueue[(String, String)]() + +// hold a reference to the current offset ranges, so it can be used downstream +var offsetRanges = Array[OffsetRange]() + +stream.transform { rdd => + // Get the offset ranges in the RDD + offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + rdd +}.foreachRDD { rdd => + for (o <- offsetRanges) { +logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") + } + val collected = rdd.mapPartitionsWithIndex { (i, iter) => + // For each partition, get size of the range in the partition, + // and the number of items in the partition +val off = offsetRanges(i) +val all = iter.toSeq +val partSize = all.size +val rangeSize = off.untilOffset - off.fromOffset +Iterator((partSize, rangeSize)) + }.collect + + // Verify whether number of elements
[2/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala deleted file mode 100644 index d4881b1..000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.kafka - -import scala.collection.mutable.ArrayBuffer -import scala.reflect.{classTag, ClassTag} - -import kafka.api.{FetchRequestBuilder, FetchResponse} -import kafka.common.{ErrorMapping, TopicAndPartition} -import kafka.consumer.SimpleConsumer -import kafka.message.{MessageAndMetadata, MessageAndOffset} -import kafka.serializer.Decoder -import kafka.utils.VerifiableProperties - -import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} -import org.apache.spark.internal.Logging -import org.apache.spark.partial.{BoundedDouble, PartialResult} -import org.apache.spark.rdd.RDD -import org.apache.spark.util.NextIterator - -/** - * A batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. - * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> - * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
- * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD - * @param messageHandler function for translating each message into the desired type - */ -private[kafka] -class KafkaRDD[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, - R: ClassTag] private[spark] ( -sc: SparkContext, -kafkaParams: Map[String, String], -val offsetRanges: Array[OffsetRange], -leaders: Map[TopicAndPartition, (String, Int)], -messageHandler: MessageAndMetadata[K, V] => R - ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges { - override def getPartitions: Array[Partition] = { -offsetRanges.zipWithIndex.map { case (o, i) => -val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) -new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) -}.toArray - } - - override def count(): Long = offsetRanges.map(_.count).sum - - override def countApprox( - timeout: Long, - confidence: Double = 0.95 - ): PartialResult[BoundedDouble] = { -val c = count -new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } - - override def isEmpty(): Boolean = count == 0L - - override def take(num: Int): Array[R] = { -val nonEmptyPartitions = this.partitions - .map(_.asInstanceOf[KafkaRDDPartition]) - .filter(_.count > 0) - -if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[R](0) -} - -// Determine in advance how many messages need to be taken from each partition -val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { -val taken = Math.min(remain, part.count) -result + (part.index -> taken.toInt) - } else { -result - } -} - -val buf = new ArrayBuffer[R] -val res = context.runJob( - this, - (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray, - parts.keys.toArray) -res.foreach(buf ++= _) -buf.toArray - } - - override def getPreferredLocations(thePart: Partition): Seq[String] = { 
-val part = thePart.asInstanceOf[KafkaRDDPartition] -// TODO is additional hostname resolution necessary here -Seq(part.host) - } - - private def errBeginAfterEnd(part: KafkaRDDPartition): String = -s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " + - s"for topic
[2/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala deleted file mode 100644 index d4881b1..000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.kafka - -import scala.collection.mutable.ArrayBuffer -import scala.reflect.{classTag, ClassTag} - -import kafka.api.{FetchRequestBuilder, FetchResponse} -import kafka.common.{ErrorMapping, TopicAndPartition} -import kafka.consumer.SimpleConsumer -import kafka.message.{MessageAndMetadata, MessageAndOffset} -import kafka.serializer.Decoder -import kafka.utils.VerifiableProperties - -import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} -import org.apache.spark.internal.Logging -import org.apache.spark.partial.{BoundedDouble, PartialResult} -import org.apache.spark.rdd.RDD -import org.apache.spark.util.NextIterator - -/** - * A batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. - * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> - * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
- * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD - * @param messageHandler function for translating each message into the desired type - */ -private[kafka] -class KafkaRDD[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: ClassTag, - T <: Decoder[_]: ClassTag, - R: ClassTag] private[spark] ( -sc: SparkContext, -kafkaParams: Map[String, String], -val offsetRanges: Array[OffsetRange], -leaders: Map[TopicAndPartition, (String, Int)], -messageHandler: MessageAndMetadata[K, V] => R - ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges { - override def getPartitions: Array[Partition] = { -offsetRanges.zipWithIndex.map { case (o, i) => -val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) -new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) -}.toArray - } - - override def count(): Long = offsetRanges.map(_.count).sum - - override def countApprox( - timeout: Long, - confidence: Double = 0.95 - ): PartialResult[BoundedDouble] = { -val c = count -new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } - - override def isEmpty(): Boolean = count == 0L - - override def take(num: Int): Array[R] = { -val nonEmptyPartitions = this.partitions - .map(_.asInstanceOf[KafkaRDDPartition]) - .filter(_.count > 0) - -if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[R](0) -} - -// Determine in advance how many messages need to be taken from each partition -val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { -val taken = Math.min(remain, part.count) -result + (part.index -> taken.toInt) - } else { -result - } -} - -val buf = new ArrayBuffer[R] -val res = context.runJob( - this, - (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray, - parts.keys.toArray) -res.foreach(buf ++= _) -buf.toArray - } - - override def getPreferredLocations(thePart: Partition): Seq[String] = { 
-val part = thePart.asInstanceOf[KafkaRDDPartition] -// TODO is additional hostname resolution necessary here -Seq(part.host) - } - - private def errBeginAfterEnd(part: KafkaRDDPartition): String = -s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " + - s"for topic
[4/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala -- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala new file mode 100644 index 000..edaafb9 --- /dev/null +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.io.OutputStream +import java.lang.{Integer => JInt, Long => JLong} +import java.nio.charset.StandardCharsets +import java.util.{List => JList, Map => JMap, Set => JSet} + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} +import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler} + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.api.python.SerDeUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java._ +import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream} +import org.apache.spark.streaming.util.WriteAheadLogUtils + +object KafkaUtils { + /** + * Create an input stream that pulls messages from Kafka Brokers. + * @param ssc StreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) + * @param groupId The group id for this consumer + * @param topicsMap of (topic_name -> numPartitions) to consume. 
Each partition is consumed + * in its own thread + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream( + ssc: StreamingContext, + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 +): ReceiverInputDStream[(String, String)] = { +val kafkaParams = Map[String, String]( + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "1") +createStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topics, storageLevel) + } + + /** + * Create an input stream that pulls messages from Kafka Brokers. + * @param ssc StreamingContext object + * @param kafkaParams Map of kafka configuration parameters, + *see http://kafka.apache.org/08/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + *in its own thread. + * @param storageLevel Storage level to use for storing the received objects + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam U type of Kafka message key decoder + * @tparam T type of Kafka message value decoder + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag]( + ssc: StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel +): ReceiverInputDStream[(K, V)] = { +val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) +new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) + } + + /** + * Create an input stream that pulls messages from Kafka Brokers. + * Storage level of the data will be the default
[5/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
[SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact ## What changes were proposed in this pull request? Renaming the streaming-kafka artifact to include kafka version, in anticipation of needing a different artifact for later kafka versions ## How was this patch tested? Unit tests Author: cody koeningerCloses #12946 from koeninger/SPARK-15085. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89e67d66 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89e67d66 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89e67d66 Branch: refs/heads/master Commit: 89e67d6667d5f8be9c6fb6c120fbcd350ae2950d Parents: 6d0368a Author: cody koeninger Authored: Wed May 11 12:15:41 2016 -0700 Committer: Reynold Xin Committed: Wed May 11 12:15:41 2016 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 4 +- dev/audit-release/audit_release.py | 2 +- dev/run-tests.py| 2 +- dev/sparktestsupport/modules.py | 8 +- docs/streaming-kafka-integration.md | 14 +- docs/streaming-programming-guide.md | 4 +- examples/pom.xml| 2 +- external/kafka-0-8-assembly/pom.xml | 176 external/kafka-0-8/pom.xml | 98 +++ .../apache/spark/streaming/kafka/Broker.scala | 66 ++ .../kafka/DirectKafkaInputDStream.scala | 227 ++ .../spark/streaming/kafka/KafkaCluster.scala| 425 ++ .../streaming/kafka/KafkaInputDStream.scala | 142 .../apache/spark/streaming/kafka/KafkaRDD.scala | 269 +++ .../streaming/kafka/KafkaRDDPartition.scala | 42 + .../spark/streaming/kafka/KafkaTestUtils.scala | 275 +++ .../spark/streaming/kafka/KafkaUtils.scala | 805 +++ .../spark/streaming/kafka/OffsetRange.scala | 109 +++ .../streaming/kafka/ReliableKafkaReceiver.scala | 302 +++ .../spark/streaming/kafka/package-info.java | 21 + .../apache/spark/streaming/kafka/package.scala | 23 + .../kafka/JavaDirectKafkaStreamSuite.java | 175 .../streaming/kafka/JavaKafkaRDDSuite.java | 156 .../streaming/kafka/JavaKafkaStreamSuite.java | 135 
.../src/test/resources/log4j.properties | 28 + .../kafka/DirectKafkaStreamSuite.scala | 531 .../streaming/kafka/KafkaClusterSuite.scala | 81 ++ .../spark/streaming/kafka/KafkaRDDSuite.scala | 175 .../streaming/kafka/KafkaStreamSuite.scala | 84 ++ .../kafka/ReliableKafkaStreamSuite.scala| 148 external/kafka-assembly/pom.xml | 176 external/kafka/pom.xml | 98 --- .../apache/spark/streaming/kafka/Broker.scala | 66 -- .../kafka/DirectKafkaInputDStream.scala | 227 -- .../spark/streaming/kafka/KafkaCluster.scala| 425 -- .../streaming/kafka/KafkaInputDStream.scala | 142 .../apache/spark/streaming/kafka/KafkaRDD.scala | 269 --- .../streaming/kafka/KafkaRDDPartition.scala | 42 - .../spark/streaming/kafka/KafkaTestUtils.scala | 275 --- .../spark/streaming/kafka/KafkaUtils.scala | 805 --- .../spark/streaming/kafka/OffsetRange.scala | 109 --- .../streaming/kafka/ReliableKafkaReceiver.scala | 302 --- .../spark/streaming/kafka/package-info.java | 21 - .../apache/spark/streaming/kafka/package.scala | 23 - .../kafka/JavaDirectKafkaStreamSuite.java | 175 .../streaming/kafka/JavaKafkaRDDSuite.java | 156 .../streaming/kafka/JavaKafkaStreamSuite.java | 135 .../kafka/src/test/resources/log4j.properties | 28 - .../kafka/DirectKafkaStreamSuite.scala | 531 .../streaming/kafka/KafkaClusterSuite.scala | 81 -- .../spark/streaming/kafka/KafkaRDDSuite.scala | 175 .../streaming/kafka/KafkaStreamSuite.scala | 84 -- .../kafka/ReliableKafkaStreamSuite.scala| 148 pom.xml | 4 +- project/MimaBuild.scala | 10 +- project/SparkBuild.scala| 8 +- python/pyspark/streaming/kafka.py | 6 +- python/pyspark/streaming/tests.py | 6 +- 58 files changed, 4532 insertions(+), 4524 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 9075e3e..78606e0 100644 ---
[3/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala -- diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala new file mode 100644 index 000..cb782d2 --- /dev/null +++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.io.File +import java.util.Arrays +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.StringDecoder +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.Eventually + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset +import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.scheduler.rate.RateEstimator +import org.apache.spark.util.Utils + +class DirectKafkaStreamSuite + extends SparkFunSuite + with BeforeAndAfter + with BeforeAndAfterAll + with Eventually + with Logging { + val sparkConf = new SparkConf() +.setMaster("local[4]") +.setAppName(this.getClass.getSimpleName) + + private var sc: SparkContext = _ + private var ssc: StreamingContext = _ + private var testDir: File = _ + + private var kafkaTestUtils: KafkaTestUtils = _ + + override def beforeAll { +kafkaTestUtils = new KafkaTestUtils +kafkaTestUtils.setup() + } + + override def afterAll { +if (kafkaTestUtils != null) { + kafkaTestUtils.teardown() + kafkaTestUtils = null +} + } + + after { +if (ssc != null) { + ssc.stop() + sc = null +} +if (sc != null) { + sc.stop() +} +if (testDir != null) { + Utils.deleteRecursively(testDir) +} + } + + + test("basic stream receiving with multiple topics and smallest starting offset") { +val topics = Set("basic1", "basic2", "basic3") +val data = Map("a" -> 7, "b" -> 9) +topics.foreach { t => + 
kafkaTestUtils.createTopic(t) + kafkaTestUtils.sendMessages(t, data) +} +val totalSent = data.values.sum * topics.size +val kafkaParams = Map( + "metadata.broker.list" -> kafkaTestUtils.brokerAddress, + "auto.offset.reset" -> "smallest" +) + +ssc = new StreamingContext(sparkConf, Milliseconds(200)) +val stream = withClue("Error creating direct stream") { + KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( +ssc, kafkaParams, topics) +} + +val allReceived = new ConcurrentLinkedQueue[(String, String)]() + +// hold a reference to the current offset ranges, so it can be used downstream +var offsetRanges = Array[OffsetRange]() + +stream.transform { rdd => + // Get the offset ranges in the RDD + offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + rdd +}.foreachRDD { rdd => + for (o <- offsetRanges) { +logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") + } + val collected = rdd.mapPartitionsWithIndex { (i, iter) => + // For each partition, get size of the range in the partition, + // and the number of items in the partition +val off = offsetRanges(i) +val all = iter.toSeq +val partSize = all.size +val rangeSize = off.untilOffset - off.fromOffset +Iterator((partSize, rangeSize)) + }.collect + + // Verify whether number of elements
[1/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
Repository: spark Updated Branches: refs/heads/branch-2.0 e3703c411 -> 56e1e2f17 http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java -- diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java deleted file mode 100644 index c41b629..000 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.kafka; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import scala.Tuple2; - -import kafka.common.TopicAndPartition; -import kafka.message.MessageAndMetadata; -import kafka.serializer.StringDecoder; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; - -public class JavaKafkaRDDSuite implements Serializable { - private transient JavaSparkContext sc = null; - private transient KafkaTestUtils kafkaTestUtils = null; - - @Before - public void setUp() { -kafkaTestUtils = new KafkaTestUtils(); -kafkaTestUtils.setup(); -SparkConf sparkConf = new SparkConf() - .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); -sc = new JavaSparkContext(sparkConf); - } - - @After - public void tearDown() { -if (sc != null) { - sc.stop(); - sc = null; -} - -if (kafkaTestUtils != null) { - kafkaTestUtils.teardown(); - kafkaTestUtils = null; -} - } - - @Test - public void testKafkaRDD() throws InterruptedException { -String topic1 = "topic1"; -String topic2 = "topic2"; - -createTopicAndSendData(topic1); -createTopicAndSendData(topic2); - -MapkafkaParams = new HashMap<>(); -kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); - -OffsetRange[] offsetRanges = { - OffsetRange.create(topic1, 0, 0, 1), - OffsetRange.create(topic2, 0, 0, 1) -}; - -Map emptyLeaders = new HashMap<>(); -Map leaders = new HashMap<>(); -String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":"); -Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); -leaders.put(new TopicAndPartition(topic1, 0), broker); -leaders.put(new TopicAndPartition(topic2, 0), broker); - -JavaRDD rdd1 = KafkaUtils.createRDD( -sc, -String.class, -String.class, 
-StringDecoder.class, -StringDecoder.class, -kafkaParams, -offsetRanges -).map( -new Function , String>() { - @Override - public String call(Tuple2 kv) { -return kv._2(); - } -} -); - -JavaRDD rdd2 = KafkaUtils.createRDD( -sc, -String.class, -String.class, -StringDecoder.class, -StringDecoder.class, -String.class, -kafkaParams, -offsetRanges, -emptyLeaders, -new Function , String>() { - @Override - public String call(MessageAndMetadata msgAndMd) { -return msgAndMd.message(); - } -} -); - -JavaRDD rdd3 = KafkaUtils.createRDD( -sc, -String.class, -String.class, -StringDecoder.class, -StringDecoder.class, -String.class, -kafkaParams, -offsetRanges, -leaders, -new Function , String>() { - @Override - public String call(MessageAndMetadata msgAndMd) { -return msgAndMd.message(); - } -} -); - -
[5/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
[SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact ## What changes were proposed in this pull request? Renaming the streaming-kafka artifact to include kafka version, in anticipation of needing a different artifact for later kafka versions ## How was this patch tested? Unit tests Author: cody koeningerCloses #12946 from koeninger/SPARK-15085. (cherry picked from commit 89e67d6667d5f8be9c6fb6c120fbcd350ae2950d) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56e1e2f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56e1e2f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56e1e2f1 Branch: refs/heads/branch-2.0 Commit: 56e1e2f1706c857f72f519e51c51e39e30638eb6 Parents: e3703c4 Author: cody koeninger Authored: Wed May 11 12:15:41 2016 -0700 Committer: Reynold Xin Committed: Wed May 11 12:15:48 2016 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 4 +- dev/audit-release/audit_release.py | 2 +- dev/run-tests.py| 2 +- dev/sparktestsupport/modules.py | 8 +- docs/streaming-kafka-integration.md | 14 +- docs/streaming-programming-guide.md | 4 +- examples/pom.xml| 2 +- external/kafka-0-8-assembly/pom.xml | 176 external/kafka-0-8/pom.xml | 98 +++ .../apache/spark/streaming/kafka/Broker.scala | 66 ++ .../kafka/DirectKafkaInputDStream.scala | 227 ++ .../spark/streaming/kafka/KafkaCluster.scala| 425 ++ .../streaming/kafka/KafkaInputDStream.scala | 142 .../apache/spark/streaming/kafka/KafkaRDD.scala | 269 +++ .../streaming/kafka/KafkaRDDPartition.scala | 42 + .../spark/streaming/kafka/KafkaTestUtils.scala | 275 +++ .../spark/streaming/kafka/KafkaUtils.scala | 805 +++ .../spark/streaming/kafka/OffsetRange.scala | 109 +++ .../streaming/kafka/ReliableKafkaReceiver.scala | 302 +++ .../spark/streaming/kafka/package-info.java | 21 + .../apache/spark/streaming/kafka/package.scala | 23 + .../kafka/JavaDirectKafkaStreamSuite.java | 175 
.../streaming/kafka/JavaKafkaRDDSuite.java | 156 .../streaming/kafka/JavaKafkaStreamSuite.java | 135 .../src/test/resources/log4j.properties | 28 + .../kafka/DirectKafkaStreamSuite.scala | 531 .../streaming/kafka/KafkaClusterSuite.scala | 81 ++ .../spark/streaming/kafka/KafkaRDDSuite.scala | 175 .../streaming/kafka/KafkaStreamSuite.scala | 84 ++ .../kafka/ReliableKafkaStreamSuite.scala| 148 external/kafka-assembly/pom.xml | 176 external/kafka/pom.xml | 98 --- .../apache/spark/streaming/kafka/Broker.scala | 66 -- .../kafka/DirectKafkaInputDStream.scala | 227 -- .../spark/streaming/kafka/KafkaCluster.scala| 425 -- .../streaming/kafka/KafkaInputDStream.scala | 142 .../apache/spark/streaming/kafka/KafkaRDD.scala | 269 --- .../streaming/kafka/KafkaRDDPartition.scala | 42 - .../spark/streaming/kafka/KafkaTestUtils.scala | 275 --- .../spark/streaming/kafka/KafkaUtils.scala | 805 --- .../spark/streaming/kafka/OffsetRange.scala | 109 --- .../streaming/kafka/ReliableKafkaReceiver.scala | 302 --- .../spark/streaming/kafka/package-info.java | 21 - .../apache/spark/streaming/kafka/package.scala | 23 - .../kafka/JavaDirectKafkaStreamSuite.java | 175 .../streaming/kafka/JavaKafkaRDDSuite.java | 156 .../streaming/kafka/JavaKafkaStreamSuite.java | 135 .../kafka/src/test/resources/log4j.properties | 28 - .../kafka/DirectKafkaStreamSuite.scala | 531 .../streaming/kafka/KafkaClusterSuite.scala | 81 -- .../spark/streaming/kafka/KafkaRDDSuite.scala | 175 .../streaming/kafka/KafkaStreamSuite.scala | 84 -- .../kafka/ReliableKafkaStreamSuite.scala| 148 pom.xml | 4 +- project/MimaBuild.scala | 10 +- project/SparkBuild.scala| 8 +- python/pyspark/streaming/kafka.py | 6 +- python/pyspark/streaming/tests.py | 6 +- 58 files changed, 4532 insertions(+), 4524 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
[1/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
Repository: spark Updated Branches: refs/heads/master 6d0368ab8 -> 89e67d666 http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java -- diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java deleted file mode 100644 index c41b629..000 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.kafka; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import scala.Tuple2; - -import kafka.common.TopicAndPartition; -import kafka.message.MessageAndMetadata; -import kafka.serializer.StringDecoder; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; - -public class JavaKafkaRDDSuite implements Serializable { - private transient JavaSparkContext sc = null; - private transient KafkaTestUtils kafkaTestUtils = null; - - @Before - public void setUp() { -kafkaTestUtils = new KafkaTestUtils(); -kafkaTestUtils.setup(); -SparkConf sparkConf = new SparkConf() - .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); -sc = new JavaSparkContext(sparkConf); - } - - @After - public void tearDown() { -if (sc != null) { - sc.stop(); - sc = null; -} - -if (kafkaTestUtils != null) { - kafkaTestUtils.teardown(); - kafkaTestUtils = null; -} - } - - @Test - public void testKafkaRDD() throws InterruptedException { -String topic1 = "topic1"; -String topic2 = "topic2"; - -createTopicAndSendData(topic1); -createTopicAndSendData(topic2); - -MapkafkaParams = new HashMap<>(); -kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); - -OffsetRange[] offsetRanges = { - OffsetRange.create(topic1, 0, 0, 1), - OffsetRange.create(topic2, 0, 0, 1) -}; - -Map emptyLeaders = new HashMap<>(); -Map leaders = new HashMap<>(); -String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":"); -Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1])); -leaders.put(new TopicAndPartition(topic1, 0), broker); -leaders.put(new TopicAndPartition(topic2, 0), broker); - -JavaRDD rdd1 = KafkaUtils.createRDD( -sc, -String.class, -String.class, 
-StringDecoder.class, -StringDecoder.class, -kafkaParams, -offsetRanges -).map( -new Function , String>() { - @Override - public String call(Tuple2 kv) { -return kv._2(); - } -} -); - -JavaRDD rdd2 = KafkaUtils.createRDD( -sc, -String.class, -String.class, -StringDecoder.class, -StringDecoder.class, -String.class, -kafkaParams, -offsetRanges, -emptyLeaders, -new Function , String>() { - @Override - public String call(MessageAndMetadata msgAndMd) { -return msgAndMd.message(); - } -} -); - -JavaRDD rdd3 = KafkaUtils.createRDD( -sc, -String.class, -String.class, -StringDecoder.class, -StringDecoder.class, -String.class, -kafkaParams, -offsetRanges, -leaders, -new Function , String>() { - @Override - public String call(MessageAndMetadata msgAndMd) { -return msgAndMd.message(); - } -} -); - -//
[4/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala -- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala new file mode 100644 index 000..edaafb9 --- /dev/null +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.io.OutputStream +import java.lang.{Integer => JInt, Long => JLong} +import java.nio.charset.StandardCharsets +import java.util.{List => JList, Map => JMap, Set => JSet} + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata +import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} +import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler} + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.api.python.SerDeUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java._ +import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream} +import org.apache.spark.streaming.util.WriteAheadLogUtils + +object KafkaUtils { + /** + * Create an input stream that pulls messages from Kafka Brokers. + * @param ssc StreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) + * @param groupId The group id for this consumer + * @param topicsMap of (topic_name -> numPartitions) to consume. 
Each partition is consumed + * in its own thread + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream( + ssc: StreamingContext, + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 +): ReceiverInputDStream[(String, String)] = { +val kafkaParams = Map[String, String]( + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "1") +createStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topics, storageLevel) + } + + /** + * Create an input stream that pulls messages from Kafka Brokers. + * @param ssc StreamingContext object + * @param kafkaParams Map of kafka configuration parameters, + *see http://kafka.apache.org/08/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + *in its own thread. + * @param storageLevel Storage level to use for storing the received objects + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + * @tparam U type of Kafka message key decoder + * @tparam T type of Kafka message value decoder + * @return DStream of (Kafka message key, Kafka message value) + */ + def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag]( + ssc: StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel +): ReceiverInputDStream[(K, V)] = { +val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) +new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) + } + + /** + * Create an input stream that pulls messages from Kafka Brokers. + * Storage level of the data will be the default
spark git commit: [SPARK-15037] [SQL] [MLLIB] Part2: Use SparkSession instead of SQLContext in Python TestSuites
Repository: spark Updated Branches: refs/heads/master d8935db5e -> 293143797 [SPARK-15037] [SQL] [MLLIB] Part2: Use SparkSession instead of SQLContext in Python TestSuites ## What changes were proposed in this pull request? Use SparkSession instead of SQLContext in Python TestSuites ## How was this patch tested? Existing tests Author: Sandeep SinghCloses #13044 from techaddict/SPARK-15037-python. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29314379 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29314379 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29314379 Branch: refs/heads/master Commit: 29314379729de4082bd2297c9e5289e3e4a0115e Parents: d8935db Author: Sandeep Singh Authored: Wed May 11 11:24:16 2016 -0700 Committer: Davies Liu Committed: Wed May 11 11:24:16 2016 -0700 -- python/pyspark/ml/tests.py | 97 - python/pyspark/mllib/tests.py| 19 +- python/pyspark/sql/readwriter.py | 72 +++ python/pyspark/sql/tests.py | 379 -- 4 files changed, 273 insertions(+), 294 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/29314379/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index ad1631f..49d3a4a 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -57,13 +57,25 @@ from pyspark.ml.tuning import * from pyspark.ml.wrapper import JavaParams from pyspark.mllib.common import _java2py from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector -from pyspark.sql import DataFrame, SQLContext, Row +from pyspark.sql import DataFrame, Row, SparkSession from pyspark.sql.functions import rand from pyspark.sql.utils import IllegalArgumentException from pyspark.storagelevel import * from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase +class SparkSessionTestCase(PySparkTestCase): +@classmethod +def setUpClass(cls): +PySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + 
+@classmethod +def tearDownClass(cls): +PySparkTestCase.tearDownClass() +cls.spark.stop() + + class MockDataset(DataFrame): def __init__(self): @@ -350,7 +362,7 @@ class ParamTests(PySparkTestCase): self.assertEqual(model.getWindowSize(), 6) -class FeatureTests(PySparkTestCase): +class FeatureTests(SparkSessionTestCase): def test_binarizer(self): b0 = Binarizer() @@ -376,8 +388,7 @@ class FeatureTests(PySparkTestCase): self.assertEqual(b1.getOutputCol(), "output") def test_idf(self): -sqlContext = SQLContext(self.sc) -dataset = sqlContext.createDataFrame([ +dataset = self.spark.createDataFrame([ (DenseVector([1.0, 2.0]),), (DenseVector([0.0, 1.0]),), (DenseVector([3.0, 0.2]),)], ["tf"]) @@ -390,8 +401,7 @@ class FeatureTests(PySparkTestCase): self.assertIsNotNone(output.head().idf) def test_ngram(self): -sqlContext = SQLContext(self.sc) -dataset = sqlContext.createDataFrame([ +dataset = self.spark.createDataFrame([ Row(input=["a", "b", "c", "d", "e"])]) ngram0 = NGram(n=4, inputCol="input", outputCol="output") self.assertEqual(ngram0.getN(), 4) @@ -401,8 +411,7 @@ class FeatureTests(PySparkTestCase): self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"]) def test_stopwordsremover(self): -sqlContext = SQLContext(self.sc) -dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])]) +dataset = self.spark.createDataFrame([Row(input=["a", "panda"])]) stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output") # Default self.assertEqual(stopWordRemover.getInputCol(), "input") @@ -419,15 +428,14 @@ class FeatureTests(PySparkTestCase): self.assertEqual(transformedDF.head().output, ["a"]) # with language selection stopwords = StopWordsRemover.loadDefaultStopWords("turkish") -dataset = sqlContext.createDataFrame([Row(input=["acaba", "ama", "biri"])]) +dataset = self.spark.createDataFrame([Row(input=["acaba", "ama", "biri"])]) stopWordRemover.setStopWords(stopwords) self.assertEqual(stopWordRemover.getStopWords(), stopwords) transformedDF = 
stopWordRemover.transform(dataset) self.assertEqual(transformedDF.head().output, []) def test_count_vectorizer_with_binary(self): -sqlContext = SQLContext(self.sc) -dataset = sqlContext.createDataFrame([ +dataset =
[3/3] spark git commit: [SPARK-13522][CORE] Fix the exit log place for heartbeat
[SPARK-13522][CORE] Fix the exit log place for heartbeat ## What changes were proposed in this pull request? Just fixed the log place introduced by #11401 ## How was this patch tested? unit tests. Author: Shixiong ZhuCloses #11432 from zsxwing/SPARK-13522-follow-up. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ced71d35 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ced71d35 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ced71d35 Branch: refs/heads/branch-1.6 Commit: ced71d353a0908abcf5b83503661bef97ae0953d Parents: 86bf93e Author: Shixiong Zhu Authored: Mon Feb 29 11:52:11 2016 -0800 Committer: Andrew Or Committed: Wed May 11 11:29:10 2016 -0700 -- core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ced71d35/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b8a1668..a3ebaff 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -481,9 +481,10 @@ private[spark] class Executor( } catch { case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e) -logError(s"Unable to send heartbeats to driver more than $HEARTBEAT_MAX_FAILURES times") heartbeatFailures += 1 if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) { + logError(s"Exit as unable to send heartbeats to driver " + +s"more than $HEARTBEAT_MAX_FAILURES times") System.exit(ExecutorExitCode.HEARTBEAT_FAILURE) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/3] spark git commit: [SPARK-13519][CORE] Driver should tell Executor to stop itself when cleaning executor's state
Repository: spark Updated Branches: refs/heads/branch-1.6 d1654864a -> ced71d353 [SPARK-13519][CORE] Driver should tell Executor to stop itself when cleaning executor's state ## What changes were proposed in this pull request? When the driver removes an executor's state, the connection between the driver and the executor may be still alive so that the executor cannot exit automatically (E.g., Master will send RemoveExecutor when a work is lost but the executor is still alive), so the driver should try to tell the executor to stop itself. Otherwise, we will leak an executor. This PR modified the driver to send `StopExecutor` to the executor when it's removed. ## How was this patch tested? manual test: increase the worker heartbeat interval to force it's always timeout and the leak executors are gone. Author: Shixiong ZhuCloses #11399 from zsxwing/SPARK-13519. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c433c0af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c433c0af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c433c0af Branch: refs/heads/branch-1.6 Commit: c433c0afd4c3f96ef24686a1f28262af81b67723 Parents: d165486 Author: Shixiong Zhu Authored: Fri Feb 26 15:11:57 2016 -0800 Committer: Andrew Or Committed: Wed May 11 11:29:01 2016 -0700 -- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 1 file changed, 4 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c433c0af/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 505c161..7189685 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -179,6 +179,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply(true) case RemoveExecutor(executorId, reason) => +// We will remove the executor's state and cannot restore it. However, the connection +// between the driver and the executor may be still alive so that the executor won't exit +// automatically, so try to tell the executor to stop itself. See SPARK-13519. + executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor)) removeExecutor(executorId, reason) context.reply(true) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[2/3] spark git commit: [SPARK-13522][CORE] Executor should kill itself when it's unable to heartbeat to driver more than N times
[SPARK-13522][CORE] Executor should kill itself when it's unable to heartbeat to driver more than N times ## What changes were proposed in this pull request? Sometimes, network disconnection event won't be triggered for other potential race conditions that we may not have thought of, then the executor will keep sending heartbeats to driver and won't exit. This PR adds a new configuration `spark.executor.heartbeat.maxFailures` to kill Executor when it's unable to heartbeat to the driver more than `spark.executor.heartbeat.maxFailures` times. ## How was this patch tested? unit tests Author: Shixiong ZhuCloses #11401 from zsxwing/SPARK-13522. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86bf93e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86bf93e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86bf93e6 Branch: refs/heads/branch-1.6 Commit: 86bf93e65481b8fe5d7532ca6d4cd29cafc9e9dd Parents: c433c0a Author: Shixiong Zhu Authored: Mon Feb 29 11:02:45 2016 -0800 Committer: Andrew Or Committed: Wed May 11 11:29:06 2016 -0700 -- .../org/apache/spark/executor/Executor.scala| 22 +++- .../spark/executor/ExecutorExitCode.scala | 8 +++ 2 files changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/86bf93e6/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b248e12..b8a1668 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -114,6 +114,19 @@ private[spark] class Executor( private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv) + /** + * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES` + * times, it should 
kill itself. The default value is 60. It means we will retry to send + * heartbeats about 10 minutes because the heartbeat interval is 10s. + */ + private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60) + + /** + * Count the failure times of heartbeat. It should only be acessed in the heartbeat thread. Each + * successful heartbeat will reset it to 0. + */ + private var heartbeatFailures = 0 + startDriverHeartbeater() def launchTask( @@ -464,8 +477,15 @@ private[spark] class Executor( logInfo("Told to re-register on heartbeat") env.blockManager.reregister() } + heartbeatFailures = 0 } catch { - case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e) + case NonFatal(e) => +logWarning("Issue communicating with driver in heartbeater", e) +logError(s"Unable to send heartbeats to driver more than $HEARTBEAT_MAX_FAILURES times") +heartbeatFailures += 1 +if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) { + System.exit(ExecutorExitCode.HEARTBEAT_FAILURE) +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/86bf93e6/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala index ea36fb6..99858f7 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala @@ -39,6 +39,12 @@ object ExecutorExitCode { /** ExternalBlockStore failed to create a local temporary directory after many attempts. */ val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55 + /** + * Executor is unable to send heartbeats to the driver more than + * "spark.executor.heartbeat.maxFailures" times. 
+ */ + val HEARTBEAT_FAILURE = 56 + def explainExitCode(exitCode: Int): String = { exitCode match { case UNCAUGHT_EXCEPTION => "Uncaught exception" @@ -51,6 +57,8 @@ object ExecutorExitCode { // TODO: replace external block store with concrete implementation name case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR => "ExternalBlockStore failed to create a local temporary directory." + case HEARTBEAT_FAILURE => +"Unable to send heartbeats to driver." case _ => "Unknown executor exit code (" + exitCode + ")" + ( if (exitCode > 128) {
spark git commit: [SPARK-15259] Sort time metric should not include spill and record insertion time
Repository: spark Updated Branches: refs/heads/branch-2.0 1b90adc03 -> e3703c411 [SPARK-15259] Sort time metric should not include spill and record insertion time ## What changes were proposed in this pull request? After SPARK-14669 it seems the sort time metric includes both spill and record insertion time. This makes it not very useful since the metric becomes close to the total execution time of the node. We should track just the time spent for in-memory sort, as before. ## How was this patch tested? Verified metric in the UI, also unit test on UnsafeExternalRowSorter. cc davies Author: Eric LiangAuthor: Eric Liang Closes #13035 from ericl/fix-metrics. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3703c41 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3703c41 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3703c41 Branch: refs/heads/branch-2.0 Commit: e3703c41177c01c41516d1669c1ffa239074e59f Parents: 1b90adc Author: Eric Liang Authored: Wed May 11 11:25:46 2016 -0700 Committer: Davies Liu Committed: Wed May 11 11:26:49 2016 -0700 -- .../unsafe/sort/UnsafeExternalSorter.java | 13 + .../unsafe/sort/UnsafeInMemorySorter.java | 11 +++ .../unsafe/sort/UnsafeExternalSorterSuite.java | 20 .../sql/execution/UnsafeExternalRowSorter.java | 7 +++ .../apache/spark/sql/execution/SortExec.scala | 9 ++--- 5 files changed, 53 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e3703c41/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 8b6c96a..7dc0508 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private long pageCursor = -1; private long peakMemoryUsedBytes = 0; private long totalSpillBytes = 0L; + private long totalSortTimeNanos = 0L; private volatile SpillableIterator readingIterator = null; public static UnsafeExternalSorter createWithExistingInMemorySorter( @@ -248,6 +249,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } /** + * @return the total amount of time spent sorting data (in-memory only). + */ + public long getSortTimeNanos() { +UnsafeInMemorySorter sorter = inMemSorter; +if (sorter != null) { + return sorter.getSortTimeNanos(); +} +return totalSortTimeNanos; + } + + /** * Return the total number of bytes that has been spilled into disk so far. */ public long getSpillSize() { @@ -505,6 +517,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // in-memory sorter will not be used after spilling assert(inMemSorter != null); released += inMemSorter.getMemoryUsage(); +totalSortTimeNanos += inMemSorter.getSortTimeNanos(); inMemSorter.free(); inMemSorter = null; taskContext.taskMetrics().incMemoryBytesSpilled(released); http://git-wip-us.apache.org/repos/asf/spark/blob/e3703c41/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 03973f3..0cce792 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -97,6 +97,8 @@ public final class UnsafeInMemorySorter { private long initialSize; + private long totalSortTimeNanos = 0L; + public UnsafeInMemorySorter( final 
MemoryConsumer consumer, final TaskMemoryManager memoryManager, @@ -160,6 +162,13 @@ public final class UnsafeInMemorySorter { return pos / 2; } + /** + * @return the total amount of time spent sorting data (in-memory only). + */ + public long getSortTimeNanos() { +return totalSortTimeNanos; + } + public long getMemoryUsage() { return array.size() * 8; } @@ -265,6 +274,7 @@ public final class
spark git commit: [SPARK-15037] [SQL] [MLLIB] Part2: Use SparkSession instead of SQLContext in Python TestSuites
Repository: spark Updated Branches: refs/heads/branch-2.0 381a82589 -> 1b90adc03 [SPARK-15037] [SQL] [MLLIB] Part2: Use SparkSession instead of SQLContext in Python TestSuites ## What changes were proposed in this pull request? Use SparkSession instead of SQLContext in Python TestSuites ## How was this patch tested? Existing tests Author: Sandeep SinghCloses #13044 from techaddict/SPARK-15037-python. (cherry picked from commit 29314379729de4082bd2297c9e5289e3e4a0115e) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b90adc0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b90adc0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b90adc0 Branch: refs/heads/branch-2.0 Commit: 1b90adc03bd88fbd664d4692092142672ac5a36b Parents: 381a825 Author: Sandeep Singh Authored: Wed May 11 11:24:16 2016 -0700 Committer: Davies Liu Committed: Wed May 11 11:24:28 2016 -0700 -- python/pyspark/ml/tests.py | 97 - python/pyspark/mllib/tests.py| 19 +- python/pyspark/sql/readwriter.py | 72 +++ python/pyspark/sql/tests.py | 379 -- 4 files changed, 273 insertions(+), 294 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1b90adc0/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index ad1631f..49d3a4a 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -57,13 +57,25 @@ from pyspark.ml.tuning import * from pyspark.ml.wrapper import JavaParams from pyspark.mllib.common import _java2py from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector -from pyspark.sql import DataFrame, SQLContext, Row +from pyspark.sql import DataFrame, Row, SparkSession from pyspark.sql.functions import rand from pyspark.sql.utils import IllegalArgumentException from pyspark.storagelevel import * from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase +class SparkSessionTestCase(PySparkTestCase): 
+@classmethod +def setUpClass(cls): +PySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +PySparkTestCase.tearDownClass() +cls.spark.stop() + + class MockDataset(DataFrame): def __init__(self): @@ -350,7 +362,7 @@ class ParamTests(PySparkTestCase): self.assertEqual(model.getWindowSize(), 6) -class FeatureTests(PySparkTestCase): +class FeatureTests(SparkSessionTestCase): def test_binarizer(self): b0 = Binarizer() @@ -376,8 +388,7 @@ class FeatureTests(PySparkTestCase): self.assertEqual(b1.getOutputCol(), "output") def test_idf(self): -sqlContext = SQLContext(self.sc) -dataset = sqlContext.createDataFrame([ +dataset = self.spark.createDataFrame([ (DenseVector([1.0, 2.0]),), (DenseVector([0.0, 1.0]),), (DenseVector([3.0, 0.2]),)], ["tf"]) @@ -390,8 +401,7 @@ class FeatureTests(PySparkTestCase): self.assertIsNotNone(output.head().idf) def test_ngram(self): -sqlContext = SQLContext(self.sc) -dataset = sqlContext.createDataFrame([ +dataset = self.spark.createDataFrame([ Row(input=["a", "b", "c", "d", "e"])]) ngram0 = NGram(n=4, inputCol="input", outputCol="output") self.assertEqual(ngram0.getN(), 4) @@ -401,8 +411,7 @@ class FeatureTests(PySparkTestCase): self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"]) def test_stopwordsremover(self): -sqlContext = SQLContext(self.sc) -dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])]) +dataset = self.spark.createDataFrame([Row(input=["a", "panda"])]) stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output") # Default self.assertEqual(stopWordRemover.getInputCol(), "input") @@ -419,15 +428,14 @@ class FeatureTests(PySparkTestCase): self.assertEqual(transformedDF.head().output, ["a"]) # with language selection stopwords = StopWordsRemover.loadDefaultStopWords("turkish") -dataset = sqlContext.createDataFrame([Row(input=["acaba", "ama", "biri"])]) +dataset = self.spark.createDataFrame([Row(input=["acaba", "ama", "biri"])]) 
stopWordRemover.setStopWords(stopwords) self.assertEqual(stopWordRemover.getStopWords(), stopwords) transformedDF = stopWordRemover.transform(dataset) self.assertEqual(transformedDF.head().output, []) def
spark git commit: [SPARK-15241] [SPARK-15242] [SQL] fix 2 decimal-related issues in RowEncoder
Repository: spark Updated Branches: refs/heads/branch-2.0 403ba6513 -> 381a82589 [SPARK-15241] [SPARK-15242] [SQL] fix 2 decimal-related issues in RowEncoder ## What changes were proposed in this pull request? SPARK-15241: We now support java decimal and catalyst decimal in external row, it makes sense to also support scala decimal. SPARK-15242: This is a long-standing bug, and is exposed after https://github.com/apache/spark/pull/12364, which eliminate the `If` expression if the field is not nullable: ``` val fieldValue = serializerFor( GetExternalRowField(inputObject, i, externalDataTypeForInput(f.dataType)), f.dataType) if (f.nullable) { If( Invoke(inputObject, "isNullAt", BooleanType, Literal(i) :: Nil), Literal.create(null, f.dataType), fieldValue) } else { fieldValue } ``` Previously, we always use `DecimalType.SYSTEM_DEFAULT` as the output type of converted decimal field, which is wrong as it doesn't match the real decimal type. However, it works well because we always put converted field into `If` expression to do the null check, and `If` use its `trueValue`'s data type as its output type. Now if we have a not nullable decimal field, then the converted field's output type will be `DecimalType.SYSTEM_DEFAULT`, and we will write wrong data into unsafe row. The fix is simple, just use the given decimal type as the output type of converted decimal field. These 2 issues was found at https://github.com/apache/spark/pull/13008 ## How was this patch tested? new tests in RowEncoderSuite Author: Wenchen FanCloses #13019 from cloud-fan/encoder-decimal. 
(cherry picked from commit d8935db5ecb7c959585411da9bf1e9a9c4d5cb37) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/381a8258 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/381a8258 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/381a8258 Branch: refs/heads/branch-2.0 Commit: 381a825890b74afa0bd7325265aa4e15bbc8f10f Parents: 403ba65 Author: Wenchen Fan Authored: Wed May 11 11:16:05 2016 -0700 Committer: Davies Liu Committed: Wed May 11 11:16:12 2016 -0700 -- .../sql/catalyst/encoders/RowEncoder.scala | 6 ++-- .../org/apache/spark/sql/types/Decimal.scala| 1 + .../encoders/ExpressionEncoderSuite.scala | 3 +- .../sql/catalyst/encoders/RowEncoderSuite.scala | 29 4 files changed, 29 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/381a8258/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index cfde3bf..33ac1fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -84,10 +84,10 @@ object RowEncoder { "fromJavaDate", inputObject :: Nil) -case _: DecimalType => +case d: DecimalType => StaticInvoke( Decimal.getClass, -DecimalType.SYSTEM_DEFAULT, +d, "fromDecimal", inputObject :: Nil) @@ -162,7 +162,7 @@ object RowEncoder { * `org.apache.spark.sql.types.Decimal`. */ private def externalDataTypeForInput(dt: DataType): DataType = dt match { -// In order to support both Decimal and java BigDecimal in external row, we make this +// In order to support both Decimal and java/scala BigDecimal in external row, we make this // as java.lang.Object. 
case _: DecimalType => ObjectType(classOf[java.lang.Object]) case _ => externalDataTypeFor(dt) http://git-wip-us.apache.org/repos/asf/spark/blob/381a8258/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index 6f4ec6b..2f7422b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -386,6 +386,7 @@ object Decimal { def fromDecimal(value: Any): Decimal = { value match { case j: java.math.BigDecimal => apply(j) + case d: BigDecimal => apply(d) case d: Decimal => d } }
spark git commit: [SPARK-15241] [SPARK-15242] [SQL] fix 2 decimal-related issues in RowEncoder
Repository: spark Updated Branches: refs/heads/master e1576478b -> d8935db5e [SPARK-15241] [SPARK-15242] [SQL] fix 2 decimal-related issues in RowEncoder ## What changes were proposed in this pull request? SPARK-15241: We now support java decimal and catalyst decimal in external row, it makes sense to also support scala decimal. SPARK-15242: This is a long-standing bug, and is exposed after https://github.com/apache/spark/pull/12364, which eliminate the `If` expression if the field is not nullable: ``` val fieldValue = serializerFor( GetExternalRowField(inputObject, i, externalDataTypeForInput(f.dataType)), f.dataType) if (f.nullable) { If( Invoke(inputObject, "isNullAt", BooleanType, Literal(i) :: Nil), Literal.create(null, f.dataType), fieldValue) } else { fieldValue } ``` Previously, we always use `DecimalType.SYSTEM_DEFAULT` as the output type of converted decimal field, which is wrong as it doesn't match the real decimal type. However, it works well because we always put converted field into `If` expression to do the null check, and `If` use its `trueValue`'s data type as its output type. Now if we have a not nullable decimal field, then the converted field's output type will be `DecimalType.SYSTEM_DEFAULT`, and we will write wrong data into unsafe row. The fix is simple, just use the given decimal type as the output type of converted decimal field. These 2 issues was found at https://github.com/apache/spark/pull/13008 ## How was this patch tested? new tests in RowEncoderSuite Author: Wenchen FanCloses #13019 from cloud-fan/encoder-decimal. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8935db5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8935db5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8935db5 Branch: refs/heads/master Commit: d8935db5ecb7c959585411da9bf1e9a9c4d5cb37 Parents: e157647 Author: Wenchen Fan Authored: Wed May 11 11:16:05 2016 -0700 Committer: Davies Liu Committed: Wed May 11 11:16:05 2016 -0700 -- .../sql/catalyst/encoders/RowEncoder.scala | 6 ++-- .../org/apache/spark/sql/types/Decimal.scala| 1 + .../encoders/ExpressionEncoderSuite.scala | 3 +- .../sql/catalyst/encoders/RowEncoderSuite.scala | 29 4 files changed, 29 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8935db5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index cfde3bf..33ac1fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -84,10 +84,10 @@ object RowEncoder { "fromJavaDate", inputObject :: Nil) -case _: DecimalType => +case d: DecimalType => StaticInvoke( Decimal.getClass, -DecimalType.SYSTEM_DEFAULT, +d, "fromDecimal", inputObject :: Nil) @@ -162,7 +162,7 @@ object RowEncoder { * `org.apache.spark.sql.types.Decimal`. */ private def externalDataTypeForInput(dt: DataType): DataType = dt match { -// In order to support both Decimal and java BigDecimal in external row, we make this +// In order to support both Decimal and java/scala BigDecimal in external row, we make this // as java.lang.Object. 
case _: DecimalType => ObjectType(classOf[java.lang.Object]) case _ => externalDataTypeFor(dt) http://git-wip-us.apache.org/repos/asf/spark/blob/d8935db5/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index 6f4ec6b..2f7422b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -386,6 +386,7 @@ object Decimal { def fromDecimal(value: Any): Decimal = { value match { case j: java.math.BigDecimal => apply(j) + case d: BigDecimal => apply(d) case d: Decimal => d } } http://git-wip-us.apache.org/repos/asf/spark/blob/d8935db5/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
spark git commit: [SPARK-14933][HOTFIX] Replace `sqlContext` with `spark`.
Repository: spark Updated Branches: refs/heads/master a5f9fdbba -> e1576478b [SPARK-14933][HOTFIX] Replace `sqlContext` with `spark`. ## What changes were proposed in this pull request? This fixes compile errors. ## How was this patch tested? Pass the Jenkins tests. Author: Dongjoon HyunCloses #13053 from dongjoon-hyun/hotfix_sqlquerysuite. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1576478 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1576478 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1576478 Branch: refs/heads/master Commit: e1576478bde2b9146bdbd4f2bae20a4011b20229 Parents: a5f9fdb Author: Dongjoon Hyun Authored: Wed May 11 10:03:51 2016 -0700 Committer: Yin Huai Committed: Wed May 11 10:03:51 2016 -0700 -- .../java/org/apache/spark/examples/ml/JavaLDAExample.java| 2 +- .../org/apache/spark/sql/hive/execution/SQLViewSuite.scala | 8 2 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e1576478/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java index 7102ddd..9041244 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java @@ -53,7 +53,7 @@ public class JavaLDAExample { double lp = model.logPerplexity(dataset); System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll); System.out.println("The upper bound bound on perplexity: " + lp); - + // Describe topics. 
Dataset topics = model.describeTopics(3); System.out.println("The topics described by their top-weighted terms:"); http://git-wip-us.apache.org/repos/asf/spark/blob/e1576478/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 5c72ec5..42dbe18 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -308,11 +308,11 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-14933 - create view from hive parquet tabale") { withTable("t_part") { withView("v_part") { -sqlContext.sql( +spark.sql( """create table t_part (c1 int, c2 int) |stored as parquet as select 1 as a, 2 as b """.stripMargin) -sqlContext.sql("create view v_part as select * from t_part") +spark.sql("create view v_part as select * from t_part") checkAnswer( sql("select * from t_part"), sql("select * from v_part")) @@ -323,11 +323,11 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-14933 - create view from hive orc tabale") { withTable("t_orc") { withView("v_orc") { -sqlContext.sql( +spark.sql( """create table t_orc (c1 int, c2 int) |stored as orc as select 1 as a, 2 as b """.stripMargin) -sqlContext.sql("create view v_orc as select * from t_orc") +spark.sql("create view v_orc as select * from t_orc") checkAnswer( sql("select * from t_orc"), sql("select * from v_orc")) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-14933][HOTFIX] Replace `sqlContext` with `spark`.
Repository: spark Updated Branches: refs/heads/branch-2.0 0858a82c1 -> 403ba6513 [SPARK-14933][HOTFIX] Replace `sqlContext` with `spark`. ## What changes were proposed in this pull request? This fixes compile errors. ## How was this patch tested? Pass the Jenkins tests. Author: Dongjoon HyunCloses #13053 from dongjoon-hyun/hotfix_sqlquerysuite. (cherry picked from commit e1576478bde2b9146bdbd4f2bae20a4011b20229) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/403ba651 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/403ba651 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/403ba651 Branch: refs/heads/branch-2.0 Commit: 403ba65133c3bb45ad6e236464218d8970b893c9 Parents: 0858a82 Author: Dongjoon Hyun Authored: Wed May 11 10:03:51 2016 -0700 Committer: Yin Huai Committed: Wed May 11 10:04:00 2016 -0700 -- .../java/org/apache/spark/examples/ml/JavaLDAExample.java| 2 +- .../org/apache/spark/sql/hive/execution/SQLViewSuite.scala | 8 2 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/403ba651/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java index 7102ddd..9041244 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java @@ -53,7 +53,7 @@ public class JavaLDAExample { double lp = model.logPerplexity(dataset); System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll); System.out.println("The upper bound bound on perplexity: " + lp); - + // Describe topics. 
Dataset topics = model.describeTopics(3); System.out.println("The topics described by their top-weighted terms:"); http://git-wip-us.apache.org/repos/asf/spark/blob/403ba651/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 5c72ec5..42dbe18 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -308,11 +308,11 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-14933 - create view from hive parquet tabale") { withTable("t_part") { withView("v_part") { -sqlContext.sql( +spark.sql( """create table t_part (c1 int, c2 int) |stored as parquet as select 1 as a, 2 as b """.stripMargin) -sqlContext.sql("create view v_part as select * from t_part") +spark.sql("create view v_part as select * from t_part") checkAnswer( sql("select * from t_part"), sql("select * from v_part")) @@ -323,11 +323,11 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("SPARK-14933 - create view from hive orc tabale") { withTable("t_orc") { withView("v_orc") { -sqlContext.sql( +spark.sql( """create table t_orc (c1 int, c2 int) |stored as orc as select 1 as a, 2 as b """.stripMargin) -sqlContext.sql("create view v_orc as select * from t_orc") +spark.sql("create view v_orc as select * from t_orc") checkAnswer( sql("select * from t_orc"), sql("select * from v_orc")) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15268][SQL] Make JavaTypeInference work with UDTRegistration
Repository: spark Updated Branches: refs/heads/branch-2.0 749c29bc0 -> 0858a82c1 [SPARK-15268][SQL] Make JavaTypeInference work with UDTRegistration ## What changes were proposed in this pull request? We have a private `UDTRegistration` API to register user defined type. Currently `JavaTypeInference` can't work with it. So `SparkSession.createDataFrame` from a bean class will not correctly infer the schema of the bean class. ## How was this patch tested? `VectorUDTSuite`. Author: Liang-Chi HsiehCloses #13046 from viirya/fix-udt-registry-javatypeinference. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0858a82c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0858a82c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0858a82c Branch: refs/heads/branch-2.0 Commit: 0858a82c141fe9b2d2c94a62c16657dcd6c3ec8b Parents: 749c29b Author: Liang-Chi Hsieh Authored: Wed May 11 09:31:22 2016 -0700 Committer: Xiangrui Meng Committed: Wed May 11 09:33:55 2016 -0700 -- .../org/apache/spark/ml/linalg/VectorUDTSuite.scala | 16 .../spark/sql/catalyst/JavaTypeInference.scala | 5 + 2 files changed, 21 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0858a82c/mllib/src/test/scala/org/apache/spark/ml/linalg/VectorUDTSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/linalg/VectorUDTSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/linalg/VectorUDTSuite.scala index 6d01d8f..7b50876 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/linalg/VectorUDTSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/linalg/VectorUDTSuite.scala @@ -17,9 +17,19 @@ package org.apache.spark.ml.linalg +import scala.beans.BeanInfo + import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.JavaTypeInference import org.apache.spark.sql.types._ +@BeanInfo +case class LabeledPoint(label: Double, features: Vector) { + override def toString: String = { 
+s"($label,$features)" + } +} + class VectorUDTSuite extends SparkFunSuite { test("preloaded VectorUDT") { @@ -36,4 +46,10 @@ class VectorUDTSuite extends SparkFunSuite { assert(udt.simpleString == "vector") } } + + test("JavaTypeInference with VectorUDT") { +val (dataType, _) = JavaTypeInference.inferDataType(classOf[LabeledPoint]) +assert(dataType.asInstanceOf[StructType].fields.map(_.dataType) + === Seq(new VectorUDT, DoubleType)) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/0858a82c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index 6f9fbbb..92caf8f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -63,6 +63,11 @@ object JavaTypeInference { case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) => (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true) + case c: Class[_] if UDTRegistration.exists(c.getName) => +val udt = UDTRegistration.getUDTFor(c.getName).get.newInstance() + .asInstanceOf[UserDefinedType[_ >: Null]] +(udt, true) + case c: Class[_] if c == classOf[java.lang.String] => (StringType, true) case c: Class[_] if c == classOf[Array[Byte]] => (BinaryType, true) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-14933][SQL] Failed to create view out of a parquet or orc table
Repository: spark Updated Branches: refs/heads/branch-2.0 3bd7a89bd -> 749c29bc0 [SPARK-14933][SQL] Failed to create view out of a parquet or orc table ## What changes were proposed in this pull request? Symptom If a table is created as parquet or ORC table with hive syntax DDL, such as ```SQL create table t1 (c1 int, c2 string) stored as parquet ``` The following command will fail ```SQL create view v1 as select * from t1 ``` Root Cause Currently, `HiveMetaStoreCatalog` converts Parquet/Orc tables to `LogicalRelation` without giving any `tableIdentifier`. `SQLBuilder` expects the `LogicalRelation` to have an associated `tableIdentifier`. However, the `LogicalRelation` created earlier does not have such a `tableIdentifier`. Thus, `SQLBuilder.toSQL` cannot recognize this logical plan and issues an exception. This PR is to assign a `TableIdentifier` to the `LogicalRelation` when resolving parquet or orc tables in `HiveMetaStoreCatalog`. ## How was this patch tested? testcases created and dev/run-tests is run. Author: xin Wu. Closes #12716 from xwu0226/SPARK_14933. 
(cherry picked from commit 427c20dd6d84cb9de1aac322183bc6e7b72ca25d) Signed-off-by: Cheng Lian Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/749c29bc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/749c29bc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/749c29bc Branch: refs/heads/branch-2.0 Commit: 749c29bc099c20aa6156b843cf7c9216315cd5a6 Parents: 3bd7a89 Author: xin Wu Authored: Wed May 11 22:17:59 2016 +0800 Committer: Cheng Lian Committed: Wed May 11 22:21:40 2016 +0800 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 10 +-- .../sql/catalyst/LogicalPlanToSQLSuite.scala| 24 .../spark/sql/hive/execution/SQLViewSuite.scala | 30 3 files changed, 62 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/749c29bc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 7a799b6..607f0a1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -293,7 +293,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log fileFormat = defaultSource, options = options) -val created = LogicalRelation(relation) +val created = LogicalRelation( + relation, + metastoreTableIdentifier = +Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database cachedDataSourceTables.put(tableIdentifier, created) created } @@ -317,7 +320,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log userSpecifiedSchema = Some(metastoreRelation.schema), bucketSpec = bucketSpec, options = options, - className = fileType).resolveRelation()) + className = fileType).resolveRelation(), + metastoreTableIdentifier 
= +Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database + cachedDataSourceTables.put(tableIdentifier, created) created http://git-wip-us.apache.org/repos/asf/spark/blob/749c29bc/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala index 9abefa5..4315197 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala @@ -741,4 +741,28 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { test("filter after subquery") { checkHiveQl("SELECT a FROM (SELECT key + 1 AS a FROM parquet_t1) t WHERE a > 5") } + + test("SPARK-14933 - select parquet table") { +withTable("parquet_t") { + sql( +""" + |create table parquet_t (c1 int, c2 string) + |stored as parquet select 1, 'abc' +""".stripMargin) + + checkHiveQl("select * from parquet_t") +} + } + + test("SPARK-14933 - select orc table") { +
spark git commit: [SPARK-14933][SQL] Failed to create view out of a parquet or orc table
Repository: spark Updated Branches: refs/heads/master d88afabdf -> 427c20dd6 [SPARK-14933][SQL] Failed to create view out of a parquet or orc table ## What changes were proposed in this pull request? Symptom If a table is created as parquet or ORC table with hive syntax DDL, such as ```SQL create table t1 (c1 int, c2 string) stored as parquet ``` The following command will fail ```SQL create view v1 as select * from t1 ``` Root Cause Currently, `HiveMetaStoreCatalog` converts Parquet/Orc tables to `LogicalRelation` without giving any `tableIdentifier`. `SQLBuilder` expects the `LogicalRelation` to have an associated `tableIdentifier`. However, the `LogicalRelation` created earlier does not have such a `tableIdentifier`. Thus, `SQLBuilder.toSQL` cannot recognize this logical plan and issues an exception. This PR is to assign a `TableIdentifier` to the `LogicalRelation` when resolving parquet or orc tables in `HiveMetaStoreCatalog`. ## How was this patch tested? testcases created and dev/run-tests is run. Author: xin Wu. Closes #12716 from xwu0226/SPARK_14933. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/427c20dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/427c20dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/427c20dd Branch: refs/heads/master Commit: 427c20dd6d84cb9de1aac322183bc6e7b72ca25d Parents: d88afab Author: xin Wu Authored: Wed May 11 22:17:59 2016 +0800 Committer: Cheng Lian Committed: Wed May 11 22:17:59 2016 +0800 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 10 +-- .../sql/catalyst/LogicalPlanToSQLSuite.scala| 24 .../spark/sql/hive/execution/SQLViewSuite.scala | 30 3 files changed, 62 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/427c20dd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 7a799b6..607f0a1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -293,7 +293,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log fileFormat = defaultSource, options = options) -val created = LogicalRelation(relation) +val created = LogicalRelation( + relation, + metastoreTableIdentifier = +Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database cachedDataSourceTables.put(tableIdentifier, created) created } @@ -317,7 +320,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log userSpecifiedSchema = Some(metastoreRelation.schema), bucketSpec = bucketSpec, options = options, - className = fileType).resolveRelation()) + className = fileType).resolveRelation(), + metastoreTableIdentifier = +Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database + 
cachedDataSourceTables.put(tableIdentifier, created) created http://git-wip-us.apache.org/repos/asf/spark/blob/427c20dd/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala index 9abefa5..4315197 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala @@ -741,4 +741,28 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { test("filter after subquery") { checkHiveQl("SELECT a FROM (SELECT key + 1 AS a FROM parquet_t1) t WHERE a > 5") } + + test("SPARK-14933 - select parquet table") { +withTable("parquet_t") { + sql( +""" + |create table parquet_t (c1 int, c2 string) + |stored as parquet select 1, 'abc' +""".stripMargin) + + checkHiveQl("select * from parquet_t") +} + } + + test("SPARK-14933 - select orc table") { +withTable("orc_t") { + sql( +""" + |create table orc_t (c1 int, c2 string) + |stored as orc select 1, 'abc' +
spark git commit: [SPARK-15150][EXAMPLE][DOC] Update LDA examples
Repository: spark Updated Branches: refs/heads/master fafc95af7 -> d88afabdf [SPARK-15150][EXAMPLE][DOC] Update LDA examples ## What changes were proposed in this pull request? 1,create a libsvm-type dataset for lda: `data/mllib/sample_lda_libsvm_data.txt` 2,add python example 3,directly read the datafile in examples 4,BTW, change to `SparkSession` in `aft_survival_regression.py` ## How was this patch tested? manual tests `./bin/spark-submit examples/src/main/python/ml/lda_example.py` Author: Zheng RuiFengCloses #12927 from zhengruifeng/lda_pe. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d88afabd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d88afabd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d88afabd Branch: refs/heads/master Commit: d88afabdfa83be47f36d833105aadd6b818c Parents: fafc95a Author: Zheng RuiFeng Authored: Wed May 11 12:49:41 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 12:49:41 2016 +0200 -- data/mllib/sample_lda_libsvm_data.txt | 12 docs/ml-clustering.md | 7 +- .../ml/JavaAFTSurvivalRegressionExample.java| 7 ++ .../spark/examples/ml/JavaLDAExample.java | 67 ++-- .../main/python/ml/aft_survival_regression.py | 19 -- examples/src/main/python/ml/lda_example.py | 64 +++ .../ml/AFTSurvivalRegressionExample.scala | 6 +- .../apache/spark/examples/ml/LDAExample.scala | 41 +--- 8 files changed, 143 insertions(+), 80 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d88afabd/data/mllib/sample_lda_libsvm_data.txt -- diff --git a/data/mllib/sample_lda_libsvm_data.txt b/data/mllib/sample_lda_libsvm_data.txt new file mode 100644 index 000..bf118d7 --- /dev/null +++ b/data/mllib/sample_lda_libsvm_data.txt @@ -0,0 +1,12 @@ +0 1:1 2:2 3:6 4:0 5:2 6:3 7:1 8:1 9:0 10:0 11:3 +1 1:1 2:3 3:0 4:1 5:3 6:0 7:0 8:2 9:0 10:0 11:1 +2 1:1 2:4 3:1 4:0 5:0 6:4 7:9 8:0 9:1 10:2 11:0 +3 1:2 2:1 3:0 4:3 5:0 6:0 7:5 8:0 9:2 10:3 11:9 +4 1:3 2:1 3:1 4:9 5:3 
6:0 7:2 8:0 9:0 10:1 11:3 +5 1:4 2:2 3:0 4:3 5:4 6:5 7:1 8:1 9:1 10:4 11:0 +6 1:2 2:1 3:0 4:3 5:0 6:0 7:5 8:0 9:2 10:2 11:9 +7 1:1 2:1 3:1 4:9 5:2 6:1 7:2 8:0 9:0 10:1 11:3 +8 1:4 2:4 3:0 4:3 5:4 6:2 7:1 8:3 9:0 10:0 11:0 +9 1:2 2:8 3:2 4:0 5:3 6:0 7:2 8:0 9:2 10:7 11:2 +10 1:1 2:1 3:1 4:9 5:0 6:2 7:2 8:0 9:0 10:3 11:3 +11 1:4 2:1 3:0 4:0 5:4 6:5 7:1 8:3 9:0 10:1 11:0 http://git-wip-us.apache.org/repos/asf/spark/blob/d88afabd/docs/ml-clustering.md -- diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index 876a280..0d69bf6 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ -109,8 +109,13 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/LDA.html) f {% include_example java/org/apache/spark/examples/ml/JavaLDAExample.java %} - + + +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering.LDA) for more details. +{% include_example python/ml/lda_example.py %} + + ## Bisecting k-means http://git-wip-us.apache.org/repos/asf/spark/blob/d88afabd/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java index 2c2aa6d..b011575 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java @@ -31,6 +31,13 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.*; // $example off$ +/** + * An example demonstrating AFTSurvivalRegression. 
+ * Run with + * + * bin/run-example ml.JavaAFTSurvivalRegressionExample + * + */ public class JavaAFTSurvivalRegressionExample { public static void main(String[] args) { SparkSession spark = SparkSession http://git-wip-us.apache.org/repos/asf/spark/blob/d88afabd/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java index 1c52f37..7102ddd 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java +++
spark git commit: [SPARK-15150][EXAMPLE][DOC] Update LDA examples
Repository: spark Updated Branches: refs/heads/branch-2.0 1753f6502 -> 3bd7a89bd [SPARK-15150][EXAMPLE][DOC] Update LDA examples ## What changes were proposed in this pull request? 1,create a libsvm-type dataset for lda: `data/mllib/sample_lda_libsvm_data.txt` 2,add python example 3,directly read the datafile in examples 4,BTW, change to `SparkSession` in `aft_survival_regression.py` ## How was this patch tested? manual tests `./bin/spark-submit examples/src/main/python/ml/lda_example.py` Author: Zheng RuiFengCloses #12927 from zhengruifeng/lda_pe. (cherry picked from commit d88afabdfa83be47f36d833105aadd6b818c) Signed-off-by: Nick Pentreath Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bd7a89b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bd7a89b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bd7a89b Branch: refs/heads/branch-2.0 Commit: 3bd7a89bd08bfcaf94039d3070ba60bdf2c9a2e4 Parents: 1753f65 Author: Zheng RuiFeng Authored: Wed May 11 12:49:41 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 12:50:09 2016 +0200 -- data/mllib/sample_lda_libsvm_data.txt | 12 docs/ml-clustering.md | 7 +- .../ml/JavaAFTSurvivalRegressionExample.java| 7 ++ .../spark/examples/ml/JavaLDAExample.java | 67 ++-- .../main/python/ml/aft_survival_regression.py | 19 -- examples/src/main/python/ml/lda_example.py | 64 +++ .../ml/AFTSurvivalRegressionExample.scala | 6 +- .../apache/spark/examples/ml/LDAExample.scala | 41 +--- 8 files changed, 143 insertions(+), 80 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3bd7a89b/data/mllib/sample_lda_libsvm_data.txt -- diff --git a/data/mllib/sample_lda_libsvm_data.txt b/data/mllib/sample_lda_libsvm_data.txt new file mode 100644 index 000..bf118d7 --- /dev/null +++ b/data/mllib/sample_lda_libsvm_data.txt @@ -0,0 +1,12 @@ +0 1:1 2:2 3:6 4:0 5:2 6:3 7:1 8:1 9:0 10:0 11:3 +1 1:1 2:3 3:0 4:1 5:3 6:0 7:0 8:2 9:0 10:0 11:1 +2 1:1 2:4 3:1 
4:0 5:0 6:4 7:9 8:0 9:1 10:2 11:0 +3 1:2 2:1 3:0 4:3 5:0 6:0 7:5 8:0 9:2 10:3 11:9 +4 1:3 2:1 3:1 4:9 5:3 6:0 7:2 8:0 9:0 10:1 11:3 +5 1:4 2:2 3:0 4:3 5:4 6:5 7:1 8:1 9:1 10:4 11:0 +6 1:2 2:1 3:0 4:3 5:0 6:0 7:5 8:0 9:2 10:2 11:9 +7 1:1 2:1 3:1 4:9 5:2 6:1 7:2 8:0 9:0 10:1 11:3 +8 1:4 2:4 3:0 4:3 5:4 6:2 7:1 8:3 9:0 10:0 11:0 +9 1:2 2:8 3:2 4:0 5:3 6:0 7:2 8:0 9:2 10:7 11:2 +10 1:1 2:1 3:1 4:9 5:0 6:2 7:2 8:0 9:0 10:3 11:3 +11 1:4 2:1 3:0 4:0 5:4 6:5 7:1 8:3 9:0 10:1 11:0 http://git-wip-us.apache.org/repos/asf/spark/blob/3bd7a89b/docs/ml-clustering.md -- diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index 876a280..0d69bf6 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ -109,8 +109,13 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/LDA.html) f {% include_example java/org/apache/spark/examples/ml/JavaLDAExample.java %} - + + +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering.LDA) for more details. +{% include_example python/ml/lda_example.py %} + + ## Bisecting k-means http://git-wip-us.apache.org/repos/asf/spark/blob/3bd7a89b/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java index 2c2aa6d..b011575 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java @@ -31,6 +31,13 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.*; // $example off$ +/** + * An example demonstrating AFTSurvivalRegression. 
+ * Run with + * + * bin/run-example ml.JavaAFTSurvivalRegressionExample + * + */ public class JavaAFTSurvivalRegressionExample { public static void main(String[] args) { SparkSession spark = SparkSession http://git-wip-us.apache.org/repos/asf/spark/blob/3bd7a89b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java index
spark git commit: [SPARK-15238] Clarify supported Python versions
Repository: spark Updated Branches: refs/heads/branch-2.0 1e7d8ba5d -> 1753f6502 [SPARK-15238] Clarify supported Python versions This PR: * Clarifies that Spark *does* support Python 3, starting with Python 3.4. Author: Nicholas ChammasCloses #13017 from nchammas/supported-python-versions. (cherry picked from commit fafc95af79fa34f82964a86407c2ee046eda3814) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1753f650 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1753f650 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1753f650 Branch: refs/heads/branch-2.0 Commit: 1753f6502ce7c81f43e120b0dc324674b4d5331c Parents: 1e7d8ba Author: Nicholas Chammas Authored: Wed May 11 11:00:12 2016 +0100 Committer: Sean Owen Committed: Wed May 11 11:00:20 2016 +0100 -- docs/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1753f650/docs/index.md -- diff --git a/docs/index.md b/docs/index.md index 20eab56..7157afc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -24,8 +24,8 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy locally on one machine --- all you need is to have `java` installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. -Spark runs on Java 7+, Python 2.6+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} uses -Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version +Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} +uses Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version ({{site.SCALA_BINARY_VERSION}}.x). # Running the Examples and Shell - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15238] Clarify supported Python versions
Repository: spark Updated Branches: refs/heads/master 33597810e -> fafc95af7 [SPARK-15238] Clarify supported Python versions This PR: * Clarifies that Spark *does* support Python 3, starting with Python 3.4. Author: Nicholas ChammasCloses #13017 from nchammas/supported-python-versions. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fafc95af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fafc95af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fafc95af Branch: refs/heads/master Commit: fafc95af79fa34f82964a86407c2ee046eda3814 Parents: 3359781 Author: Nicholas Chammas Authored: Wed May 11 11:00:12 2016 +0100 Committer: Sean Owen Committed: Wed May 11 11:00:12 2016 +0100 -- docs/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fafc95af/docs/index.md -- diff --git a/docs/index.md b/docs/index.md index 20eab56..7157afc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -24,8 +24,8 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy locally on one machine --- all you need is to have `java` installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. -Spark runs on Java 7+, Python 2.6+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} uses -Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version +Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} +uses Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version ({{site.SCALA_BINARY_VERSION}}.x). # Running the Examples and Shell - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard
Repository: spark Updated Branches: refs/heads/master 8beae5914 -> 33597810e [SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard ## What changes were proposed in this pull request? make StreamingContext.textFileStream support wildcard like /home/user/*/file ## How was this patch tested? I did manual test and added a new unit test case Author: mwwsAuthor: unknown Closes #12752 from mwws/SPARK_FileStream. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33597810 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33597810 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33597810 Branch: refs/heads/master Commit: 33597810ec256cd9bd363bad9239cc6d5b707a6f Parents: 8beae59 Author: mwws Authored: Wed May 11 10:46:58 2016 +0100 Committer: Sean Owen Committed: Wed May 11 10:46:58 2016 +0100 -- .../streaming/dstream/FileInputDStream.scala| 10 +++- .../spark/streaming/InputStreamsSuite.scala | 62 2 files changed, 70 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/33597810/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 36f50e0..ed93058 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -195,10 +195,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( ) logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val filter = new PathFilter { + + val newFileFilter = new PathFilter { def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } - val newFiles = fs.listStatus(directoryPath, 
filter).map(_.getPath.toString) + val directoryFilter = new PathFilter { +override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory + } + val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath) + val newFiles = directories.flatMap(dir => +fs.listStatus(dir, newFileFilter).map(_.getPath.toString)) val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) http://git-wip-us.apache.org/repos/asf/spark/blob/33597810/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 6b4c15f..00d506c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testFileStream(newFilesOnly = false) } + test("file input stream - wildcard") { +var testDir: File = null +try { + val batchDuration = Seconds(2) + testDir = Utils.createTempDir() + val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1") + val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2") + + // Create a file that exists before the StreamingContext is created: + val existingFile = new File(testDir, "0") + Files.write("0\n", existingFile, StandardCharsets.UTF_8) + assert(existingFile.setLastModified(1) && existingFile.lastModified === 1) + + val pathWithWildCard = testDir.toString + "/*/" + + // Set up the streaming context and input streams + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => +val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] +clock.setTime(existingFile.lastModified + batchDuration.milliseconds) +val 
batchCounter = new BatchCounter(ssc) +// monitor "testDir/*/" +val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( + pathWithWildCard).map(_._2.toString) +val outputQueue = new ConcurrentLinkedQueue[Seq[String]] +val outputStream = new TestOutputStream(fileStream, outputQueue) +
spark git commit: [SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard
Repository: spark Updated Branches: refs/heads/branch-2.0 36f711dc6 -> 1e7d8ba5d [SPARK-14976][STREAMING] make StreamingContext.textFileStream support wildcard ## What changes were proposed in this pull request? make StreamingContext.textFileStream support wildcard like /home/user/*/file ## How was this patch tested? I did manual test and added a new unit test case Author: mwwsAuthor: unknown Closes #12752 from mwws/SPARK_FileStream. (cherry picked from commit 33597810ec256cd9bd363bad9239cc6d5b707a6f) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e7d8ba5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e7d8ba5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e7d8ba5 Branch: refs/heads/branch-2.0 Commit: 1e7d8ba5d6212c6e1e57a48f56d68c03c7386e66 Parents: 36f711d Author: mwws Authored: Wed May 11 10:46:58 2016 +0100 Committer: Sean Owen Committed: Wed May 11 10:47:36 2016 +0100 -- .../streaming/dstream/FileInputDStream.scala| 10 +++- .../spark/streaming/InputStreamsSuite.scala | 62 2 files changed, 70 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1e7d8ba5/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 36f50e0..ed93058 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -195,10 +195,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( ) logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val filter = new PathFilter { + + val newFileFilter = new PathFilter { def accept(path: Path): Boolean = 
isNewFile(path, currentTime, modTimeIgnoreThreshold) } - val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) + val directoryFilter = new PathFilter { +override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory + } + val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath) + val newFiles = directories.flatMap(dir => +fs.listStatus(dir, newFileFilter).map(_.getPath.toString)) val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) http://git-wip-us.apache.org/repos/asf/spark/blob/1e7d8ba5/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 6b4c15f..00d506c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -198,6 +198,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testFileStream(newFilesOnly = false) } + test("file input stream - wildcard") { +var testDir: File = null +try { + val batchDuration = Seconds(2) + testDir = Utils.createTempDir() + val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1") + val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2") + + // Create a file that exists before the StreamingContext is created: + val existingFile = new File(testDir, "0") + Files.write("0\n", existingFile, StandardCharsets.UTF_8) + assert(existingFile.setLastModified(1) && existingFile.lastModified === 1) + + val pathWithWildCard = testDir.toString + "/*/" + + // Set up the streaming context and input streams + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => +val clock = 
ssc.scheduler.clock.asInstanceOf[ManualClock] +clock.setTime(existingFile.lastModified + batchDuration.milliseconds) +val batchCounter = new BatchCounter(ssc) +// monitor "testDir/*/" +val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( + pathWithWildCard).map(_._2.toString) +val outputQueue = new
spark git commit: [SPARK-14495][SQL][1.6] fix resolution failure of having clause with distinct aggregate function
Repository: spark Updated Branches: refs/heads/branch-1.6 1678bff7f -> d1654864a [SPARK-14495][SQL][1.6] fix resolution failure of having clause with distinct aggregate function Symptom: In the latest **branch 1.6**, when a `DISTINCT` aggregation function is used in the `HAVING` clause, Analyzer throws `AnalysisException` with a message like following: ``` resolved attribute(s) gid#558,id#559 missing from date#554,id#555 in operator !Expand [List(date#554, null, 0, if ((gid#558 = 1)) id#559 else null),List(date#554, id#555, 1, null)], [date#554,id#561,gid#560,if ((gid = 1)) id else null#562]; ``` Root cause: The problem is that the distinct aggregates in the having condition are resolved by the rule `DistinctAggregationRewriter` twice, which messes up the resulted `EXPAND` operator. In a `ResolveAggregateFunctions` rule, when resolving ```Filter(havingCondition, _: Aggregate)```, the `havingCondition` is resolved as an `Aggregate` in a nested loop of analyzer rule execution (by invoking `RuleExecutor.execute`). At this nested level of analysis, the rule `DistinctAggregationRewriter` rewrites this distinct aggregate clause to an expanded two-layer aggregation, where the `aggregateExpressions` of the final `Aggregate` contains the resolved `gid` and the aggregate expression attributes (In the above case, they are `gid#558, id#559`). After completion of the nested analyzer rule execution, the resulted `aggregateExpressions` in the `havingCondition` is pushed down into the underlying `Aggregate` operator. The `DistinctAggregationRewriter` rule is executed again. The `projections` field of `EXPAND` operator is populated with the `aggregateExpressions` of the `havingCondition` mentioned above. However, the attributes (In the above case, they are `gid#558, id#559`) in the projection list of `EXPAND` operator cannot be found in the underlying relation. 
Solution: This PR retrofits part of [#11579](https://github.com/apache/spark/pull/11579) that moves the `DistinctAggregationRewriter` to the beginning of Optimizer, so that it guarantees that the rewrite only happens after all the aggregate functions are resolved first. Thus, it avoids resolution failure. How is the PR change tested New [test cases ](https://github.com/xwu0226/spark/blob/f73428f94746d6d074baf6702589545bdbd11cad/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala#L927-L988) are added to drive `DistinctAggregationRewriter` rewrites for multi-distinct aggregations , involving having clause. A following up PR will be submitted to add these test cases to master(2.0) branch. Author: xin WuCloses #12974 from xwu0226/SPARK-14495_review. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1654864 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1654864 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1654864 Branch: refs/heads/branch-1.6 Commit: d1654864a60503a5e495a1261f55ceb89f916984 Parents: 1678bff Author: xin Wu Authored: Wed May 11 16:30:45 2016 +0800 Committer: Wenchen Fan Committed: Wed May 11 16:30:45 2016 +0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 1 - .../analysis/DistinctAggregationRewriter.scala | 5 + .../sql/catalyst/optimizer/Optimizer.scala | 20 .../expressions/ExpressionEvalHelper.scala | 4 ++-- .../expressions/MathFunctionsSuite.scala| 4 ++-- .../scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 8 +--- .../hive/execution/AggregationQuerySuite.scala | 16 8 files changed, 43 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d1654864/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bc62c7f..04f62d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -80,7 +80,6 @@ class Analyzer( ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: - DistinctAggregationRewriter(conf) :: HiveTypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once,
spark git commit: [SPARK-15149][EXAMPLE][DOC] update kmeans example
Repository: spark Updated Branches: refs/heads/branch-2.0 73dd88939 -> 36f711dc6 [SPARK-15149][EXAMPLE][DOC] update kmeans example ## What changes were proposed in this pull request? Python example for ml.kmeans already exists, but not included in user guide. 1,small changes like: `example_on` `example_off` 2,add it to user guide 3,update examples to directly read datafile ## How was this patch tested? manual tests `./bin/spark-submit examples/src/main/python/ml/kmeans_example.py Author: Zheng RuiFengCloses #12925 from zhengruifeng/km_pe. (cherry picked from commit 8beae59144827d81491eed385dc2aa6aedd6a7b4) Signed-off-by: Nick Pentreath Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36f711dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36f711dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36f711dc Branch: refs/heads/branch-2.0 Commit: 36f711dc6d577f585fed424da5c49f04c5226591 Parents: 73dd889 Author: Zheng RuiFeng Authored: Wed May 11 10:01:43 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 10:02:09 2016 +0200 -- docs/ml-clustering.md | 5 ++ .../spark/examples/ml/JavaKMeansExample.java| 60 +--- examples/src/main/python/ml/kmeans_example.py | 46 ++- .../spark/examples/ml/KMeansExample.scala | 33 +-- 4 files changed, 50 insertions(+), 94 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/36f711dc/docs/ml-clustering.md -- diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index 1245b8b..876a280 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ -79,6 +79,11 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/KMeans.html {% include_example java/org/apache/spark/examples/ml/JavaKMeansExample.java %} + +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering.KMeans) for more details. 
+ +{% include_example python/ml/kmeans_example.py %} + http://git-wip-us.apache.org/repos/asf/spark/blob/36f711dc/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java index 65e29ad..2489a9b 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java @@ -17,77 +17,45 @@ package org.apache.spark.examples.ml; -import java.util.regex.Pattern; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.expressions.GenericRow; // $example on$ import org.apache.spark.ml.clustering.KMeansModel; import org.apache.spark.ml.clustering.KMeans; import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.VectorUDT; -import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; // $example off$ +import org.apache.spark.sql.SparkSession; /** - * An example demonstrating a k-means clustering. + * An example demonstrating k-means clustering. 
* Run with * - * bin/run-example ml.JavaKMeansExample + * bin/run-example ml.JavaKMeansExample * */ public class JavaKMeansExample { - private static class ParsePoint implements Function { -private static final Pattern separator = Pattern.compile(" "); - -@Override -public Row call(String line) { - String[] tok = separator.split(line); - double[] point = new double[tok.length]; - for (int i = 0; i < tok.length; ++i) { -point[i] = Double.parseDouble(tok[i]); - } - Vector[] points = {Vectors.dense(point)}; - return new GenericRow(points); -} - } - public static void main(String[] args) { -if (args.length != 2) { - System.err.println("Usage: ml.JavaKMeansExample "); - System.exit(1); -} -String inputFile = args[0]; -int k = Integer.parseInt(args[1]); - -// Parses the arguments +// Create a SparkSession. SparkSession spark = SparkSession .builder() .appName("JavaKMeansExample") .getOrCreate(); // $example on$ -// Loads data -JavaRDD points
spark git commit: [SPARK-15149][EXAMPLE][DOC] update kmeans example
Repository: spark Updated Branches: refs/heads/master cef73b563 -> 8beae5914 [SPARK-15149][EXAMPLE][DOC] update kmeans example ## What changes were proposed in this pull request? Python example for ml.kmeans already exists, but not included in user guide. 1,small changes like: `example_on` `example_off` 2,add it to user guide 3,update examples to directly read datafile ## How was this patch tested? manual tests `./bin/spark-submit examples/src/main/python/ml/kmeans_example.py Author: Zheng RuiFengCloses #12925 from zhengruifeng/km_pe. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8beae591 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8beae591 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8beae591 Branch: refs/heads/master Commit: 8beae59144827d81491eed385dc2aa6aedd6a7b4 Parents: cef73b5 Author: Zheng RuiFeng Authored: Wed May 11 10:01:43 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 10:01:43 2016 +0200 -- docs/ml-clustering.md | 5 ++ .../spark/examples/ml/JavaKMeansExample.java| 60 +--- examples/src/main/python/ml/kmeans_example.py | 46 ++- .../spark/examples/ml/KMeansExample.scala | 33 +-- 4 files changed, 50 insertions(+), 94 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8beae591/docs/ml-clustering.md -- diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index 1245b8b..876a280 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ -79,6 +79,11 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/KMeans.html {% include_example java/org/apache/spark/examples/ml/JavaKMeansExample.java %} + +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering.KMeans) for more details. 
+ +{% include_example python/ml/kmeans_example.py %} + http://git-wip-us.apache.org/repos/asf/spark/blob/8beae591/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java index 65e29ad..2489a9b 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java @@ -17,77 +17,45 @@ package org.apache.spark.examples.ml; -import java.util.regex.Pattern; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.expressions.GenericRow; // $example on$ import org.apache.spark.ml.clustering.KMeansModel; import org.apache.spark.ml.clustering.KMeans; import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.VectorUDT; -import org.apache.spark.mllib.linalg.Vectors; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; // $example off$ +import org.apache.spark.sql.SparkSession; /** - * An example demonstrating a k-means clustering. + * An example demonstrating k-means clustering. 
* Run with * - * bin/run-example ml.JavaKMeansExample + * bin/run-example ml.JavaKMeansExample * */ public class JavaKMeansExample { - private static class ParsePoint implements Function { -private static final Pattern separator = Pattern.compile(" "); - -@Override -public Row call(String line) { - String[] tok = separator.split(line); - double[] point = new double[tok.length]; - for (int i = 0; i < tok.length; ++i) { -point[i] = Double.parseDouble(tok[i]); - } - Vector[] points = {Vectors.dense(point)}; - return new GenericRow(points); -} - } - public static void main(String[] args) { -if (args.length != 2) { - System.err.println("Usage: ml.JavaKMeansExample "); - System.exit(1); -} -String inputFile = args[0]; -int k = Integer.parseInt(args[1]); - -// Parses the arguments +// Create a SparkSession. SparkSession spark = SparkSession .builder() .appName("JavaKMeansExample") .getOrCreate(); // $example on$ -// Loads data -JavaRDD points = spark.read().text(inputFile).javaRDD().map(new ParsePoint()); -StructField[] fields = {new StructField("features", new
spark git commit: [SPARK-14340][EXAMPLE][DOC] Update Examples and User Guide for ml.BisectingKMeans
Repository: spark Updated Branches: refs/heads/branch-2.0 bee2ddb39 -> 73dd88939 [SPARK-14340][EXAMPLE][DOC] Update Examples and User Guide for ml.BisectingKMeans ## What changes were proposed in this pull request? 1, add BisectingKMeans to ml-clustering.md 2, add the missing Scala BisectingKMeansExample 3, create a new datafile `data/mllib/sample_kmeans_data.txt` ## How was this patch tested? manual tests Author: Zheng RuiFengCloses #11844 from zhengruifeng/doc_bkm. (cherry picked from commit cef73b563864d5f8aa1b26e31e3b9af6f0a08a5d) Signed-off-by: Nick Pentreath Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73dd8893 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73dd8893 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73dd8893 Branch: refs/heads/branch-2.0 Commit: 73dd8893922b387827843fe08e28da758e854242 Parents: bee2ddb Author: Zheng RuiFeng Authored: Wed May 11 09:56:36 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 09:56:59 2016 +0200 -- data/mllib/sample_kmeans_data.txt | 6 ++ docs/ml-clustering.md | 37 ++- .../examples/ml/JavaBisectingKMeansExample.java | 49 ++- .../main/python/ml/bisecting_k_means_example.py | 30 + .../examples/ml/BisectingKMeansExample.scala| 65 5 files changed, 139 insertions(+), 48 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/73dd8893/data/mllib/sample_kmeans_data.txt -- diff --git a/data/mllib/sample_kmeans_data.txt b/data/mllib/sample_kmeans_data.txt new file mode 100644 index 000..5001377 --- /dev/null +++ b/data/mllib/sample_kmeans_data.txt @@ -0,0 +1,6 @@ +0 1:0.0 2:0.0 3:0.0 +1 1:0.1 2:0.1 3:0.1 +2 1:0.2 2:0.2 3:0.2 +3 1:9.0 2:9.0 3:9.0 +4 1:9.1 2:9.1 3:9.1 +5 1:9.2 2:9.2 3:9.2 http://git-wip-us.apache.org/repos/asf/spark/blob/73dd8893/docs/ml-clustering.md -- diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index 440c455..1245b8b 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ 
-104,4 +104,39 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/LDA.html) f {% include_example java/org/apache/spark/examples/ml/JavaLDAExample.java %} - \ No newline at end of file + + +## Bisecting k-means + + +Bisecting k-means is a kind of [hierarchical clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering) using a +divisive (or "top-down") approach: all observations start in one cluster, and splits are performed recursively as one +moves down the hierarchy. + +Bisecting K-means can often be much faster than regular K-means, but it will generally produce a different clustering. + +`BisectingKMeans` is implemented as an `Estimator` and generates a `BisectingKMeansModel` as the base model. + +### Example + + + + +Refer to the [Scala API docs](api/scala/index.html#org.apache.spark.ml.clustering.BisectingKMeans) for more details. + +{% include_example scala/org/apache/spark/examples/ml/BisectingKMeansExample.scala %} + + + +Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/BisectingKMeans.html) for more details. + +{% include_example java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java %} + + + +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering.BisectingKMeans) for more details. 
+ +{% include_example python/ml/bisecting_k_means_example.py %} + + + http://git-wip-us.apache.org/repos/asf/spark/blob/73dd8893/examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java index 810ad90..6287144 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java @@ -17,27 +17,22 @@ package org.apache.spark.examples.ml; -import java.util.Arrays; -import java.util.List; - -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SparkSession; // $example on$ import org.apache.spark.ml.clustering.BisectingKMeans; import org.apache.spark.ml.clustering.BisectingKMeansModel; import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.VectorUDT; -import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.sql.Dataset; import
spark git commit: [SPARK-15141][EXAMPLE][DOC] Update OneVsRest Examples
Repository: spark Updated Branches: refs/heads/master 875ef7642 -> ad1a8466e [SPARK-15141][EXAMPLE][DOC] Update OneVsRest Examples ## What changes were proposed in this pull request? 1, Add python example for OneVsRest 2, remove args-parsing ## How was this patch tested? manual tests `./bin/spark-submit examples/src/main/python/ml/one_vs_rest_example.py` Author: Zheng RuiFengCloses #12920 from zhengruifeng/ovr_pe. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad1a8466 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad1a8466 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad1a8466 Branch: refs/heads/master Commit: ad1a8466e9c10fbe8b455dba17b16973f92ebc15 Parents: 875ef76 Author: Zheng RuiFeng Authored: Wed May 11 09:53:36 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 09:53:36 2016 +0200 -- docs/ml-classification-regression.md| 7 + .../spark/examples/ml/JavaOneVsRestExample.java | 214 +++ .../src/main/python/ml/one_vs_rest_example.py | 68 ++ .../spark/examples/ml/OneVsRestExample.scala| 156 +++--- 4 files changed, 129 insertions(+), 316 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ad1a8466/docs/ml-classification-regression.md -- diff --git a/docs/ml-classification-regression.md b/docs/ml-classification-regression.md index eaf4f6d..f6a6937 100644 --- a/docs/ml-classification-regression.md +++ b/docs/ml-classification-regression.md @@ -300,6 +300,13 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/classification/OneVsRe {% include_example java/org/apache/spark/examples/ml/JavaOneVsRestExample.java %} + + + +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.classification.OneVsRest) for more details. 
+ +{% include_example python/ml/one_vs_rest_example.py %} + ## Naive Bayes http://git-wip-us.apache.org/repos/asf/spark/blob/ad1a8466/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java index e0cb752..5bf455e 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java @@ -17,222 +17,68 @@ package org.apache.spark.examples.ml; -import org.apache.commons.cli.*; - // $example on$ import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.ml.classification.OneVsRest; import org.apache.spark.ml.classification.OneVsRestModel; -import org.apache.spark.ml.util.MetadataUtils; -import org.apache.spark.mllib.evaluation.MulticlassMetrics; -import org.apache.spark.mllib.linalg.Matrix; -import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.StructField; // $example off$ +import org.apache.spark.sql.SparkSession; + /** - * An example runner for Multiclass to Binary Reduction with One Vs Rest. - * The example uses Logistic Regression as the base classifier. All parameters that - * can be specified on the base classifier can be passed in to the runner options. + * An example of Multiclass to Binary Reduction with One Vs Rest, + * using Logistic Regression as the base classifier. 
* Run with * - * bin/run-example ml.JavaOneVsRestExample [options] + * bin/run-example ml.JavaOneVsRestExample * */ public class JavaOneVsRestExample { - - private static class Params { -String input; -String testInput = null; -Integer maxIter = 100; -double tol = 1E-6; -boolean fitIntercept = true; -Double regParam = null; -Double elasticNetParam = null; -double fracTest = 0.2; - } - public static void main(String[] args) { -// parse the arguments -Params params = parse(args); SparkSession spark = SparkSession .builder() .appName("JavaOneVsRestExample") .getOrCreate(); // $example on$ -// configure the base classifier -LogisticRegression classifier = new LogisticRegression() - .setMaxIter(params.maxIter) - .setTol(params.tol) - .setFitIntercept(params.fitIntercept); +// load data file. +Dataset inputData = spark.read().format("libsvm") + .load("data/mllib/sample_multiclass_classification_data.txt"); -
spark git commit: [SPARK-15141][EXAMPLE][DOC] Update OneVsRest Examples
Repository: spark Updated Branches: refs/heads/branch-2.0 2d3c69a02 -> bee2ddb39 [SPARK-15141][EXAMPLE][DOC] Update OneVsRest Examples ## What changes were proposed in this pull request? 1, Add python example for OneVsRest 2, remove args-parsing ## How was this patch tested? manual tests `./bin/spark-submit examples/src/main/python/ml/one_vs_rest_example.py` Author: Zheng RuiFengCloses #12920 from zhengruifeng/ovr_pe. (cherry picked from commit ad1a8466e9c10fbe8b455dba17b16973f92ebc15) Signed-off-by: Nick Pentreath Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bee2ddb3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bee2ddb3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bee2ddb3 Branch: refs/heads/branch-2.0 Commit: bee2ddb3928863cd2e4817a6fc1d91a9da0e207d Parents: 2d3c69a Author: Zheng RuiFeng Authored: Wed May 11 09:53:36 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 09:54:17 2016 +0200 -- docs/ml-classification-regression.md| 7 + .../spark/examples/ml/JavaOneVsRestExample.java | 214 +++ .../src/main/python/ml/one_vs_rest_example.py | 68 ++ .../spark/examples/ml/OneVsRestExample.scala| 156 +++--- 4 files changed, 129 insertions(+), 316 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bee2ddb3/docs/ml-classification-regression.md -- diff --git a/docs/ml-classification-regression.md b/docs/ml-classification-regression.md index eaf4f6d..f6a6937 100644 --- a/docs/ml-classification-regression.md +++ b/docs/ml-classification-regression.md @@ -300,6 +300,13 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/classification/OneVsRe {% include_example java/org/apache/spark/examples/ml/JavaOneVsRestExample.java %} + + + +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.classification.OneVsRest) for more details. 
+ +{% include_example python/ml/one_vs_rest_example.py %} + ## Naive Bayes http://git-wip-us.apache.org/repos/asf/spark/blob/bee2ddb3/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java index e0cb752..5bf455e 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java @@ -17,222 +17,68 @@ package org.apache.spark.examples.ml; -import org.apache.commons.cli.*; - // $example on$ import org.apache.spark.ml.classification.LogisticRegression; import org.apache.spark.ml.classification.OneVsRest; import org.apache.spark.ml.classification.OneVsRestModel; -import org.apache.spark.ml.util.MetadataUtils; -import org.apache.spark.mllib.evaluation.MulticlassMetrics; -import org.apache.spark.mllib.linalg.Matrix; -import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.StructField; // $example off$ +import org.apache.spark.sql.SparkSession; + /** - * An example runner for Multiclass to Binary Reduction with One Vs Rest. - * The example uses Logistic Regression as the base classifier. All parameters that - * can be specified on the base classifier can be passed in to the runner options. + * An example of Multiclass to Binary Reduction with One Vs Rest, + * using Logistic Regression as the base classifier. 
* Run with * - * bin/run-example ml.JavaOneVsRestExample [options] + * bin/run-example ml.JavaOneVsRestExample * */ public class JavaOneVsRestExample { - - private static class Params { -String input; -String testInput = null; -Integer maxIter = 100; -double tol = 1E-6; -boolean fitIntercept = true; -Double regParam = null; -Double elasticNetParam = null; -double fracTest = 0.2; - } - public static void main(String[] args) { -// parse the arguments -Params params = parse(args); SparkSession spark = SparkSession .builder() .appName("JavaOneVsRestExample") .getOrCreate(); // $example on$ -// configure the base classifier -LogisticRegression classifier = new LogisticRegression() - .setMaxIter(params.maxIter) - .setTol(params.tol) - .setFitIntercept(params.fitIntercept); +// load data file. +
spark git commit: [SPARK-15231][SQL] Document the semantic of saveAsTable and insertInto and don't drop columns silently
Repository: spark Updated Branches: refs/heads/branch-2.0 a8637f4ac -> 2d3c69a02 [SPARK-15231][SQL] Document the semantic of saveAsTable and insertInto and don't drop columns silently ## What changes were proposed in this pull request? This PR adds documentation about the different behaviors between `insertInto` and `saveAsTable`, and throws an exception when the user tries to add too many columns using `saveAsTable` with append. ## How was this patch tested? Unit tests added in this PR. Author: Shixiong ZhuCloses #13013 from zsxwing/SPARK-15231. (cherry picked from commit 875ef764280428acd095aec1834fee0ddad08611) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d3c69a0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d3c69a0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d3c69a0 Branch: refs/heads/branch-2.0 Commit: 2d3c69a0221cbf8a24c82b623b48edcf4e879730 Parents: a8637f4 Author: Shixiong Zhu Authored: Tue May 10 23:53:55 2016 -0700 Committer: Yin Huai Committed: Tue May 10 23:54:53 2016 -0700 -- .../org/apache/spark/sql/DataFrameWriter.scala | 36 +++- .../command/createDataSourceTables.scala| 5 +++ .../sql/hive/MetastoreDataSourcesSuite.scala| 43 3 files changed, 82 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2d3c69a0/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index da9d254..a9e8329 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -361,6 +361,23 @@ final class DataFrameWriter private[sql](df: DataFrame) { * Inserts the content of the [[DataFrame]] to the specified table. 
It requires that * the schema of the [[DataFrame]] is the same as the schema of the table. * + * Note: Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based + * resolution. For example: + * + * {{{ + *scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1") + *scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1") + *scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1") + *scala> sql("select * from t1").show + *+---+---+ + *| i| j| + *+---+---+ + *| 5| 6| + *| 3| 4| + *| 1| 2| + *+---+---+ + * }}} + * * Because it inserts data to an existing table, format or options will be ignored. * * @since 1.4.0 @@ -454,8 +471,23 @@ final class DataFrameWriter private[sql](df: DataFrame) { * save mode, specified by the `mode` function (default to throwing an exception). * When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not need to be * the same as that of the existing table. - * When `mode` is `Append`, the schema of the [[DataFrame]] need to be - * the same as that of the existing table, and format or options will be ignored. + * + * When `mode` is `Append`, if there is an existing table, we will use the format and options of + * the existing table. The column order in the schema of the [[DataFrame]] doesn't need to be same + * as that of the existing table. Unlike `insertInto`, `saveAsTable` will use the column names to + * find the correct column positions. For example: + * + * {{{ + *scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1") + *scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1") + *scala> sql("select * from t1").show + *+---+---+ + *| i| j| + *+---+---+ + *| 1| 2| + *| 4| 3| + *+---+---+ + * }}} * * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. 
ORC http://git-wip-us.apache.org/repos/asf/spark/blob/2d3c69a0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 1494341..3525111 100644 ---
spark git commit: [SPARK-15231][SQL] Document the semantic of saveAsTable and insertInto and don't drop columns silently
Repository: spark Updated Branches: refs/heads/master 007882c7e -> 875ef7642 [SPARK-15231][SQL] Document the semantic of saveAsTable and insertInto and don't drop columns silently ## What changes were proposed in this pull request? This PR adds documentation about the different behaviors between `insertInto` and `saveAsTable`, and throws an exception when the user tries to add too many columns using `saveAsTable` with append. ## How was this patch tested? Unit tests added in this PR. Author: Shixiong ZhuCloses #13013 from zsxwing/SPARK-15231. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/875ef764 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/875ef764 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/875ef764 Branch: refs/heads/master Commit: 875ef764280428acd095aec1834fee0ddad08611 Parents: 007882c Author: Shixiong Zhu Authored: Tue May 10 23:53:55 2016 -0700 Committer: Yin Huai Committed: Tue May 10 23:53:55 2016 -0700 -- .../org/apache/spark/sql/DataFrameWriter.scala | 36 +++- .../command/createDataSourceTables.scala| 5 +++ .../sql/hive/MetastoreDataSourcesSuite.scala| 43 3 files changed, 82 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/875ef764/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index da9d254..a9e8329 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -361,6 +361,23 @@ final class DataFrameWriter private[sql](df: DataFrame) { * Inserts the content of the [[DataFrame]] to the specified table. It requires that * the schema of the [[DataFrame]] is the same as the schema of the table. 
* + * Note: Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based + * resolution. For example: + * + * {{{ + *scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1") + *scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1") + *scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1") + *scala> sql("select * from t1").show + *+---+---+ + *| i| j| + *+---+---+ + *| 5| 6| + *| 3| 4| + *| 1| 2| + *+---+---+ + * }}} + * * Because it inserts data to an existing table, format or options will be ignored. * * @since 1.4.0 @@ -454,8 +471,23 @@ final class DataFrameWriter private[sql](df: DataFrame) { * save mode, specified by the `mode` function (default to throwing an exception). * When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not need to be * the same as that of the existing table. - * When `mode` is `Append`, the schema of the [[DataFrame]] need to be - * the same as that of the existing table, and format or options will be ignored. + * + * When `mode` is `Append`, if there is an existing table, we will use the format and options of + * the existing table. The column order in the schema of the [[DataFrame]] doesn't need to be same + * as that of the existing table. Unlike `insertInto`, `saveAsTable` will use the column names to + * find the correct column positions. For example: + * + * {{{ + *scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1") + *scala> Seq((3, 4)).toDF("j", "i").write.mode("append").saveAsTable("t1") + *scala> sql("select * from t1").show + *+---+---+ + *| i| j| + *+---+---+ + *| 1| 2| + *| 4| 3| + *+---+---+ + * }}} * * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. 
ORC http://git-wip-us.apache.org/repos/asf/spark/blob/875ef764/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 1494341..3525111 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@
spark git commit: [SPARK-15189][PYSPARK][DOCS] Update ml.evaluation PyDoc
Repository: spark Updated Branches: refs/heads/branch-2.0 ca5ce5365 -> a8637f4ac [SPARK-15189][PYSPARK][DOCS] Update ml.evaluation PyDoc ## What changes were proposed in this pull request? Fix doctest issue, short param description, and tag items as Experimental ## How was this patch tested? build docs locally & doctests Author: Holden KarauCloses #12964 from holdenk/SPARK-15189-ml.Evaluation-PyDoc-issues. (cherry picked from commit 007882c7ee06de37ba309424fced1e4c6b408572) Signed-off-by: Nick Pentreath Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8637f4a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8637f4a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8637f4a Branch: refs/heads/branch-2.0 Commit: a8637f4ac793a306eb4258682a4a2afae4254e83 Parents: ca5ce53 Author: Holden Karau Authored: Wed May 11 08:33:29 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 08:33:50 2016 +0200 -- python/pyspark/ml/evaluation.py | 13 - 1 file changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a8637f4a/python/pyspark/ml/evaluation.py -- diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index 2a41678..719c0c7 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -105,6 +105,8 @@ class JavaEvaluator(JavaParams, Evaluator): @inherit_doc class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol): """ +.. note:: Experimental + Evaluator for binary classification, which expects two input columns: rawPrediction and label. The rawPrediction column can be of type double (binary 0/1 prediction, or probability of label 1) or of type vector (length-2 vector of raw predictions, scores, or label probabilities). 
@@ -172,6 +174,8 @@ class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPrediction @inherit_doc class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol): """ +.. note:: Experimental + Evaluator for Regression, which expects two input columns: prediction and label. @@ -193,7 +197,11 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol): # when we evaluate a metric that is needed to minimize (e.g., `"rmse"`, `"mse"`, `"mae"`), # we take and output the negative of this metric. metricName = Param(Params._dummy(), "metricName", - "metric name in evaluation (mse|rmse|r2|mae)", + """metric name in evaluation - one of: + rmse - root mean squared error (default) + mse - mean squared error + r2 - r^2 metric + mae - mean absolute error.""", typeConverter=TypeConverters.toString) @keyword_only @@ -241,8 +249,11 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol): @inherit_doc class MulticlassClassificationEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol): """ +.. note:: Experimental + Evaluator for Multiclass Classification, which expects two input columns: prediction and label. + >>> scoreAndLabels = [(0.0, 0.0), (0.0, 1.0), (0.0, 0.0), ... (1.0, 0.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)] >>> dataset = sqlContext.createDataFrame(scoreAndLabels, ["prediction", "label"]) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15189][PYSPARK][DOCS] Update ml.evaluation PyDoc
Repository: spark Updated Branches: refs/heads/master ba181c0c7 -> 007882c7e [SPARK-15189][PYSPARK][DOCS] Update ml.evaluation PyDoc ## What changes were proposed in this pull request? Fix doctest issue, short param description, and tag items as Experimental ## How was this patch tested? build docs locally & doctests Author: Holden Karau. Closes #12964 from holdenk/SPARK-15189-ml.Evaluation-PyDoc-issues. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/007882c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/007882c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/007882c7 Branch: refs/heads/master Commit: 007882c7ee06de37ba309424fced1e4c6b408572 Parents: ba181c0 Author: Holden Karau Authored: Wed May 11 08:33:29 2016 +0200 Committer: Nick Pentreath Committed: Wed May 11 08:33:29 2016 +0200 -- python/pyspark/ml/evaluation.py | 13 - 1 file changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/007882c7/python/pyspark/ml/evaluation.py -- diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index 2a41678..719c0c7 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -105,6 +105,8 @@ class JavaEvaluator(JavaParams, Evaluator): @inherit_doc class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol): """ +.. note:: Experimental + Evaluator for binary classification, which expects two input columns: rawPrediction and label. The rawPrediction column can be of type double (binary 0/1 prediction, or probability of label 1) or of type vector (length-2 vector of raw predictions, scores, or label probabilities). @@ -172,6 +174,8 @@ class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPrediction @inherit_doc class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol): """ +.. note:: Experimental + Evaluator for Regression, which expects two input columns: prediction and label. @@ -193,7 +197,11 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol): # when we evaluate a metric that is needed to minimize (e.g., `"rmse"`, `"mse"`, `"mae"`), # we take and output the negative of this metric. metricName = Param(Params._dummy(), "metricName", - "metric name in evaluation (mse|rmse|r2|mae)", + """metric name in evaluation - one of: + rmse - root mean squared error (default) + mse - mean squared error + r2 - r^2 metric + mae - mean absolute error.""", typeConverter=TypeConverters.toString) @keyword_only @@ -241,8 +249,11 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol): @inherit_doc class MulticlassClassificationEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol): """ +.. note:: Experimental + Evaluator for Multiclass Classification, which expects two input columns: prediction and label. + >>> scoreAndLabels = [(0.0, 0.0), (0.0, 1.0), (0.0, 0.0), ... (1.0, 0.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)] >>> dataset = sqlContext.createDataFrame(scoreAndLabels, ["prediction", "label"]) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org