spark git commit: [SPARK-21912][SQL] ORC/Parquet table should not create invalid column names
Repository: spark
Updated Branches:
  refs/heads/master ce7293c15 -> eea2b877c

[SPARK-21912][SQL] ORC/Parquet table should not create invalid column names

## What changes were proposed in this pull request?

Currently, users hit job abortions when creating or altering ORC/Parquet tables with invalid column names. We should prevent this by raising an **AnalysisException** that guides users to rename such columns with aliases, as Parquet data source tables already do.

**BEFORE**
```scala
scala> sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`")
17/09/04 13:28:21 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Error: : expected at the position 8 of 'struct<a b:int>' but ' ' is found.
17/09/04 13:28:21 ERROR FileFormatWriter: Job job_20170904132821_0001 aborted.
17/09/04 13:28:21 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows.
```

**AFTER**
```scala
scala> sql("CREATE TABLE orc1 USING ORC AS SELECT 1 `a b`")
17/09/04 13:27:40 ERROR CreateDataSourceTableAsSelectCommand: Failed to write to table orc1
org.apache.spark.sql.AnalysisException: Attribute name "a b" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
```

## How was this patch tested?

Passed Jenkins with a new test case.

Author: Dongjoon Hyun

Closes #19124 from dongjoon-hyun/SPARK-21912.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eea2b877
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eea2b877
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eea2b877
Branch: refs/heads/master
Commit: eea2b877cf4e6ba4ea524bf8d782516add1b093e
Parents: ce7293c
Author: Dongjoon Hyun
Authored: Wed Sep 6 22:20:48 2017 -0700
Committer: gatorsmile
Committed: Wed Sep 6 22:20:48 2017 -0700

--
 .../spark/sql/execution/command/ddl.scala       | 21 ++
 .../spark/sql/execution/command/tables.scala    |  5 ++-
 .../datasources/DataSourceStrategy.scala        |  2 +
 .../datasources/orc/OrcFileFormat.scala         | 42
 .../parquet/ParquetSchemaConverter.scala        |  2 +-
 .../resources/sql-tests/inputs/show_columns.sql |  4 +-
 .../sql-tests/results/show_columns.sql.out      |  4 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  2 +
 .../sql/hive/execution/SQLQuerySuite.scala      | 34
 9 files changed, 109 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index dae160f..7611e1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -34,6 +34,9 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
+import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -848,4 +851,22 @@ object DDLUtils {
       }
     }
   }
+
+  private[sql] def checkDataSchemaFieldNames(table: CatalogTable): Unit = {
+    table.provider.foreach {
+      _.toLowerCase(Locale.ROOT) match {
+        case HIVE_PROVIDER =>
+          val serde = table.storage.serde
+          if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
+            OrcFileFormat.checkFieldNames(table.dataSchema)
+          } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
+              serde == Some("parquet.hive.serde.ParquetHiveSerDe")) {
+            ParquetSchemaConverter.checkFieldNames(table.dataSchema)
+          }
+        case "parquet" => ParquetSchemaConverter.checkFieldNames(table.dataSchema)
+        case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema)
+        case _ =>
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/eea2b877/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
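Stepping back from the (truncated) diff: the sketch below illustrates the validation this patch wires into the ORC/Parquet write paths. It is a simplified stand-in, not the committed code — `FieldNameChecker` and its plain `IllegalArgumentException` are illustrative; Spark's real checks live in `OrcFileFormat.checkFieldNames` and `ParquetSchemaConverter.checkFieldNames` and raise `AnalysisException`.

```scala
// Simplified sketch of the field-name validation (illustrative names).
object FieldNameChecker {
  // The character set quoted in the AnalysisException message above;
  // \n and \t are a real newline and tab.
  private val invalidChars = " ,;{}()\n\t="

  def checkFieldNames(fieldNames: Seq[String]): Unit = {
    fieldNames.foreach { name =>
      if (name.exists(c => invalidChars.contains(c))) {
        // Spark raises AnalysisException here; a generic exception stands in.
        throw new IllegalArgumentException(
          s"Attribute name \"$name\" contains invalid character(s). " +
            "Please use alias to rename it.")
      }
    }
  }
}

// FieldNameChecker.checkFieldNames(Seq("a b"))  -- throws as above
// FieldNameChecker.checkFieldNames(Seq("a_b"))  -- passes silently
```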
spark git commit: [SPARK-21835][SQL][FOLLOW-UP] RewritePredicateSubquery should not produce unresolved query plans
Repository: spark
Updated Branches:
  refs/heads/master aad212547 -> ce7293c15

[SPARK-21835][SQL][FOLLOW-UP] RewritePredicateSubquery should not produce unresolved query plans

## What changes were proposed in this pull request?

This is a follow-up of #19050 to handle the `ExistenceJoin` case.

## How was this patch tested?

Added a test.

Author: Liang-Chi Hsieh

Closes #19151 from viirya/SPARK-21835-followup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce7293c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce7293c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce7293c1
Branch: refs/heads/master
Commit: ce7293c150c71a872d20beda44b12dec9deca18d
Parents: aad2125
Author: Liang-Chi Hsieh
Authored: Wed Sep 6 22:15:25 2017 -0700
Committer: gatorsmile
Committed: Wed Sep 6 22:15:25 2017 -0700

--
 .../apache/spark/sql/catalyst/optimizer/subquery.scala   | 11 +++
 .../test/scala/org/apache/spark/sql/SubquerySuite.scala  | 12
 2 files changed, 19 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ce7293c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 7ff8915..64b2856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -49,12 +49,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
     }
   }

-  private def dedupJoin(joinPlan: Join): Join = joinPlan match {
+  private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
     // SPARK-21835: It is possibly that the two sides of the join have conflicting attributes,
     // the produced join then becomes unresolved and break structural integrity. We should
     // de-duplicate conflicting attributes. We don't use transformation here because we only
     // care about the most top join converted from correlated predicate subquery.
-    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
+    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
       val duplicates = right.outputSet.intersect(left.outputSet)
       if (duplicates.nonEmpty) {
         val aliasMap = AttributeMap(duplicates.map { dup =>
@@ -145,13 +145,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       e transformUp {
         case Exists(sub, conditions, _) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
-          newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+          // Deduplicate conflicting attributes if any.
+          newPlan = dedupJoin(
+            Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
           exists
         case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
           val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
           val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
-          newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
+          // Deduplicate conflicting attributes if any.
+          newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions))
           exists
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce7293c1/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index ee6905e..8673dc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -938,4 +938,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 3") {
+    val sqlText =
+      """
+        |SELECT * FROM l, r WHERE l.a = r.c + 1 AND
+        |(EXISTS (SELECT * FROM r) OR l.a = r.c)
+      """.stripMargin
+    val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
+    val join = optimizedPlan.collectFirst
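The test above is cut off in this archive at `collectFirst`. A hedged sketch of how such an assertion typically continues (an assumed continuation, not the verbatim test body):

```scala
import org.apache.spark.sql.catalyst.plans.logical.Join

// Collect the join produced by the subquery rewrite and check that its two
// sides share no output attributes (duplicateResolved is defined on Join as
// exactly that disjointness check).
val join = optimizedPlan.collectFirst { case j: Join => j }.get
assert(join.duplicateResolved)
```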
spark git commit: Fixed pandoc dependency issue in python/setup.py
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 342cc2a4c -> 49968de52

Fixed pandoc dependency issue in python/setup.py

## Problem Description

When pyspark is listed as a dependency of another package, installing the other package will cause an install failure in pyspark. When the other package is being installed, pyspark's setup_requires requirements are installed, including pypandoc. Thus, the exception handling on setup.py:152 does not work because the pypandoc module is indeed available. However, the pypandoc.convert() function fails if pandoc itself is not installed (in our use cases it is not). This raises an OSError that is not handled, and setup fails. The following is a sample failure:

```
$ which pandoc
$ pip freeze | grep pypandoc
pypandoc==1.4
$ pip install pyspark
Collecting pyspark
  Downloading pyspark-2.2.0.post0.tar.gz (188.3MB)
    100% |████████████████████████████████| 188.3MB 16.8MB/s
    Complete output from command python setup.py egg_info:
    Maybe try:
        sudo apt-get install pandoc
    See http://johnmacfarlane.net/pandoc/installing.html for installation options
    ---
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-mfnizcwa/pyspark/setup.py", line 151, in <module>
        long_description = pypandoc.convert('README.md', 'rst')
      File "/home/tbeck/.virtualenvs/cem/lib/python3.5/site-packages/pypandoc/__init__.py", line 69, in convert
        outputfile=outputfile, filters=filters)
      File "/home/tbeck/.virtualenvs/cem/lib/python3.5/site-packages/pypandoc/__init__.py", line 260, in _convert_input
        _ensure_pandoc_path()
      File "/home/tbeck/.virtualenvs/cem/lib/python3.5/site-packages/pypandoc/__init__.py", line 544, in _ensure_pandoc_path
        raise OSError("No pandoc was found: either install pandoc and add it\n"
    OSError: No pandoc was found: either install pandoc and add it
    to your PATH or or call pypandoc.download_pandoc(...)
    or install pypandoc wheels with included pandoc.

    Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-mfnizcwa/pyspark/
```

## What changes were proposed in this pull request?

This change simply adds an additional exception handler for the OSError that is raised. This allows pyspark to be installed client-side without requiring pandoc to be installed.

## How was this patch tested?

I tested this by building a wheel package of pyspark with the change applied. Then, in a clean virtual environment with pypandoc installed but pandoc not available on the system, I installed pyspark from the wheel. Here is the output:

```
$ pip freeze | grep pypandoc
pypandoc==1.4
$ which pandoc
$ pip install --no-cache-dir ../spark/python/dist/pyspark-2.3.0.dev0-py2.py3-none-any.whl
Processing /home/tbeck/work/spark/python/dist/pyspark-2.3.0.dev0-py2.py3-none-any.whl
Requirement already satisfied: py4j==0.10.6 in /home/tbeck/.virtualenvs/cem/lib/python3.5/site-packages (from pyspark==2.3.0.dev0)
Installing collected packages: pyspark
Successfully installed pyspark-2.3.0.dev0
```

Author: Tucker Beck

Closes #18981 from dusktreader/dusktreader/fix-pandoc-dependency-issue-in-setup_py.

(cherry picked from commit aad2125475dcdeb4a0410392b6706511db17bac4)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49968de5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49968de5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49968de5
Branch: refs/heads/branch-2.2
Commit: 49968de526e76a75abafb636cbd5ed84f9a496e9
Parents: 342cc2a
Author: Tucker Beck
Authored: Thu Sep 7 09:38:00 2017 +0900
Committer: hyukjinkwon
Committed: Thu Sep 7 09:38:21 2017 +0900

--
 python/setup.py | 2 ++
 1 file changed, 2 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/49968de5/python/setup.py
--
diff --git a/python/setup.py b/python/setup.py
index f500354..7e63461 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -151,6 +151,8 @@ try:
     long_description = pypandoc.convert('README.md', 'rst')
 except ImportError:
     print("Could not import pypandoc - required to package PySpark", file=sys.stderr)
+except OSError:
+    print("Could not convert - pandoc is not installed", file=sys.stderr)

 setup(
     name='pyspark',
spark git commit: Fixed pandoc dependency issue in python/setup.py
Repository: spark
Updated Branches:
  refs/heads/master fa0092bdd -> aad212547

Fixed pandoc dependency issue in python/setup.py

## Problem Description

When pyspark is listed as a dependency of another package, installing the other package will cause an install failure in pyspark. When the other package is being installed, pyspark's setup_requires requirements are installed, including pypandoc. Thus, the exception handling on setup.py:152 does not work because the pypandoc module is indeed available. However, the pypandoc.convert() function fails if pandoc itself is not installed (in our use cases it is not). This raises an OSError that is not handled, and setup fails. The following is a sample failure:

```
$ which pandoc
$ pip freeze | grep pypandoc
pypandoc==1.4
$ pip install pyspark
Collecting pyspark
  Downloading pyspark-2.2.0.post0.tar.gz (188.3MB)
    100% |████████████████████████████████| 188.3MB 16.8MB/s
    Complete output from command python setup.py egg_info:
    Maybe try:
        sudo apt-get install pandoc
    See http://johnmacfarlane.net/pandoc/installing.html for installation options
    ---
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-mfnizcwa/pyspark/setup.py", line 151, in <module>
        long_description = pypandoc.convert('README.md', 'rst')
      File "/home/tbeck/.virtualenvs/cem/lib/python3.5/site-packages/pypandoc/__init__.py", line 69, in convert
        outputfile=outputfile, filters=filters)
      File "/home/tbeck/.virtualenvs/cem/lib/python3.5/site-packages/pypandoc/__init__.py", line 260, in _convert_input
        _ensure_pandoc_path()
      File "/home/tbeck/.virtualenvs/cem/lib/python3.5/site-packages/pypandoc/__init__.py", line 544, in _ensure_pandoc_path
        raise OSError("No pandoc was found: either install pandoc and add it\n"
    OSError: No pandoc was found: either install pandoc and add it
    to your PATH or or call pypandoc.download_pandoc(...)
    or install pypandoc wheels with included pandoc.

    Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-mfnizcwa/pyspark/
```

## What changes were proposed in this pull request?

This change simply adds an additional exception handler for the OSError that is raised. This allows pyspark to be installed client-side without requiring pandoc to be installed.

## How was this patch tested?

I tested this by building a wheel package of pyspark with the change applied. Then, in a clean virtual environment with pypandoc installed but pandoc not available on the system, I installed pyspark from the wheel. Here is the output:

```
$ pip freeze | grep pypandoc
pypandoc==1.4
$ which pandoc
$ pip install --no-cache-dir ../spark/python/dist/pyspark-2.3.0.dev0-py2.py3-none-any.whl
Processing /home/tbeck/work/spark/python/dist/pyspark-2.3.0.dev0-py2.py3-none-any.whl
Requirement already satisfied: py4j==0.10.6 in /home/tbeck/.virtualenvs/cem/lib/python3.5/site-packages (from pyspark==2.3.0.dev0)
Installing collected packages: pyspark
Successfully installed pyspark-2.3.0.dev0
```

Author: Tucker Beck

Closes #18981 from dusktreader/dusktreader/fix-pandoc-dependency-issue-in-setup_py.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aad21254
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aad21254
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aad21254
Branch: refs/heads/master
Commit: aad2125475dcdeb4a0410392b6706511db17bac4
Parents: fa0092b
Author: Tucker Beck
Authored: Thu Sep 7 09:38:00 2017 +0900
Committer: hyukjinkwon
Committed: Thu Sep 7 09:38:00 2017 +0900

--
 python/setup.py | 2 ++
 1 file changed, 2 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/aad21254/python/setup.py
--
diff --git a/python/setup.py b/python/setup.py
index cfc83c6..02612ff 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -151,6 +151,8 @@ try:
     long_description = pypandoc.convert('README.md', 'rst')
 except ImportError:
     print("Could not import pypandoc - required to package PySpark", file=sys.stderr)
+except OSError:
+    print("Could not convert - pandoc is not installed", file=sys.stderr)

 setup(
     name='pyspark',
spark git commit: [SPARK-21901][SS] Define toString for StateOperatorProgress
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 9afab9a52 -> 342cc2a4c

[SPARK-21901][SS] Define toString for StateOperatorProgress

## What changes were proposed in this pull request?

Just `StateOperatorProgress.toString` plus a few formatting fixes.

## How was this patch tested?

Local build. Waiting for OK from Jenkins.

Author: Jacek Laskowski

Closes #19112 from jaceklaskowski/SPARK-21901-StateOperatorProgress-toString.

(cherry picked from commit fa0092bddf695a757f5ddaed539e55e2dc9fccb7)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/342cc2a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/342cc2a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/342cc2a4
Branch: refs/heads/branch-2.2
Commit: 342cc2a4cad4b8491f4689b66570d14e5fcba33b
Parents: 9afab9a
Author: Jacek Laskowski
Authored: Wed Sep 6 15:48:48 2017 -0700
Committer: Shixiong Zhu
Committed: Wed Sep 6 15:49:03 2017 -0700

--
 .../src/main/scala/org/apache/spark/sql/streaming/progress.scala | 2 ++
 1 file changed, 2 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/342cc2a4/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index fb590e7..5171852 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -49,6 +49,8 @@ class StateOperatorProgress private[sql](
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated))
   }
+
+  override def toString: String = prettyJson
 }

 /**
spark git commit: [SPARK-21901][SS] Define toString for StateOperatorProgress
Repository: spark
Updated Branches:
  refs/heads/master acdf45fb5 -> fa0092bdd

[SPARK-21901][SS] Define toString for StateOperatorProgress

## What changes were proposed in this pull request?

Just `StateOperatorProgress.toString` plus a few formatting fixes.

## How was this patch tested?

Local build. Waiting for OK from Jenkins.

Author: Jacek Laskowski

Closes #19112 from jaceklaskowski/SPARK-21901-StateOperatorProgress-toString.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa0092bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa0092bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa0092bd
Branch: refs/heads/master
Commit: fa0092bddf695a757f5ddaed539e55e2dc9fccb7
Parents: acdf45f
Author: Jacek Laskowski
Authored: Wed Sep 6 15:48:48 2017 -0700
Committer: Shixiong Zhu
Committed: Wed Sep 6 15:48:48 2017 -0700

--
 .../src/main/scala/org/apache/spark/sql/streaming/progress.scala | 2 ++
 1 file changed, 2 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/fa0092bd/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3000c42..cedc1dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -55,6 +55,8 @@ class StateOperatorProgress private[sql](
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
     ("memoryUsedBytes" -> JInt(memoryUsedBytes))
   }
+
+  override def toString: String = prettyJson
 }

 /**
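As a quick illustration of what this buys: with `toString` delegating to `prettyJson`, printing a state operator's progress yields readable JSON instead of the default class-name-plus-hash-code. A minimal sketch, assuming an active streaming `query`; the printed values are illustrative:

```scala
// Each StateOperatorProgress now renders as the pretty JSON built by
// jsonValue (shown in the diff above).
query.lastProgress.stateOperators.foreach(println)
// Example output (values illustrative):
// {
//   "numRowsTotal" : 100,
//   "numRowsUpdated" : 5,
//   "memoryUsedBytes" : 4086
// }
```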
spark git commit: [SPARK-21765] Check that optimization doesn't affect isStreaming bit.
Repository: spark
Updated Branches:
  refs/heads/master 36b48ee6e -> acdf45fb5

[SPARK-21765] Check that optimization doesn't affect isStreaming bit.

## What changes were proposed in this pull request?

Add an assert in logical plan optimization that the isStreaming bit stays the same, and fix the empty-relation rules where that wasn't happening.

## How was this patch tested?

New and existing unit tests.

Author: Jose Torres
Author: Jose Torres

Closes #19056 from joseph-torres/SPARK-21765-followup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acdf45fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acdf45fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acdf45fb
Branch: refs/heads/master
Commit: acdf45fb52e29a0308cccdbef0ec0dca0815d300
Parents: 36b48ee
Author: Jose Torres
Authored: Wed Sep 6 11:19:46 2017 -0700
Committer: Tathagata Das
Committed: Wed Sep 6 11:19:46 2017 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala      |  6 +-
 .../optimizer/PropagateEmptyRelation.scala      | 11 ++-
 .../catalyst/plans/logical/LocalRelation.scala  |  2 +-
 .../spark/sql/catalyst/rules/RuleExecutor.scala |  1 -
 .../optimizer/PropagateEmptyRelationSuite.scala | 44
 .../execution/streaming/StreamExecution.scala   |  2 +-
 .../spark/sql/execution/streaming/socket.scala  | 17 ++---
 .../streaming/TextSocketStreamSuite.scala       | 72 +++-
 8 files changed, 103 insertions(+), 52 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/acdf45fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d7e5906..02d6778 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -724,8 +724,10 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
       case Filter(Literal(true, BooleanType), child) => child
       // If the filter condition always evaluate to null or false,
       // replace the input with an empty relation.
-      case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty)
-      case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty)
+      case Filter(Literal(null, _), child) =>
+        LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming)
+      case Filter(Literal(false, BooleanType), child) =>
+        LocalRelation(child.output, data = Seq.empty, isStreaming = plan.isStreaming)
       // If any deterministic condition is guaranteed to be true given the constraints on the child's
       // output, remove the condition
       case f @ Filter(fc, p: LogicalPlan) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/acdf45fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 987cd74..cfffa6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -38,7 +38,8 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     case _ => false
   }

-  private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty)
+  private def empty(plan: LogicalPlan) =
+    LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)

   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case p: Union if p.children.forall(isEmptyLocalRelation) =>
@@ -65,11 +66,15 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
       case _: RepartitionByExpression => empty(p)
       // An aggregate with non-empty group expression will return one output row per group when the
       // input to the aggregate is not empty. If the input to the aggregate is empty then all groups
-      // will be empty and thus the output will be empty.
+      // will be empty and thus the output will be empty. If we're working on batch data, we can
+      //
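For context, the integrity check this PR adds boils down to the invariant sketched below. This is a hedged illustration with hypothetical names, not the committed hook (which lives in the optimizer's rule-execution path); it shows why the empty-relation rewrites above now propagate `plan.isStreaming` into the replacement `LocalRelation`.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical sketch: an optimizer rule must never flip a plan's isStreaming
// bit. Before this fix, replacing a streaming subtree with a batch
// LocalRelation violated exactly this invariant.
def assertStreamingBitPreserved(previous: LogicalPlan, current: LogicalPlan): Unit = {
  assert(previous.isStreaming == current.isStreaming,
    s"Optimization changed isStreaming: ${previous.isStreaming} -> ${current.isStreaming}")
}
```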
spark git commit: [SPARK-21801][SPARKR][TEST] set random seed for predictable test
Repository: spark
Updated Branches:
  refs/heads/master f2e22aebf -> 36b48ee6e

[SPARK-21801][SPARKR][TEST] set random seed for predictable test

## What changes were proposed in this pull request?

Call set.seed() before running the tests.

## How was this patch tested?

Jenkins, AppVeyor.

Author: Felix Cheung

Closes #19111 from felixcheung/rranseed.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36b48ee6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36b48ee6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36b48ee6
Branch: refs/heads/master
Commit: 36b48ee6e92661645648a001d0d83623a8e5d601
Parents: f2e22ae
Author: Felix Cheung
Authored: Wed Sep 6 09:53:55 2017 -0700
Committer: Felix Cheung
Committed: Wed Sep 6 09:53:55 2017 -0700

--
 R/pkg/tests/run-all.R | 2 ++
 1 file changed, 2 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/36b48ee6/R/pkg/tests/run-all.R
--
diff --git a/R/pkg/tests/run-all.R b/R/pkg/tests/run-all.R
index 0aefd80..a1834a2 100644
--- a/R/pkg/tests/run-all.R
+++ b/R/pkg/tests/run-all.R
@@ -43,6 +43,8 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
 test_package("SparkR")

 if (identical(Sys.getenv("NOT_CRAN"), "true")) {
+  # set random seed for predictable results. mostly for base's sample() in tree and classification
+  set.seed(42)
   # for testthat 1.0.2 later, change reporter from "summary" to default_reporter()
   testthat:::run_tests("SparkR",
                        file.path(sparkRDir, "pkg", "tests", "fulltests"),
spark git commit: [SPARK-21835][SQL] RewritePredicateSubquery should not produce unresolved query plans
Repository: spark
Updated Branches:
  refs/heads/master 64936c14a -> f2e22aebf

[SPARK-21835][SQL] RewritePredicateSubquery should not produce unresolved query plans

## What changes were proposed in this pull request?

Correlated predicate subqueries are rewritten into `Join` by the rule `RewritePredicateSubquery` during optimization. It is possible that the two sides of the `Join` have conflicting attributes, in which case the query plans produced by `RewritePredicateSubquery` become unresolved and break structural integrity. We should check for conflicting attributes in the `Join` and de-duplicate them by adding a `Project`.

## How was this patch tested?

Added tests.

Author: Liang-Chi Hsieh

Closes #19050 from viirya/SPARK-21835.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2e22aeb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2e22aeb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2e22aeb
Branch: refs/heads/master
Commit: f2e22aebfe49cdfdf20f060305772971bcea9266
Parents: 64936c1
Author: Liang-Chi Hsieh
Authored: Wed Sep 6 07:42:19 2017 -0700
Committer: gatorsmile
Committed: Wed Sep 6 07:42:19 2017 -0700

--
 .../spark/sql/catalyst/optimizer/subquery.scala | 39 ++--
 .../org/apache/spark/sql/SubquerySuite.scala    | 63
 2 files changed, 98 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f2e22aeb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 4386a10..7ff8915 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -49,6 +49,33 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
     }
   }

+  private def dedupJoin(joinPlan: Join): Join = joinPlan match {
+    // SPARK-21835: It is possibly that the two sides of the join have conflicting attributes,
+    // the produced join then becomes unresolved and break structural integrity. We should
+    // de-duplicate conflicting attributes. We don't use transformation here because we only
+    // care about the most top join converted from correlated predicate subquery.
+    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
+      val duplicates = right.outputSet.intersect(left.outputSet)
+      if (duplicates.nonEmpty) {
+        val aliasMap = AttributeMap(duplicates.map { dup =>
+          dup -> Alias(dup, dup.toString)()
+        }.toSeq)
+        val aliasedExpressions = right.output.map { ref =>
+          aliasMap.getOrElse(ref, ref)
+        }
+        val newRight = Project(aliasedExpressions, right)
+        val newJoinCond = joinCond.map { condExpr =>
+          condExpr transform {
+            case a: Attribute => aliasMap.getOrElse(a, a).toAttribute
+          }
+        }
+        Join(left, newRight, joinType, newJoinCond)
+      } else {
+        j
+      }
+    case _ => joinPlan
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Filter(condition, child) =>
       val (withSubquery, withoutSubquery) =
@@ -64,14 +91,17 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       withSubquery.foldLeft(newFilter) {
         case (p, Exists(sub, conditions, _)) =>
           val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-          Join(outerPlan, sub, LeftSemi, joinCond)
+          // Deduplicate conflicting attributes if any.
+          dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
         case (p, Not(Exists(sub, conditions, _))) =>
           val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
-          Join(outerPlan, sub, LeftAnti, joinCond)
+          // Deduplicate conflicting attributes if any.
+          dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond))
         case (p, In(value, Seq(ListQuery(sub, conditions, _, _)))) =>
           val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
           val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
-          Join(outerPlan, sub, LeftSemi, joinCond)
+          // Deduplicate conflicting attributes if any.
+          dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
         case (p, Not(In(value, Seq(ListQuery(sub, conditions, _, _))))) =>
           // This is a NULL-aware
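To make the failure mode concrete: the conflict arises when the subquery scans the same relation as the outer query, so both sides of the generated semi/anti join expose identical attribute IDs until `dedupJoin` aliases one side behind a `Project`. A hedged sketch of a query with that shape (the table `r` with columns `c`, `d` follows this suite's fixtures; any self-referencing predicate subquery works):

```scala
// Before this fix, rewriting the IN predicate into a LeftSemi join could leave
// both join sides sharing r's attributes, producing an unresolved plan.
spark.sql("SELECT * FROM r WHERE c IN (SELECT c FROM r WHERE d = 1.0)").explain(true)
```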
spark git commit: [SPARK-21903][BUILD][FOLLOWUP] Upgrade scalastyle-maven-plugin and scalastyle as well in POM and SparkBuild.scala
Repository: spark
Updated Branches:
  refs/heads/master 16c4c03c7 -> 64936c14a

[SPARK-21903][BUILD][FOLLOWUP] Upgrade scalastyle-maven-plugin and scalastyle as well in POM and SparkBuild.scala

## What changes were proposed in this pull request?

This PR proposes to match the scalastyle version in the POM and in SparkBuild.scala.

## How was this patch tested?

Manual builds.

Author: hyukjinkwon

Closes #19146 from HyukjinKwon/SPARK-21903-follow-up.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64936c14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64936c14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64936c14
Branch: refs/heads/master
Commit: 64936c14a7ef30b9eacb129bafe6a1665887bf21
Parents: 16c4c03
Author: hyukjinkwon
Authored: Wed Sep 6 23:28:12 2017 +0900
Committer: hyukjinkwon
Committed: Wed Sep 6 23:28:12 2017 +0900

--
 pom.xml                  | 2 +-
 project/SparkBuild.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/64936c14/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 09794c1..a051fea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2463,7 +2463,7 @@
         <groupId>org.scalastyle</groupId>
         <artifactId>scalastyle-maven-plugin</artifactId>
-        <version>0.9.0</version>
+        <version>1.0.0</version>
         <configuration>
           <verbose>false</verbose>
           <failOnViolation>true</failOnViolation>

http://git-wip-us.apache.org/repos/asf/spark/blob/64936c14/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 20848f0..748b1c4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -123,7 +123,7 @@ object SparkBuild extends PomBuild {

   lazy val scalaStyleRules = Project("scalaStyleRules", file("scalastyle"))
     .settings(
-      libraryDependencies += "org.scalastyle" %% "scalastyle" % "0.9.0"
+      libraryDependencies += "org.scalastyle" %% "scalastyle" % "1.0.0"
     )

   lazy val scalaStyleOnCompile = taskKey[Unit]("scalaStyleOnCompile")
spark git commit: [SPARK-19357][ML] Adding parallel model evaluation in ML tuning
Repository: spark
Updated Branches:
  refs/heads/master 4ee7dfe41 -> 16c4c03c7

[SPARK-19357][ML] Adding parallel model evaluation in ML tuning

## What changes were proposed in this pull request?

Modified `CrossValidator` and `TrainValidationSplit` to be able to evaluate models in parallel for a given parameter grid. The level of parallelism is controlled by a parameter `numParallelEval` used to schedule a number of models to be trained/evaluated so that the jobs can be run concurrently. This is a naive approach that does not check the cluster for needed resources, so care must be taken by the user to tune the parameter appropriately. The default value is `1`, which will train/evaluate in serial.

## How was this patch tested?

Added unit tests for CrossValidator and TrainValidationSplit to verify that model selection is the same when run in serial vs parallel. Manual testing to verify tasks run in parallel when the param is > 1. Added parameter usage to relevant examples.

Author: Bryan Cutler

Closes #16774 from BryanCutler/parallel-model-eval-SPARK-19357.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16c4c03c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16c4c03c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16c4c03c
Branch: refs/heads/master
Commit: 16c4c03c71394ab30c8edaf4418973e1a2c5ebfe
Parents: 4ee7dfe
Author: Bryan Cutler
Authored: Wed Sep 6 14:12:27 2017 +0200
Committer: Nick Pentreath
Committed: Wed Sep 6 14:12:27 2017 +0200

--
 docs/ml-tuning.md                               |  2 +
 ...ModelSelectionViaCrossValidationExample.java |  4 +-
 ...SelectionViaTrainValidationSplitExample.java |  3 +-
 ...odelSelectionViaCrossValidationExample.scala |  1 +
 ...electionViaTrainValidationSplitExample.scala |  2 +
 .../spark/ml/param/shared/HasParallelism.scala  | 59
 .../apache/spark/ml/tuning/CrossValidator.scala | 71 ++--
 .../spark/ml/tuning/TrainValidationSplit.scala  | 57
 .../spark/ml/tuning/CrossValidatorSuite.scala   | 27
 .../ml/tuning/TrainValidationSplitSuite.scala   | 35 +-
 10 files changed, 221 insertions(+), 40 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/16c4c03c/docs/ml-tuning.md
--
diff --git a/docs/ml-tuning.md b/docs/ml-tuning.md
index e9123db..64dc46c 100644
--- a/docs/ml-tuning.md
+++ b/docs/ml-tuning.md
@@ -55,6 +55,8 @@ for multiclass problems. The default metric used to choose the best `ParamMap` c
 method in each of these evaluators.

 To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility.
+By default, sets of parameters from the parameter grid are evaluated in serial. Parameter evaluation can be done in parallel by setting `parallelism` with a value of 2 or more (a value of 1 will be serial) before running model selection with `CrossValidator` or `TrainValidationSplit` (NOTE: this is not yet supported in Python).
+The value of `parallelism` should be chosen carefully to maximize parallelism without exceeding cluster resources, and larger values may not always lead to improved performance. Generally speaking, a value up to 10 should be sufficient for most clusters.

 # Cross-Validation

http://git-wip-us.apache.org/repos/asf/spark/blob/16c4c03c/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java
--
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java
index 975c65e..d973279 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java
@@ -94,7 +94,9 @@ public class JavaModelSelectionViaCrossValidationExample {
     CrossValidator cv = new CrossValidator()
         .setEstimator(pipeline)
         .setEvaluator(new BinaryClassificationEvaluator())
-        .setEstimatorParamMaps(paramGrid).setNumFolds(2);  // Use 3+ in practice
+        .setEstimatorParamMaps(paramGrid)
+        .setNumFolds(2)  // Use 3+ in practice
+        .setParallelism(2);  // Evaluate up to 2 parameter settings in parallel

     // Run cross-validation, and choose the best set of parameters.
     CrossValidatorModel cvModel = cv.fit(training);
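For Scala users, a minimal counterpart of the Java snippet above — a sketch assuming `pipeline`, `paramGrid`, and `training` are defined as in the full example:

```scala
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator

// Same configuration as the Java example: 2 folds, and up to 2 parameter
// settings evaluated in parallel via the new parallelism param.
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)      // Use 3+ in practice
  .setParallelism(2)   // Evaluate up to 2 parameter settings in parallel
val cvModel = cv.fit(training)
```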
spark git commit: [SPARK-21924][DOCS] Update structured streaming programming guide doc
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 7da8fbf08 -> 9afab9a52

[SPARK-21924][DOCS] Update structured streaming programming guide doc

## What changes were proposed in this pull request?

Update the line "For example, the data (12:09, cat) is out of order and late, and it falls in windows 12:05 - 12:15 and 12:10 - 12:20." to read "For example, the data (12:09, cat) is out of order and late, and it falls in windows 12:00 - 12:10 and 12:05 - 12:15." in the structured streaming programming guide.

Author: Riccardo Corbella

Closes #19137 from riccardocorbella/bugfix.

(cherry picked from commit 4ee7dfe41b27abbd4c32074ecc8f268f6193c3f4)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9afab9a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9afab9a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9afab9a5
Branch: refs/heads/branch-2.2
Commit: 9afab9a524c287a5c87c0ff54e5c1b757b32747c
Parents: 7da8fbf
Author: Riccardo Corbella
Authored: Wed Sep 6 08:22:57 2017 +0100
Committer: Sean Owen
Committed: Wed Sep 6 08:23:10 2017 +0100

--
 docs/structured-streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/9afab9a5/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 13a6a82..93bef8d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -977,7 +977,7 @@ at the beginning of every trigger is the red line. For example, when the engine
 observes the data `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`.
 This watermark lets the engine maintain intermediate state for additional 10 minutes to allow late
 data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in
-windows `12:05 - 12:15` and `12:10 - 12:20`. Since, it is still ahead of the watermark `12:04` in
+windows `12:00 - 12:10` and `12:05 - 12:15`. Since, it is still ahead of the watermark `12:04` in
 the trigger, the engine still maintains the intermediate counts as state and correctly updates the
 counts of the related windows. However, when the watermark is updated to `12:11`, the intermediate
 state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`)
spark git commit: [SPARK-21924][DOCS] Update structured streaming programming guide doc
Repository: spark
Updated Branches:
  refs/heads/master 445f1790a -> 4ee7dfe41

[SPARK-21924][DOCS] Update structured streaming programming guide doc

## What changes were proposed in this pull request?

Update the line "For example, the data (12:09, cat) is out of order and late, and it falls in windows 12:05 - 12:15 and 12:10 - 12:20." to read "For example, the data (12:09, cat) is out of order and late, and it falls in windows 12:00 - 12:10 and 12:05 - 12:15." in the structured streaming programming guide.

Author: Riccardo Corbella

Closes #19137 from riccardocorbella/bugfix.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ee7dfe4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ee7dfe4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ee7dfe4
Branch: refs/heads/master
Commit: 4ee7dfe41b27abbd4c32074ecc8f268f6193c3f4
Parents: 445f179
Author: Riccardo Corbella
Authored: Wed Sep 6 08:22:57 2017 +0100
Committer: Sean Owen
Committed: Wed Sep 6 08:22:57 2017 +0100

--
 docs/structured-streaming-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4ee7dfe4/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 13a6a82..93bef8d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -977,7 +977,7 @@ at the beginning of every trigger is the red line. For example, when the engine
 observes the data `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`.
 This watermark lets the engine maintain intermediate state for additional 10 minutes to allow late
 data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in
-windows `12:05 - 12:15` and `12:10 - 12:20`. Since, it is still ahead of the watermark `12:04` in
+windows `12:00 - 12:10` and `12:05 - 12:15`. Since, it is still ahead of the watermark `12:04` in
 the trigger, the engine still maintains the intermediate counts as state and correctly updates the
 counts of the related windows. However, when the watermark is updated to `12:11`, the intermediate
 state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`)
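The scenario the corrected sentence describes, sketched in Scala. This mirrors the guide's windowed word-count example; a streaming DataFrame `events` with `timestamp` and `word` columns is assumed. With 10-minute windows sliding every 5 minutes, an event at 12:09 falls in both `12:00 - 12:10` and `12:05 - 12:15` — not `12:10 - 12:20`, as the old text said.

```scala
import org.apache.spark.sql.functions.{col, window}

// 10-minute windows sliding every 5 minutes, with a 10-minute watermark.
// An out-of-order event (12:09, cat) is still counted in windows
// 12:00 - 12:10 and 12:05 - 12:15 while the watermark stands at 12:04.
val windowedCounts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("word"))
  .count()
```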