[jira] [Commented] (SPARK-24719) ClusteringEvaluator supports integer type labels
[ https://issues.apache.org/jira/browse/SPARK-24719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566425#comment-16566425 ] Xiangrui Meng commented on SPARK-24719: --- [~mgaido] Sorry, it was my bad! The error happened when a user tried BisectingKMeans with ClusteringEvaluator and CrossValidator, so I reported the issue here. The error was actually from DoubleParam instead of the evaluator. I'll do some investigation. Closing this ticket for now. Thanks for taking a look! > ClusteringEvaluator supports integer type labels > > > Key: SPARK-24719 > URL: https://issues.apache.org/jira/browse/SPARK-24719 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.3.1 >Reporter: Xiangrui Meng >Priority: Major > > ClusteringEvaluator should support integer labels because we output integer > labels in BisectingKMeans. > [https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala#L77]. > We should cast numeric types to double in ClusteringEvaluator. > [~mgaido] Do you have time to work on the fix? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24719) ClusteringEvaluator supports integer type labels
[ https://issues.apache.org/jira/browse/SPARK-24719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiangrui Meng resolved SPARK-24719. --- Resolution: Not A Problem > ClusteringEvaluator supports integer type labels > > > Key: SPARK-24719 > URL: https://issues.apache.org/jira/browse/SPARK-24719 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.3.1 >Reporter: Xiangrui Meng >Priority: Major > > ClusteringEvaluator should support integer labels because we output integer > labels in BisectingKMeans. > [https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala#L77]. > We should cast numeric types to double in ClusteringEvaluator. > [~mgaido] Do you have time to work on the fix? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
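For reference, a minimal sketch of the cast the description proposes, with a hypothetical `predictions` DataFrame (the ticket was ultimately closed because the error came from DoubleParam, not the evaluator):

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

// Sketch: cast an integer prediction column to double before evaluation, which is
// what the description suggests ClusteringEvaluator could do internally for any
// numeric type. `predictions` is a hypothetical output of BisectingKMeans.
def withDoublePrediction(predictions: DataFrame): DataFrame =
  predictions.withColumn("prediction", col("prediction").cast(DoubleType))
{code}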
[jira] [Assigned] (SPARK-24997) Support MINUS ALL
[ https://issues.apache.org/jira/browse/SPARK-24997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24997: Assignee: Apache Spark > Support MINUS ALL > - > > Key: SPARK-24997 > URL: https://issues.apache.org/jira/browse/SPARK-24997 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Dilip Biswal >Assignee: Apache Spark >Priority: Major > > MINUS is a synonym for EXCEPT. We have added support for EXCEPT ALL. We need to > enable support for MINUS ALL as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24997) Support MINUS ALL
[ https://issues.apache.org/jira/browse/SPARK-24997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24997: Assignee: (was: Apache Spark) > Support MINUS ALL > - > > Key: SPARK-24997 > URL: https://issues.apache.org/jira/browse/SPARK-24997 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Dilip Biswal >Priority: Major > > MINUS is a synonym for EXCEPT. We have added support for EXCEPT ALL. We need to > enable support for MINUS ALL as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24997) Support MINUS ALL
[ https://issues.apache.org/jira/browse/SPARK-24997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566420#comment-16566420 ] Apache Spark commented on SPARK-24997: -- User 'dilipbiswal' has created a pull request for this issue: https://github.com/apache/spark/pull/21963 > Support MINUS ALL > - > > Key: SPARK-24997 > URL: https://issues.apache.org/jira/browse/SPARK-24997 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Dilip Biswal >Priority: Major > > MINUS is a synonym for EXCEPT. We have added support for EXCEPT ALL. We need to > enable support for MINUS ALL as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24997) Support MINUS ALL
Dilip Biswal created SPARK-24997: Summary: Support MINUS ALL Key: SPARK-24997 URL: https://issues.apache.org/jira/browse/SPARK-24997 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.1 Reporter: Dilip Biswal MINUS is a synonym for EXCEPT. We have added support for EXCEPT ALL. We need to enable support for MINUS ALL as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
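To make the requested semantics concrete, a hedged sketch with made-up data, assuming MINUS ALL parses as a synonym for EXCEPT ALL (which is exactly what the ticket asks for):

{code:scala}
import spark.implicits._

// Duplicates matter under ALL semantics, so tab1 deliberately contains 1 twice.
Seq(1, 1, 2, 3).toDF("a").createOrReplaceTempView("tab1")
Seq(1, 2).toDF("a").createOrReplaceTempView("tab2")

// Should behave exactly like EXCEPT ALL: one of the two 1s survives, plus 3.
spark.sql("SELECT a FROM tab1 MINUS ALL SELECT a FROM tab2").show()
{code}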
[jira] [Resolved] (SPARK-24557) ClusteringEvaluator support array input
[ https://issues.apache.org/jira/browse/SPARK-24557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiangrui Meng resolved SPARK-24557. --- Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21563 [https://github.com/apache/spark/pull/21563] > ClusteringEvaluator support array input > --- > > Key: SPARK-24557 > URL: https://issues.apache.org/jira/browse/SPARK-24557 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.4.0 >Reporter: zhengruifeng >Assignee: zhengruifeng >Priority: Major > Fix For: 2.4.0 > > > Since clustering algorithms already support array input, > {{ClusteringEvaluator}} should also support it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
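With this change, evaluation should look the same whether the features column holds Vectors or arrays; a sketch with a hypothetical `predictions` DataFrame:

{code:scala}
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// `predictions` is assumed to have an array<double> "features" column and an
// integer "prediction" column, e.g. from a clustering model fit on array input.
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
val silhouette = evaluator.evaluate(predictions)
{code}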
[jira] [Assigned] (SPARK-24557) ClusteringEvaluator support array input
[ https://issues.apache.org/jira/browse/SPARK-24557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiangrui Meng reassigned SPARK-24557: - Assignee: zhengruifeng > ClusteringEvaluator support array input > --- > > Key: SPARK-24557 > URL: https://issues.apache.org/jira/browse/SPARK-24557 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.4.0 >Reporter: zhengruifeng >Assignee: zhengruifeng >Priority: Major > Fix For: 2.4.0 > > > Since clustering algorithms already support array input, > {{ClusteringEvaluator}} should also support it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24788) RelationalGroupedDataset.toString throws errors when grouping by UnresolvedAttribute
[ https://issues.apache.org/jira/browse/SPARK-24788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566402#comment-16566402 ] Apache Spark commented on SPARK-24788: -- User 'maropu' has created a pull request for this issue: https://github.com/apache/spark/pull/21964 > RelationalGroupedDataset.toString throws errors when grouping by > UnresolvedAttribute > > > Key: SPARK-24788 > URL: https://issues.apache.org/jira/browse/SPARK-24788 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: Chris Horn >Priority: Minor > > This causes references to the RelationalGroupedDataset to break on the shell > because of the toString call: > {code:java} > scala> spark.range(0, 10).groupBy("id") > res4: org.apache.spark.sql.RelationalGroupedDataset = > RelationalGroupedDataset: [grouping expressions: [id: bigint], value: [id: > bigint], type: GroupBy] > scala> spark.range(0, 10).groupBy('id) > org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to > dataType on unresolved object, tree: 'id > at > org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) > at > org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:474) > at > org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:473) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.RelationalGroupedDataset.toString(RelationalGroupedDataset.scala:473) > at > scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332) > at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337) > at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345) > {code} > > I will create a PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
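A hedged sketch of the kind of guard the fix needs (not necessarily what the linked PR does): skip the dataType call for expressions that are not yet resolved.

{code:scala}
// Sketch: format grouping expressions defensively. Calling dataType on an
// UnresolvedAttribute throws UnresolvedException, so fall back to toString.
val groupingStrings = groupingExprs.map {
  case e if !e.resolved => e.toString                     // e.g. 'id
  case e => s"${e.toString}: ${e.dataType.simpleString}"  // e.g. id: bigint
}
{code}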
[jira] [Commented] (SPARK-21274) Implement EXCEPT ALL and INTERSECT ALL
[ https://issues.apache.org/jira/browse/SPARK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566399#comment-16566399 ] Apache Spark commented on SPARK-21274: -- User 'dilipbiswal' has created a pull request for this issue: https://github.com/apache/spark/pull/21963 > Implement EXCEPT ALL and INTERSECT ALL > -- > > Key: SPARK-21274 > URL: https://issues.apache.org/jira/browse/SPARK-21274 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.0.0, 2.1.0, 2.2.0 >Reporter: Ruslan Dautkhanov >Assignee: Dilip Biswal >Priority: Major > Fix For: 2.4.0 > > > 1) *EXCEPT ALL* / MINUS ALL: > {code} > SELECT a,b,c FROM tab1 > EXCEPT ALL > SELECT a,b,c FROM tab2 > {code} > can be rewritten as the following outer join: > {code} > SELECT a,b,c > FROM tab1 t1 > LEFT OUTER JOIN > tab2 t2 > ON ( > (t1.a, t1.b, t1.c) = (t2.a, t2.b, t2.c) > ) > WHERE > COALESCE(t2.a, t2.b, t2.c) IS NULL > {code} > (register this second query as a temp view under the name "*t1_except_t2_df*", > which can also be used to find INTERSECT ALL below): > 2) *INTERSECT ALL*: > {code} > SELECT a,b,c FROM tab1 > INTERSECT ALL > SELECT a,b,c FROM tab2 > {code} > can be rewritten as the following anti-join using the t1_except_t2_df we defined > above: > {code} > SELECT a,b,c > FROM tab1 t1 > WHERE >NOT EXISTS >(SELECT 1 > FROM t1_except_t2_df e > WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c) >) > {code} > So the suggestion is just to use the above query rewrites to implement both > the EXCEPT ALL and INTERSECT ALL SQL set operations. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24865) Remove AnalysisBarrier
[ https://issues.apache.org/jira/browse/SPARK-24865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566397#comment-16566397 ] Apache Spark commented on SPARK-24865: -- User 'gatorsmile' has created a pull request for this issue: https://github.com/apache/spark/pull/21962 > Remove AnalysisBarrier > -- > > Key: SPARK-24865 > URL: https://issues.apache.org/jira/browse/SPARK-24865 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0, 2.3.1 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > Fix For: 2.4.0 > > > AnalysisBarrier was introduced in SPARK-20392 to improve analysis speed > (don't re-analyze nodes that have already been analyzed). > Before AnalysisBarrier, we already had some infrastructure in place, with > analysis-specific functions (resolveOperators and resolveExpressions). These > functions do not recursively traverse down subplans that are already analyzed > (with a mutable boolean flag _analyzed). The issue with the old system was > that developers started using transformDown, which does a top-down traversal > of the plan tree, because there was no top-down resolution function, and as > a result analyzer performance became pretty bad. > In order to fix the issue in SPARK-20392, AnalysisBarrier was introduced as a > special node and for this special node, transform/transformUp/transformDown > don't traverse down. However, the introduction of this special node caused a > lot more trouble than it solved. This implicit node breaks assumptions and > code in a few places, and it's hard to know when an analysis barrier would > exist and when it wouldn't. Just a simple search of AnalysisBarrier in PR > discussions demonstrates it is a source of bugs and additional complexity. > Instead, I think a much simpler fix to the original issue is to introduce > resolveOperatorsDown, and change all places that call transformDown in the > analyzer to use that. We can also ban accidental uses of the various > transform* methods by using a linter (which can only lint specific packages), > or in test mode inspect the stack trace and fail explicitly if transform* is > called in the analyzer. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
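A minimal sketch of the proposed resolveOperatorsDown as a method on LogicalPlan, assuming the mutable `analyzed` flag described above (names are illustrative, not the actual Spark code):

{code:scala}
// Sketch: apply the rule top-down, but skip subtrees that are already analyzed,
// so no barrier node is needed to protect them from re-analysis.
def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
  if (analyzed) {
    this
  } else {
    val afterRule = rule.applyOrElse(this, identity[LogicalPlan])
    afterRule.mapChildren(_.resolveOperatorsDown(rule))
  }
}
{code}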
[jira] [Updated] (SPARK-24996) Use DSL to simplify DeclarativeAggregate
[ https://issues.apache.org/jira/browse/SPARK-24996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-24996: Labels: beginner (was: ) > Use DSL to simplify DeclarativeAggregate > > > Key: SPARK-24996 > URL: https://issues.apache.org/jira/browse/SPARK-24996 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xiao Li >Priority: Major > Labels: beginner > > Simplify DeclarativeAggregate with the DSL. See the example: > https://github.com/apache/spark/pull/21951 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24996) Use DSL to simplify DeclarativeAggregate
Xiao Li created SPARK-24996: --- Summary: Use DSL to simplify DeclarativeAggregate Key: SPARK-24996 URL: https://issues.apache.org/jira/browse/SPARK-24996 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Xiao Li Simplify DeclarativeAggregate with the DSL. See the example: https://github.com/apache/spark/pull/21951 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
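For a feel of the difference, a hedged sketch with a hypothetical buffer attribute and input (see the linked PR for the real rewrite): Catalyst's expression DSL lets the same tree be written with operators instead of nested constructors.

{code:scala}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.LongType

// Hypothetical pieces of a DeclarativeAggregate: a buffer attribute and the input.
val sum = AttributeReference("sum", LongType)()
val input = AttributeReference("input", LongType)()

// Explicit constructor style:
val updateExplicit = Add(sum, Coalesce(Seq(input, Literal(0L))))
// DSL style, building the same tree more readably:
val updateDsl = sum + Coalesce(Seq(input, Literal(0L)))
{code}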
[jira] [Commented] (SPARK-23698) Spark code contains numerous undefined names in Python 3
[ https://issues.apache.org/jira/browse/SPARK-23698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566372#comment-16566372 ] Apache Spark commented on SPARK-23698: -- User 'cclauss' has created a pull request for this issue: https://github.com/apache/spark/pull/21960 > Spark code contains numerous undefined names in Python 3 > > > Key: SPARK-23698 > URL: https://issues.apache.org/jira/browse/SPARK-23698 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 >Reporter: cclauss >Assignee: cclauss >Priority: Minor > Fix For: 2.4.0 > > > flake8 testing of https://github.com/apache/spark on Python 3.6.3 > $ *flake8 . --count --select=E901,E999,F821,F822,F823 --show-source > --statistics* > ./dev/merge_spark_pr.py:98:14: F821 undefined name 'raw_input' > result = raw_input("\n%s (y/n): " % prompt) > ^ > ./dev/merge_spark_pr.py:136:22: F821 undefined name 'raw_input' > primary_author = raw_input( > ^ > ./dev/merge_spark_pr.py:186:16: F821 undefined name 'raw_input' > pick_ref = raw_input("Enter a branch name [%s]: " % default_branch) >^ > ./dev/merge_spark_pr.py:233:15: F821 undefined name 'raw_input' > jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id) > ^ > ./dev/merge_spark_pr.py:278:20: F821 undefined name 'raw_input' > fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % > default_fix_versions) >^ > ./dev/merge_spark_pr.py:317:28: F821 undefined name 'raw_input' > raw_assignee = raw_input( >^ > ./dev/merge_spark_pr.py:430:14: F821 undefined name 'raw_input' > pr_num = raw_input("Which pull request would you like to merge? (e.g. > 34): ") > ^ > ./dev/merge_spark_pr.py:442:18: F821 undefined name 'raw_input' > result = raw_input("Would you like to use the modified title? (y/n): > ") > ^ > ./dev/merge_spark_pr.py:493:11: F821 undefined name 'raw_input' > while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y": > ^ > ./dev/create-release/releaseutils.py:58:16: F821 undefined name 'raw_input' > response = raw_input("%s [y/n]: " % msg) >^ > ./dev/create-release/releaseutils.py:152:38: F821 undefined name 'unicode' > author = unidecode.unidecode(unicode(author, "UTF-8")).strip() > ^ > ./python/setup.py:37:11: F821 undefined name '__version__' > VERSION = __version__ > ^ > ./python/pyspark/cloudpickle.py:275:18: F821 undefined name 'buffer' > dispatch[buffer] = save_buffer > ^ > ./python/pyspark/cloudpickle.py:807:18: F821 undefined name 'file' > dispatch[file] = save_file > ^ > ./python/pyspark/sql/conf.py:61:61: F821 undefined name 'unicode' > if not isinstance(obj, str) and not isinstance(obj, unicode): > ^ > ./python/pyspark/sql/streaming.py:25:21: F821 undefined name 'long' > intlike = (int, long) > ^ > ./python/pyspark/streaming/dstream.py:405:35: F821 undefined name 'long' > return self._sc._jvm.Time(long(timestamp * 1000)) > ^ > ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:21:10: F821 > undefined name 'xrange' > for i in xrange(50): > ^ > ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:22:14: F821 > undefined name 'xrange' > for j in xrange(5): > ^ > ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:23:18: F821 > undefined name 'xrange' > for k in xrange(20022): > ^ > 20F821 undefined name 'raw_input' > 20 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23698) Spark code contains numerous undefined names in Python 3
[ https://issues.apache.org/jira/browse/SPARK-23698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566356#comment-16566356 ] Apache Spark commented on SPARK-23698: -- User 'cclauss' has created a pull request for this issue: https://github.com/apache/spark/pull/21959 > Spark code contains numerous undefined names in Python 3 > > > Key: SPARK-23698 > URL: https://issues.apache.org/jira/browse/SPARK-23698 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.0 >Reporter: cclauss >Assignee: cclauss >Priority: Minor > Fix For: 2.4.0 > > > flake8 testing of https://github.com/apache/spark on Python 3.6.3 > $ *flake8 . --count --select=E901,E999,F821,F822,F823 --show-source > --statistics* > ./dev/merge_spark_pr.py:98:14: F821 undefined name 'raw_input' > result = raw_input("\n%s (y/n): " % prompt) > ^ > ./dev/merge_spark_pr.py:136:22: F821 undefined name 'raw_input' > primary_author = raw_input( > ^ > ./dev/merge_spark_pr.py:186:16: F821 undefined name 'raw_input' > pick_ref = raw_input("Enter a branch name [%s]: " % default_branch) >^ > ./dev/merge_spark_pr.py:233:15: F821 undefined name 'raw_input' > jira_id = raw_input("Enter a JIRA id [%s]: " % default_jira_id) > ^ > ./dev/merge_spark_pr.py:278:20: F821 undefined name 'raw_input' > fix_versions = raw_input("Enter comma-separated fix version(s) [%s]: " % > default_fix_versions) >^ > ./dev/merge_spark_pr.py:317:28: F821 undefined name 'raw_input' > raw_assignee = raw_input( >^ > ./dev/merge_spark_pr.py:430:14: F821 undefined name 'raw_input' > pr_num = raw_input("Which pull request would you like to merge? (e.g. > 34): ") > ^ > ./dev/merge_spark_pr.py:442:18: F821 undefined name 'raw_input' > result = raw_input("Would you like to use the modified title? (y/n): > ") > ^ > ./dev/merge_spark_pr.py:493:11: F821 undefined name 'raw_input' > while raw_input("\n%s (y/n): " % pick_prompt).lower() == "y": > ^ > ./dev/create-release/releaseutils.py:58:16: F821 undefined name 'raw_input' > response = raw_input("%s [y/n]: " % msg) >^ > ./dev/create-release/releaseutils.py:152:38: F821 undefined name 'unicode' > author = unidecode.unidecode(unicode(author, "UTF-8")).strip() > ^ > ./python/setup.py:37:11: F821 undefined name '__version__' > VERSION = __version__ > ^ > ./python/pyspark/cloudpickle.py:275:18: F821 undefined name 'buffer' > dispatch[buffer] = save_buffer > ^ > ./python/pyspark/cloudpickle.py:807:18: F821 undefined name 'file' > dispatch[file] = save_file > ^ > ./python/pyspark/sql/conf.py:61:61: F821 undefined name 'unicode' > if not isinstance(obj, str) and not isinstance(obj, unicode): > ^ > ./python/pyspark/sql/streaming.py:25:21: F821 undefined name 'long' > intlike = (int, long) > ^ > ./python/pyspark/streaming/dstream.py:405:35: F821 undefined name 'long' > return self._sc._jvm.Time(long(timestamp * 1000)) > ^ > ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:21:10: F821 > undefined name 'xrange' > for i in xrange(50): > ^ > ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:22:14: F821 > undefined name 'xrange' > for j in xrange(5): > ^ > ./sql/hive/src/test/resources/data/scripts/dumpdata_script.py:23:18: F821 > undefined name 'xrange' > for k in xrange(20022): > ^ > 20F821 undefined name 'raw_input' > 20 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24994) When the data type of the field is converted to other types, it can also support pushdown to parquet
[ https://issues.apache.org/jira/browse/SPARK-24994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24994: Assignee: Apache Spark > When the data type of the field is converted to other types, it can also > support pushdown to parquet > > > Key: SPARK-24994 > URL: https://issues.apache.org/jira/browse/SPARK-24994 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: liuxian >Assignee: Apache Spark >Priority: Major > > For this statement: select * from table1 where a = 100; > the data type of `a` is `smallint`. Because the default data type of 100 is > `int`, the data type of `a` is converted to `int`. > In this case, the filter cannot be pushed down to Parquet. > In our SQL statements we generally do not cast 100 > to `smallint`, so we hope that pushdown to Parquet can also be supported in this > situation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24994) When the data type of the field is converted to other types, it can also support pushdown to parquet
[ https://issues.apache.org/jira/browse/SPARK-24994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24994: Assignee: (was: Apache Spark) > When the data type of the field is converted to other types, it can also > support pushdown to parquet > > > Key: SPARK-24994 > URL: https://issues.apache.org/jira/browse/SPARK-24994 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: liuxian >Priority: Major > > For this statement: select * from table1 where a = 100; > the data type of `a` is `smallint`. Because the default data type of 100 is > `int`, the data type of `a` is converted to `int`. > In this case, the filter cannot be pushed down to Parquet. > In our SQL statements we generally do not cast 100 > to `smallint`, so we hope that pushdown to Parquet can also be supported in this > situation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24994) When the data type of the field is converted to other types, it can also support pushdown to parquet
[ https://issues.apache.org/jira/browse/SPARK-24994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566336#comment-16566336 ] Apache Spark commented on SPARK-24994: -- User '10110346' has created a pull request for this issue: https://github.com/apache/spark/pull/21957 > When the data type of the field is converted to other types, it can also > support pushdown to parquet > > > Key: SPARK-24994 > URL: https://issues.apache.org/jira/browse/SPARK-24994 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: liuxian >Priority: Major > > For this statement: select * from table1 where a = 100; > the data type of `a` is `smallint`. Because the default data type of 100 is > `int`, the data type of `a` is converted to `int`. > In this case, the filter cannot be pushed down to Parquet. > In our SQL statements we generally do not cast 100 > to `smallint`, so we hope that pushdown to Parquet can also be supported in this > situation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24995) Flaky tests: FlatMapGroupsWithStateSuite.flatMapGroupsWithState - streaming with processing time timeout
Jungtaek Lim created SPARK-24995: Summary: Flaky tests: FlatMapGroupsWithStateSuite.flatMapGroupsWithState - streaming with processing time timeout Key: SPARK-24995 URL: https://issues.apache.org/jira/browse/SPARK-24995 Project: Spark Issue Type: Bug Components: Structured Streaming, Tests Affects Versions: 2.4.0 Reporter: Jungtaek Lim I've seen CI build failures often, and noticed that some of the failures came from FlatMapGroupsWithStateSuite.flatMapGroupsWithState - streaming with processing time timeout. For now I only have build links for failures with state format version 2, but I'm sure I saw the failure with version 1, too. [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93906/testReport/org.apache.spark.sql.streaming/FlatMapGroupsWithStateSuite/flatMapGroupsWithState___streaming_with_processing_time_timeout___state_format_version_2/] [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93874/testReport/org.apache.spark.sql.streaming/FlatMapGroupsWithStateSuite/flatMapGroupsWithState___streaming_with_processing_time_timeout___state_format_version_2/] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23742) Filter out redundant AssociationRules
[ https://issues.apache.org/jira/browse/SPARK-23742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566326#comment-16566326 ] yuhao yang commented on SPARK-23742: [~maropu] Can you be more specific about the suggestion? E.g., how would it work with the example in the description? Thanks > Filter out redundant AssociationRules > - > > Key: SPARK-23742 > URL: https://issues.apache.org/jira/browse/SPARK-23742 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.3.0 >Reporter: Joseph K. Bradley >Priority: Major > > AssociationRules can generate redundant rules such as: > * (A) => C > * (A,B) => C (redundant) > It should optionally filter out redundant rules. It'd be nice to have it > optional (but maybe defaulting to filtering) so that users could compare the > confidences of more general vs. more specific rules. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24994) When the data type of the field is converted to other types, it can also support pushdown to parquet
liuxian created SPARK-24994: --- Summary: When the data type of the field is converted to other types, it can also support pushdown to parquet Key: SPARK-24994 URL: https://issues.apache.org/jira/browse/SPARK-24994 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: liuxian For this statement: select * from table1 where a = 100; the data type of `a` is `smallint`. Because the default data type of 100 is `int`, the data type of `a` is converted to `int`. In this case, the filter cannot be pushed down to Parquet. In our SQL statements we generally do not cast 100 to `smallint`, so we hope that pushdown to Parquet can also be supported in this situation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
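A sketch of the behavior being described, assuming a Parquet table with a smallint column (exact plans may vary by version):

{code:scala}
spark.sql("CREATE TABLE table1 (a SMALLINT) USING parquet")

// The literal 100 defaults to int, so the predicate becomes cast(a as int) = 100
// and, as described above, is not pushed down to the Parquet reader.
spark.sql("SELECT * FROM table1 WHERE a = 100").explain()

// Casting the literal keeps the comparison on the smallint column, so the
// filter can be pushed down; the ticket asks Spark to handle the first form too.
spark.sql("SELECT * FROM table1 WHERE a = CAST(100 AS SMALLINT)").explain()
{code}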
[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 0.10.0.1 to 2.0.0
[ https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566300#comment-16566300 ] Apache Spark commented on SPARK-18057: -- User 'srowen' has created a pull request for this issue: https://github.com/apache/spark/pull/21955 > Update structured streaming kafka from 0.10.0.1 to 2.0.0 > > > Key: SPARK-18057 > URL: https://issues.apache.org/jira/browse/SPARK-18057 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Reporter: Cody Koeninger >Assignee: Ted Yu >Priority: Major > Fix For: 2.4.0 > > > There are a couple of relevant KIPs here: > https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23908) High-order function: transform(array, function) → array
[ https://issues.apache.org/jira/browse/SPARK-23908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566293#comment-16566293 ] Apache Spark commented on SPARK-23908: -- User 'ueshin' has created a pull request for this issue: https://github.com/apache/spark/pull/21954 > High-order function: transform(array, function) → array > --- > > Key: SPARK-23908 > URL: https://issues.apache.org/jira/browse/SPARK-23908 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Assignee: Herman van Hovell >Priority: Major > > Ref: https://prestodb.io/docs/current/functions/array.html > Returns an array that is the result of applying function to each element of > array: > {noformat} > SELECT transform(ARRAY [], x -> x + 1); -- [] > SELECT transform(ARRAY [5, 6], x -> x + 1); -- [6, 7] > SELECT transform(ARRAY [5, NULL, 6], x -> COALESCE(x, 0) + 1); -- [6, 1, 7] > SELECT transform(ARRAY ['x', 'abc', 'z'], x -> x || '0'); -- ['x0', 'abc0', > 'z0'] > SELECT transform(ARRAY [ARRAY [1, NULL, 2], ARRAY[3, NULL]], a -> filter(a, x > -> x IS NOT NULL)); -- [[1, 2], [3]] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24817) Implement BarrierTaskContext.barrier()
[ https://issues.apache.org/jira/browse/SPARK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566278#comment-16566278 ] Jiang Xingbo commented on SPARK-24817: -- Actually, the current implementation of the _barrier_ function doesn't require communication between executors; all executors just talk to a _BarrierCoordinator_ which is in the driver. But to allow launching ML workloads we do need to enable executors to communicate with each other directly; IIUC that shall be investigated under SPARK-24724. Maybe [~mengxr] can provide more context here. > Implement BarrierTaskContext.barrier() > -- > > Key: SPARK-24817 > URL: https://issues.apache.org/jira/browse/SPARK-24817 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jiang Xingbo >Priority: Major > > Implement BarrierTaskContext.barrier(), to support global sync between all > the tasks in a barrier stage. The global sync shall finish immediately once > all tasks in the same barrier stage reach the same barrier. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
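For context, a sketch of the intended user-facing API from the barrier execution mode design (method names per the SPIP and this ticket; details subject to change):

{code:scala}
import org.apache.spark.BarrierTaskContext

val rdd = sc.parallelize(1 to 100, numSlices = 4)

// Every task in the barrier stage blocks at barrier() until all tasks reach it;
// per the comment above, coordination goes through a BarrierCoordinator on the driver.
rdd.barrier().mapPartitions { iter =>
  val context = BarrierTaskContext.get()
  // ... run the local part of the workload ...
  context.barrier()
  iter
}.collect()
{code}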
[jira] [Commented] (SPARK-23742) Filter out redundant AssociationRules
[ https://issues.apache.org/jira/browse/SPARK-23742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566266#comment-16566266 ] Takeshi Yamamuro commented on SPARK-23742: -- Can't we control this case by a new config implemented in SPARK-24802? > Filter out redundant AssociationRules > - > > Key: SPARK-23742 > URL: https://issues.apache.org/jira/browse/SPARK-23742 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.3.0 >Reporter: Joseph K. Bradley >Priority: Major > > AssociationRules can generate redundant rules such as: > * (A) => C > * (A,B) => C (redundant) > It should optionally filter out redundant rules. It'd be nice to have it > optional (but maybe defaulting to filtering) so that users could compare the > confidences of more general vs. more specific rules. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24992) spark should randomize yarn local dir selection
[ https://issues.apache.org/jira/browse/SPARK-24992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24992: Assignee: Apache Spark > spark should randomize yarn local dir selection > --- > > Key: SPARK-24992 > URL: https://issues.apache.org/jira/browse/SPARK-24992 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Hieu Tri Huynh >Assignee: Apache Spark >Priority: Minor > > Utils.getLocalDir is used to get the path of a temporary directory. However, it > always returns the same directory, which is the first element in the > array _localRootDirs_. When running on YARN, this can cause us to always write > to one disk, making it busy while other disks are free. > We should randomize the selection to spread out the load. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24992) spark should randomize yarn local dir selection
[ https://issues.apache.org/jira/browse/SPARK-24992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24992: Assignee: (was: Apache Spark) > spark should randomize yarn local dir selection > --- > > Key: SPARK-24992 > URL: https://issues.apache.org/jira/browse/SPARK-24992 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Hieu Tri Huynh >Priority: Minor > > Utils.getLocalDir is used to get the path of a temporary directory. However, it > always returns the same directory, which is the first element in the > array _localRootDirs_. When running on YARN, this can cause us to always write > to one disk, making it busy while other disks are free. > We should randomize the selection to spread out the load. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24992) spark should randomize yarn local dir selection
[ https://issues.apache.org/jira/browse/SPARK-24992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566249#comment-16566249 ] Apache Spark commented on SPARK-24992: -- User 'hthuynh2' has created a pull request for this issue: https://github.com/apache/spark/pull/21953 > spark should randomize yarn local dir selection > --- > > Key: SPARK-24992 > URL: https://issues.apache.org/jira/browse/SPARK-24992 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Hieu Tri Huynh >Priority: Minor > > Utils.getLocalDir is used to get the path of a temporary directory. However, it > always returns the same directory, which is the first element in the > array _localRootDirs_. When running on YARN, this can cause us to always write > to one disk, making it busy while other disks are free. > We should randomize the selection to spread out the load. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
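A minimal sketch of the proposed change, assuming `localRootDirs` is the already-configured array of local dirs (the real method lives in org.apache.spark.util.Utils):

{code:scala}
import scala.util.Random

// Sketch: pick a random configured local dir instead of always the first one,
// so temp writes spread across disks rather than hammering a single one.
def getLocalDir(localRootDirs: Array[String]): String =
  localRootDirs(Random.nextInt(localRootDirs.length))
{code}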
[jira] [Commented] (SPARK-24993) Make Avro fast again
[ https://issues.apache.org/jira/browse/SPARK-24993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566222#comment-16566222 ] Apache Spark commented on SPARK-24993: -- User 'dbtsai' has created a pull request for this issue: https://github.com/apache/spark/pull/21952 > Make Avro fast again > > > Key: SPARK-24993 > URL: https://issues.apache.org/jira/browse/SPARK-24993 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: DB Tsai >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24993) Make Avro fast again
[ https://issues.apache.org/jira/browse/SPARK-24993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24993: Assignee: Apache Spark > Make Avro fast again > > > Key: SPARK-24993 > URL: https://issues.apache.org/jira/browse/SPARK-24993 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: DB Tsai >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24993) Make Avro fast again
[ https://issues.apache.org/jira/browse/SPARK-24993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24993: Assignee: (was: Apache Spark) > Make Avro fast again > > > Key: SPARK-24993 > URL: https://issues.apache.org/jira/browse/SPARK-24993 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: DB Tsai >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24993) Make Avro fast again
DB Tsai created SPARK-24993: --- Summary: Make Avro fast again Key: SPARK-24993 URL: https://issues.apache.org/jira/browse/SPARK-24993 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: DB Tsai -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen
[ https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566177#comment-16566177 ] Apache Spark commented on SPARK-24957: -- User 'gatorsmile' has created a pull request for this issue: https://github.com/apache/spark/pull/21951 > Decimal arithmetic can lead to wrong values using codegen > - > > Key: SPARK-24957 > URL: https://issues.apache.org/jira/browse/SPARK-24957 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: David Vogelbacher >Assignee: Marco Gaido >Priority: Major > Labels: correctness > Fix For: 2.2.3, 2.3.2, 2.4.0 > > > I noticed a bug when doing arithmetic on a dataframe containing decimal > values with codegen enabled. > I tried to narrow it down on a small repro and got this (executed in > spark-shell): > {noformat} > scala> val df = Seq( > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("11.88")) > | ).toDF("text", "number") > df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] > scala> val df_grouped_1 = > df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) > df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_1.collect() > res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) > scala> val df_grouped_2 = > df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) > df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_2.collect() > res1: Array[org.apache.spark.sql.Row] = > Array([a,11948571.4285714285714285714286]) > scala> val df_total_sum = > df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) > df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] > scala> df_total_sum.collect() > res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) > {noformat} > The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the > result of {{df_grouped_2}} is clearly incorrect (it is the value of the > correct result times {{10^14}}). > When codegen is disabled all results are correct. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24817) Implement BarrierTaskContext.barrier()
[ https://issues.apache.org/jira/browse/SPARK-24817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566159#comment-16566159 ] Erik Erlandson commented on SPARK-24817: I'm curious about what the {{barrier}} invocations inside {{mapPartitions}} closures imply about communication between executors, for example executors running on pods in a kube cluster. It is possible that whatever allows shuffle data to transfer between executors will also allow these {{barrier}} coordinations to work, but we had to create a headless service for executors to register properly with the driver pod, and if every executor pod needs something like that for barrier to work, it will have an impact on kube backend support. > Implement BarrierTaskContext.barrier() > -- > > Key: SPARK-24817 > URL: https://issues.apache.org/jira/browse/SPARK-24817 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jiang Xingbo >Priority: Major > > Implement BarrierTaskContext.barrier(), to support global sync between all > the tasks in a barrier stage. The global sync shall finish immediately once > all tasks in the same barrier stage reach the same barrier. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24580) List scenarios to be handled by barrier execution mode properly
[ https://issues.apache.org/jira/browse/SPARK-24580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566154#comment-16566154 ] Erik Erlandson commented on SPARK-24580: This is blocking SPARK-24582, which is marked as 'resolved' but appears to be inactive. > List scenarios to be handled by barrier execution mode properly > --- > > Key: SPARK-24580 > URL: https://issues.apache.org/jira/browse/SPARK-24580 > Project: Spark > Issue Type: Story > Components: ML, Spark Core >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Jiang Xingbo >Priority: Major > > List scenarios to be handled by barrier execution mode to help the design. We > will start with simple scenarios and move to complex ones. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen
[ https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-24957: Fix Version/s: 2.2.3 > Decimal arithmetic can lead to wrong values using codegen > - > > Key: SPARK-24957 > URL: https://issues.apache.org/jira/browse/SPARK-24957 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: David Vogelbacher >Assignee: Marco Gaido >Priority: Major > Labels: correctness > Fix For: 2.2.3, 2.3.2, 2.4.0 > > > I noticed a bug when doing arithmetic on a dataframe containing decimal > values with codegen enabled. > I tried to narrow it down on a small repro and got this (executed in > spark-shell): > {noformat} > scala> val df = Seq( > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("11.88")) > | ).toDF("text", "number") > df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] > scala> val df_grouped_1 = > df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) > df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_1.collect() > res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) > scala> val df_grouped_2 = > df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) > df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_2.collect() > res1: Array[org.apache.spark.sql.Row] = > Array([a,11948571.4285714285714285714286]) > scala> val df_total_sum = > df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) > df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] > scala> df_total_sum.collect() > res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) > {noformat} > The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the > result of {{df_grouped_2}} is clearly incorrect (it is the value of the > correct result times {{10^14}}). > When codegen is disabled all results are correct. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24990) merge ReadSupport and ReadSupportWithSchema
[ https://issues.apache.org/jira/browse/SPARK-24990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-24990. - Resolution: Fixed Fix Version/s: 2.4.0 > merge ReadSupport and ReadSupportWithSchema > --- > > Key: SPARK-24990 > URL: https://issues.apache.org/jira/browse/SPARK-24990 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 2.4.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24914) totalSize is not a good estimate for broadcast joins
[ https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24914: Assignee: Apache Spark > totalSize is not a good estimate for broadcast joins > > > Key: SPARK-24914 > URL: https://issues.apache.org/jira/browse/SPARK-24914 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Bruce Robbins >Assignee: Apache Spark >Priority: Major > > When determining whether to do a broadcast join, Spark estimates the size of > the smaller table as follows: > - if totalSize is defined and greater than 0, use it. > - else, if rawDataSize is defined and greater than 0, use it > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Therefore, Spark prefers totalSize over rawDataSize. > Unfortunately, totalSize is often quite a bit smaller than the actual table > size, since it represents the size of the table's files on disk. Parquet and > Orc files, for example, are encoded and compressed. This can result in the > JVM throwing an OutOfMemoryError while Spark is loading the table into a > HashedRelation, or when Spark actually attempts to broadcast the data. > On the other hand, rawDataSize represents the uncompressed size of the > dataset, according to Hive documentation. This seems like a pretty good > number to use in preference to totalSize. However, due to HIVE-20079, this > value is simply #columns * #rows. Once that bug is fixed, it may be a > superior statistic, at least for managed tables. > In the meantime, we could apply a configurable "fudge factor" to totalSize, > at least for types of files that are encoded and compressed. Hive has the > setting hive.stats.deserialization.factor, which defaults to 1.0, and is > described as follows: > {quote}in the absence of uncompressed/raw data size, total file size will be > used for statistics annotation. But the file may be compressed, encoded and > serialized which may be lesser in size than the actual uncompressed/raw data > size. This factor will be multiplied to file size to estimate the raw data > size. > {quote} > Also, I propose a configuration setting to allow the user to completely > ignore rawDataSize, since that value is broken (due to HIVE-20079). When that > configuration setting is set to true, Spark would instead estimate the table > as follows: > - if totalSize is defined and greater than 0, use totalSize*fudgeFactor. > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Caveat: This mitigates the issue only for Hive tables. It does not help much > when the user is reading files using {{spark.read.parquet}}, unless we apply > the same fudge factor there. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24914) totalSize is not a good estimate for broadcast joins
[ https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566105#comment-16566105 ] Apache Spark commented on SPARK-24914: -- User 'bersprockets' has created a pull request for this issue: https://github.com/apache/spark/pull/21950 > totalSize is not a good estimate for broadcast joins > > > Key: SPARK-24914 > URL: https://issues.apache.org/jira/browse/SPARK-24914 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Bruce Robbins >Priority: Major > > When determining whether to do a broadcast join, Spark estimates the size of > the smaller table as follows: > - if totalSize is defined and greater than 0, use it. > - else, if rawDataSize is defined and greater than 0, use it > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Therefore, Spark prefers totalSize over rawDataSize. > Unfortunately, totalSize is often quite a bit smaller than the actual table > size, since it represents the size of the table's files on disk. Parquet and > Orc files, for example, are encoded and compressed. This can result in the > JVM throwing an OutOfMemoryError while Spark is loading the table into a > HashedRelation, or when Spark actually attempts to broadcast the data. > On the other hand, rawDataSize represents the uncompressed size of the > dataset, according to Hive documentation. This seems like a pretty good > number to use in preference to totalSize. However, due to HIVE-20079, this > value is simply #columns * #rows. Once that bug is fixed, it may be a > superior statistic, at least for managed tables. > In the meantime, we could apply a configurable "fudge factor" to totalSize, > at least for types of files that are encoded and compressed. Hive has the > setting hive.stats.deserialization.factor, which defaults to 1.0, and is > described as follows: > {quote}in the absence of uncompressed/raw data size, total file size will be > used for statistics annotation. But the file may be compressed, encoded and > serialized which may be lesser in size than the actual uncompressed/raw data > size. This factor will be multiplied to file size to estimate the raw data > size. > {quote} > Also, I propose a configuration setting to allow the user to completely > ignore rawDataSize, since that value is broken (due to HIVE-20079). When that > configuration setting is set to true, Spark would instead estimate the table > as follows: > - if totalSize is defined and greater than 0, use totalSize*fudgeFactor. > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Caveat: This mitigates the issue only for Hive tables. It does not help much > when the user is reading files using {{spark.read.parquet}}, unless we apply > the same fudge factor there. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24914) totalSize is not a good estimate for broadcast joins
[ https://issues.apache.org/jira/browse/SPARK-24914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24914: Assignee: (was: Apache Spark) > totalSize is not a good estimate for broadcast joins > > > Key: SPARK-24914 > URL: https://issues.apache.org/jira/browse/SPARK-24914 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Bruce Robbins >Priority: Major > > When determining whether to do a broadcast join, Spark estimates the size of > the smaller table as follows: > - if totalSize is defined and greater than 0, use it. > - else, if rawDataSize is defined and greater than 0, use it > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Therefore, Spark prefers totalSize over rawDataSize. > Unfortunately, totalSize is often quite a bit smaller than the actual table > size, since it represents the size of the table's files on disk. Parquet and > Orc files, for example, are encoded and compressed. This can result in the > JVM throwing an OutOfMemoryError while Spark is loading the table into a > HashedRelation, or when Spark actually attempts to broadcast the data. > On the other hand, rawDataSize represents the uncompressed size of the > dataset, according to Hive documentation. This seems like a pretty good > number to use in preference to totalSize. However, due to HIVE-20079, this > value is simply #columns * #rows. Once that bug is fixed, it may be a > superior statistic, at least for managed tables. > In the meantime, we could apply a configurable "fudge factor" to totalSize, > at least for types of files that are encoded and compressed. Hive has the > setting hive.stats.deserialization.factor, which defaults to 1.0, and is > described as follows: > {quote}in the absence of uncompressed/raw data size, total file size will be > used for statistics annotation. But the file may be compressed, encoded and > serialized which may be lesser in size than the actual uncompressed/raw data > size. This factor will be multiplied to file size to estimate the raw data > size. > {quote} > Also, I propose a configuration setting to allow the user to completely > ignore rawDataSize, since that value is broken (due to HIVE-20079). When that > configuration setting is set to true, Spark would instead estimate the table > as follows: > - if totalSize is defined and greater than 0, use totalSize*fudgeFactor. > - else, use spark.sql.defaultSizeInBytes (default: Long.MaxValue) > Caveat: This mitigates the issue only for Hive tables. It does not help much > when the user is reading files using {{spark.read.parquet}}, unless we apply > the same fudge factor there. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
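Putting the proposal together as a hedged sketch (a hypothetical helper, not Spark's actual code): prefer a fudged totalSize, optionally ignore the broken rawDataSize, and fall back to the default.

{code:scala}
// Sketch of the proposed estimation. `fudgeFactor` mirrors Hive's
// hive.stats.deserialization.factor; `ignoreRawDataSize` works around HIVE-20079.
def estimateTableSize(
    totalSize: Option[Long],
    rawDataSize: Option[Long],
    fudgeFactor: Double,
    ignoreRawDataSize: Boolean,
    defaultSizeInBytes: Long): Long = {
  totalSize.filter(_ > 0).map(s => (s * fudgeFactor).toLong)
    .orElse(if (ignoreRawDataSize) None else rawDataSize.filter(_ > 0))
    .getOrElse(defaultSizeInBytes)
}
{code}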
[jira] [Commented] (SPARK-24912) Broadcast join OutOfMemory stack trace obscures actual cause of OOM
[ https://issues.apache.org/jira/browse/SPARK-24912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566097#comment-16566097 ] Apache Spark commented on SPARK-24912: -- User 'bersprockets' has created a pull request for this issue: https://github.com/apache/spark/pull/21950 > Broadcast join OutOfMemory stack trace obscures actual cause of OOM > --- > > Key: SPARK-24912 > URL: https://issues.apache.org/jira/browse/SPARK-24912 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Bruce Robbins >Priority: Minor > > When the Spark driver suffers an OutOfMemoryError while attempting to > broadcast a table for a broadcast join, the resulting stack trace obscures > the actual cause of the OOM. For e.g.: > {noformat} > [GC (Allocation Failure) 585453K->585453K(928768K), 0.0060025 secs] > [Full GC (Allocation Failure) 585453K->582524K(928768K), 0.4019639 secs] > java.lang.OutOfMemoryError: Java heap space > Dumping heap to java_pid12446.hprof ... > Heap dump file created [632701033 bytes in 1.016 secs] > Exception in thread "main" java.lang.OutOfMemoryError: Not enough memory to > build and broadcast the table to all worker nodes. As a workaround, you can > either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to > -1 or increase the spark driver memory by setting spark.driver.memory to a > higher value > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:122) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76) > at > org.apache.spark.sql.execution.SQLExecution$$anonfun$withExecutionId$1.apply(SQLExecution.scala:101) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) > at > org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:98) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:75) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 18/07/24 14:29:58 INFO ContextCleaner: Cleaned accumulator 30 > 18/07/24 14:29:58 INFO ContextCleaner: Cleaned accumulator 35 > {noformat} > The above stack trace blames BroadcastExchangeExec. However, the given line > is actually where the original OutOfMemoryError was caught and a new one was > created and wrapped by a SparkException. The actual location where the OOM > occurred was in LongToUnsafeRowMap#grow, at this line: > {noformat} > val newPage = new Array[Long](newNumWords.toInt) > {noformat} > Sometimes it is helpful to know the actual location from which an OOM is > thrown. In the above case, the location indicated that Spark underestimated > the size of a large-ish table and ran out of memory trying to load it into > memory. 
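A minimal sketch of the kind of fix this implies: preserve the original error as the cause when rethrowing, so the true allocation site stays visible. buildAndBroadcast below is a hypothetical stand-in for the broadcast build work, and this is not necessarily what the linked pull request does:
{code:scala}
// Sketch only: chain the original OutOfMemoryError as the cause so the true
// allocation site (e.g. LongToUnsafeRowMap#grow) survives in the stack trace.
def buildAndBroadcast(): Unit = ???  // hypothetical stand-in

try {
  buildAndBroadcast()
} catch {
  case oom: OutOfMemoryError =>
    val wrapper = new OutOfMemoryError(
      "Not enough memory to build and broadcast the table to all worker nodes. " +
        "As a workaround, set spark.sql.autoBroadcastJoinThreshold to -1 or " +
        "increase spark.driver.memory.")
    wrapper.initCause(oom) // keeps the original frames visible to the user
    throw wrapper
}
{code}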
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24992) spark should randomize yarn local dir selection
Hieu Tri Huynh created SPARK-24992: -- Summary: spark should randomize yarn local dir selection Key: SPARK-24992 URL: https://issues.apache.org/jira/browse/SPARK-24992 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.3.1 Reporter: Hieu Tri Huynh Utils.getLocalDir is used to get the path of a temporary directory. However, it always returns the same directory, which is the first element in the array _localRootDirs_. When running on YARN, this can mean we always write to one disk, keeping it busy while the other disks are free. We should randomize the selection to spread out the load. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
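A minimal sketch of the proposed randomization, assuming the localRootDirs array mentioned above; the helper name is hypothetical:
{code:scala}
import java.util.concurrent.ThreadLocalRandom

// Sketch only: choose a random root dir instead of always localRootDirs(0),
// so writes are spread across all disks configured for the YARN container.
def randomLocalDir(localRootDirs: Array[String]): String = {
  require(localRootDirs.nonEmpty, "no local root dirs configured")
  localRootDirs(ThreadLocalRandom.current().nextInt(localRootDirs.length))
}
{code}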
[jira] [Resolved] (SPARK-24960) k8s: explicitly expose ports on driver container
[ https://issues.apache.org/jira/browse/SPARK-24960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Cheah resolved SPARK-24960. Resolution: Fixed Fix Version/s: 2.4.0 > k8s: explicitly expose ports on driver container > > > Key: SPARK-24960 > URL: https://issues.apache.org/jira/browse/SPARK-24960 > Project: Spark > Issue Type: Improvement > Components: Deploy, Kubernetes, Scheduler >Affects Versions: 2.2.0 >Reporter: Adelbert Chang >Priority: Minor > Fix For: 2.4.0 > > > For the Kubernetes scheduler, the Driver Pod does not explicitly expose its > ports. It is possible for a Kubernetes environment to be set up such that Pod > ports are closed by default and must be opened explicitly in the Pod spec. In > such an environment, without this improvement, the Driver Service will be > unable to route requests (e.g. from the Executors) to the corresponding > Driver Pod, which can be observed on the Executor side with this error > message: > {noformat} > Caused by: java.io.IOException: Failed to connect to > org-apache-spark-examples-sparkpi-1519271450264-driver-svc.dev.svc.cluster.local:7078{noformat} > > For posterity, this is a copy of the [original > issue|https://github.com/apache-spark-on-k8s/spark/issues/617] filed in the > now deprecated {{apache-spark-on-k8s}} repository. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
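For reference, a hedged sketch of what explicitly exposing the driver port could look like with the fabric8 client that Spark's Kubernetes backend builds on; the builder code and port name are illustrative, not the actual patch:
{code:scala}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder}

// Sketch only: declare the driver RPC port (7078 in the error message above)
// on the driver container, so environments that close pod ports by default
// can still route Executor traffic through the Driver Service.
val driverPort = new ContainerPortBuilder()
  .withName("driver-rpc-port")
  .withContainerPort(7078)
  .build()

val driverContainer = new ContainerBuilder()
  .withName("spark-kubernetes-driver")
  .addToPorts(driverPort)
  .build()
{code}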
[jira] [Resolved] (SPARK-24937) Datasource partition table should load empty static partitions
[ https://issues.apache.org/jira/browse/SPARK-24937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-24937. - Resolution: Fixed Assignee: Yuming Wang Fix Version/s: 2.4.0 > Datasource partition table should load empty static partitions > -- > > Key: SPARK-24937 > URL: https://issues.apache.org/jira/browse/SPARK-24937 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Yuming Wang >Assignee: Yuming Wang >Priority: Major > Fix For: 2.4.0 > > > How to reproduce: > {code:sql} > spark-sql> CREATE TABLE tbl AS SELECT 1; > spark-sql> CREATE TABLE tbl1 (c1 BIGINT, day STRING, hour STRING) > > USING parquet > > PARTITIONED BY (day, hour); > spark-sql> INSERT INTO TABLE tbl1 PARTITION (day = '2018-07-25', hour='01') > SELECT * FROM tbl where 1=0; > spark-sql> SHOW PARTITIONS tbl1; > spark-sql> CREATE TABLE tbl2 (c1 BIGINT) > > PARTITIONED BY (day STRING, hour STRING); > 18/07/26 22:49:20 WARN HiveMetaStore: Location: > file:/Users/yumwang/tmp/spark/spark-warehouse/tbl2 specified for non-external > table:tbl2 > spark-sql> INSERT INTO TABLE tbl2 PARTITION (day = '2018-07-25', hour='01') > SELECT * FROM tbl where 1=0; > 18/07/26 22:49:36 WARN log: Updating partition stats fast for: tbl2 > 18/07/26 22:49:36 WARN log: Updated size to 0 > spark-sql> SHOW PARTITIONS tbl2; > day=2018-07-25/hour=01 > spark-sql> > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24957) Decimal arithmetic can lead to wrong values using codegen
[ https://issues.apache.org/jira/browse/SPARK-24957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565861#comment-16565861 ] Apache Spark commented on SPARK-24957: -- User 'mgaido91' has created a pull request for this issue: https://github.com/apache/spark/pull/21949 > Decimal arithmetic can lead to wrong values using codegen > - > > Key: SPARK-24957 > URL: https://issues.apache.org/jira/browse/SPARK-24957 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.1 >Reporter: David Vogelbacher >Assignee: Marco Gaido >Priority: Major > Labels: correctness > Fix For: 2.3.2, 2.4.0 > > > I noticed a bug when doing arithmetic on a dataframe containing decimal > values with codegen enabled. > I tried to narrow it down on a small repro and got this (executed in > spark-shell): > {noformat} > scala> val df = Seq( > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("12.0")), > | ("a", BigDecimal("11.88")), > | ("a", BigDecimal("11.88")) > | ).toDF("text", "number") > df: org.apache.spark.sql.DataFrame = [text: string, number: decimal(38,18)] > scala> val df_grouped_1 = > df.groupBy(df.col("text")).agg(functions.avg(df.col("number")).as("number")) > df_grouped_1: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_1.collect() > res0: Array[org.apache.spark.sql.Row] = Array([a,11.94857142857143]) > scala> val df_grouped_2 = > df_grouped_1.groupBy(df_grouped_1.col("text")).agg(functions.sum(df_grouped_1.col("number")).as("number")) > df_grouped_2: org.apache.spark.sql.DataFrame = [text: string, number: > decimal(38,22)] > scala> df_grouped_2.collect() > res1: Array[org.apache.spark.sql.Row] = > Array([a,11948571.4285714285714285714286]) > scala> val df_total_sum = > df_grouped_1.agg(functions.sum(df_grouped_1.col("number")).as("number")) > df_total_sum: org.apache.spark.sql.DataFrame = [number: decimal(38,22)] > scala> df_total_sum.collect() > res2: Array[org.apache.spark.sql.Row] = Array([11.94857142857143]) > {noformat} > The results of {{df_grouped_1}} and {{df_total_sum}} are correct, whereas the > result of {{df_grouped_2}} is clearly incorrect (it is the value of the > correct result times {{10^14}}). > When codegen is disabled all results are correct. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24980) add support for pandas/arrow etc for python2.7 and pypy builds
[ https://issues.apache.org/jira/browse/SPARK-24980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565780#comment-16565780 ] shane knapp edited comment on SPARK-24980 at 8/1/18 7:14 PM: - alright, pandas 0.19.2 and pyarrow 0.8.0 are installed for py27 on the ubuntu workers. this doesn't provide full coverage for tests, but here's the list of builds pinned to these workers: [https://amplab.cs.berkeley.edu/jenkins/label/ubuntu/] one of my test builds is currently running, which will show if these tests are being run against pandas and pyarrow w/py27: [https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/893] i'll deal w/the centos updates later. was (Author: shaneknapp): alright, pandas 0.19.2 and pyarrow 0.8.0 are installed for py27 on the ubuntu workers. this doesn't provide full coverage for tests, but here's the list of builds pinned to these workers: [https://amplab.cs.berkeley.edu/jenkins/label/ubuntu/] one of my test builds is currently running, which will show if these tests are being run against pandas and pyarrow w/py27: [ https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/893 |[https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/]893] i'll deal w/the centos updates later. > add support for pandas/arrow etc for python2.7 and pypy builds > -- > > Key: SPARK-24980 > URL: https://issues.apache.org/jira/browse/SPARK-24980 > Project: Spark > Issue Type: Improvement > Components: Build, PySpark >Affects Versions: 2.3.1 >Reporter: shane knapp >Assignee: shane knapp >Priority: Major > > since we have full support for python3.4 via anaconda, it's time to create > similar environments for 2.7 and pypy 2.5.1. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24980) add support for pandas/arrow etc for python2.7 and pypy builds
[ https://issues.apache.org/jira/browse/SPARK-24980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565780#comment-16565780 ] shane knapp edited comment on SPARK-24980 at 8/1/18 7:14 PM: - alright, pandas 0.19.2 and pyarrow 0.8.0 are installed for py27 on the ubuntu workers. this doesn't provide full coverage for tests, but here's the list of builds pinned to these workers: [https://amplab.cs.berkeley.edu/jenkins/label/ubuntu/] one of my test builds is currently running, which will show if these tests are being run against pandas and pyarrow w/py27: [ https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/893 |[https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/]893] i'll deal w/the centos updates later. was (Author: shaneknapp): alright, pandas 0.19.2 and pyarrow 0.8.0 are installed for py27 on the ubuntu workers. this doesn't provide full coverage for tests, but here's the list of builds pinned to these workers: [https://amplab.cs.berkeley.edu/jenkins/label/ubuntu/] one of my test builds is currently running, which will show if these tests are being run against pandas and pyarrow w/py27: [https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/]893 i'll deal w/the centos updates later. > add support for pandas/arrow etc for python2.7 and pypy builds > -- > > Key: SPARK-24980 > URL: https://issues.apache.org/jira/browse/SPARK-24980 > Project: Spark > Issue Type: Improvement > Components: Build, PySpark >Affects Versions: 2.3.1 >Reporter: shane knapp >Assignee: shane knapp >Priority: Major > > since we have full support for python3.4 via anaconda, it's time to create > similar environments for 2.7 and pypy 2.5.1. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24980) add support for pandas/arrow etc for python2.7 and pypy builds
[ https://issues.apache.org/jira/browse/SPARK-24980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565780#comment-16565780 ] shane knapp edited comment on SPARK-24980 at 8/1/18 7:13 PM: - alright, pandas 0.19.2 and pyarrow 0.8.0 are installed for py27 on the ubuntu workers. this doesn't provide full coverage for tests, but here's the list of builds pinned to these workers: [https://amplab.cs.berkeley.edu/jenkins/label/ubuntu/] one of my test builds is currently running, which will show if these tests are being run against pandas and pyarrow w/py27: [https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/]893 i'll deal w/the centos updates later. was (Author: shaneknapp): alright, pandas 0.19.2 and pyarrow 0.8.0 are installed for py27 on the ubuntu workers. this doesn't provide full coverage for tests, but here's the list of builds pinned to these workers: [https://amplab.cs.berkeley.edu/jenkins/label/ubuntu/] one of my test builds is currently running, which will show if these tests are being run against pandas and pyarrow w/py27: [https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/] i'll deal w/the centos updates later. > add support for pandas/arrow etc for python2.7 and pypy builds > -- > > Key: SPARK-24980 > URL: https://issues.apache.org/jira/browse/SPARK-24980 > Project: Spark > Issue Type: Improvement > Components: Build, PySpark >Affects Versions: 2.3.1 >Reporter: shane knapp >Assignee: shane knapp >Priority: Major > > since we have full support for python3.4 via anaconda, it's time to create > similar environments for 2.7 and pypy 2.5.1. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24991) use InternalRow in DataSourceWriter
[ https://issues.apache.org/jira/browse/SPARK-24991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24991: Assignee: Apache Spark (was: Wenchen Fan) > use InternalRow in DataSourceWriter > --- > > Key: SPARK-24991 > URL: https://issues.apache.org/jira/browse/SPARK-24991 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24991) use InternalRow in DataSourceWriter
[ https://issues.apache.org/jira/browse/SPARK-24991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565837#comment-16565837 ] Apache Spark commented on SPARK-24991: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/21948 > use InternalRow in DataSourceWriter > --- > > Key: SPARK-24991 > URL: https://issues.apache.org/jira/browse/SPARK-24991 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24991) use InternalRow in DataSourceWriter
[ https://issues.apache.org/jira/browse/SPARK-24991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24991: Assignee: Wenchen Fan (was: Apache Spark) > use InternalRow in DataSourceWriter > --- > > Key: SPARK-24991 > URL: https://issues.apache.org/jira/browse/SPARK-24991 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24991) use InternalRow in DataSourceWriter
Wenchen Fan created SPARK-24991: --- Summary: use InternalRow in DataSourceWriter Key: SPARK-24991 URL: https://issues.apache.org/jira/browse/SPARK-24991 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Wenchen Fan Assignee: Wenchen Fan -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23915) High-order function: array_except(x, y) → array
[ https://issues.apache.org/jira/browse/SPARK-23915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-23915: --- Assignee: Kazuaki Ishizaki > High-order function: array_except(x, y) → array > --- > > Key: SPARK-23915 > URL: https://issues.apache.org/jira/browse/SPARK-23915 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Assignee: Kazuaki Ishizaki >Priority: Major > Fix For: 2.4.0 > > > Ref: https://prestodb.io/docs/current/functions/array.html > Returns an array of elements in x but not in y, without duplicates. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-23915) High-order function: array_except(x, y) → array
[ https://issues.apache.org/jira/browse/SPARK-23915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-23915. - Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21103 [https://github.com/apache/spark/pull/21103] > High-order function: array_except(x, y) → array > --- > > Key: SPARK-23915 > URL: https://issues.apache.org/jira/browse/SPARK-23915 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Assignee: Kazuaki Ishizaki >Priority: Major > Fix For: 2.4.0 > > > Ref: https://prestodb.io/docs/current/functions/array.html > Returns an array of elements in x but not in y, without duplicates. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
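For reference, a small usage sketch of the new function, assuming Spark 2.4.0+ and an active SparkSession named spark:
{code:scala}
import org.apache.spark.sql.functions.array_except
import spark.implicits._

val df = Seq((Seq(1, 2, 3), Seq(2, 3, 4))).toDF("x", "y")
// Elements of x that are not in y, without duplicates: [1]
df.select(array_except($"x", $"y")).show()
{code}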
[jira] [Commented] (SPARK-19602) Unable to query using the fully qualified column name of the form (<db_name>.<table_name>.<column_name>)
[ https://issues.apache.org/jira/browse/SPARK-19602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565802#comment-16565802 ] Sunitha Kambhampati commented on SPARK-19602: - The design doc is also uploaded [here|https://drive.google.com/file/d/1zKm3aNZ3DpsqIuoMvRsf0kkDkXsAasxH/view] > Unable to query using the fully qualified column name of the form > (<db_name>.<table_name>.<column_name>) > -- > > Key: SPARK-19602 > URL: https://issues.apache.org/jira/browse/SPARK-19602 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Sunitha Kambhampati >Assignee: Sunitha Kambhampati >Priority: Major > Attachments: Design_ColResolution_JIRA19602.pdf > > > 1) Spark SQL fails to analyze this query: select db1.t1.i1 from db1.t1, > db2.t1 > Most of the other database systems support this (e.g. DB2, Oracle, MySQL). > Note: In DB2 and Oracle, the notion is of > <schema_name>.<table_name>.<column_name> > 2) Another scenario where this fully qualified name is useful is as follows: > // current database is db1. > select t1.i1 from t1, db2.t1 > If the i1 column exists in both tables: db1.t1 and db2.t1, this will throw an > error during column resolution in the analyzer, as it is ambiguous. > Let's say the user intended to retrieve i1 from db1.t1, but in this example > only db2.t1 has an i1 column. The query would then succeed instead of throwing > an error. > One way to avoid confusion would be to explicitly specify the fully > qualified name db1.t1.i1 > For example: select db1.t1.i1 from t1, db2.t1 > Workarounds: > There is a workaround for these issues, which is to use an alias. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
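For illustration, the alias workaround mentioned at the end of the description, using the tables from the examples above (spark is an active SparkSession):
{code:scala}
// Sketch only: aliases disambiguate i1 without needing db-qualified columns.
spark.sql("USE db1")
spark.sql("SELECT a.i1 FROM t1 a, db2.t1 b").show()

// The form the issue asks for, currently rejected by the analyzer:
// spark.sql("SELECT db1.t1.i1 FROM db1.t1, db2.t1")
{code}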
[jira] [Commented] (SPARK-24980) add support for pandas/arrow etc for python2.7 and pypy builds
[ https://issues.apache.org/jira/browse/SPARK-24980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565780#comment-16565780 ] shane knapp commented on SPARK-24980: - alright, pandas 0.19.2 and pyarrow 0.8.0 are installed for py27 on the ubuntu workers. this doesn't provide full coverage for tests, but here's the list of builds pinned to these workers: [https://amplab.cs.berkeley.edu/jenkins/label/ubuntu/] one of my test builds is currently running, which will show if these tests are being run against pandas and pyarrow w/py27: [https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/] i'll deal w/the centos updates later. > add support for pandas/arrow etc for python2.7 and pypy builds > -- > > Key: SPARK-24980 > URL: https://issues.apache.org/jira/browse/SPARK-24980 > Project: Spark > Issue Type: Improvement > Components: Build, PySpark >Affects Versions: 2.3.1 >Reporter: shane knapp >Assignee: shane knapp >Priority: Major > > since we have full support for python3.4 via anaconda, it's time to create > similar environments for 2.7 and pypy 2.5.1. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24632) Allow 3rd-party libraries to use pyspark.ml abstractions for Java wrappers for persistence
[ https://issues.apache.org/jira/browse/SPARK-24632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565769#comment-16565769 ] Joseph K. Bradley commented on SPARK-24632: --- I'm unassigning myself since I don't have time to work on this right now. : ( > Allow 3rd-party libraries to use pyspark.ml abstractions for Java wrappers > for persistence > -- > > Key: SPARK-24632 > URL: https://issues.apache.org/jira/browse/SPARK-24632 > Project: Spark > Issue Type: Improvement > Components: ML, PySpark >Affects Versions: 2.4.0 >Reporter: Joseph K. Bradley >Assignee: Joseph K. Bradley >Priority: Major > > This is a follow-up for [SPARK-17025], which allowed users to implement > Python PipelineStages in 3rd-party libraries, include them in Pipelines, and > use Pipeline persistence. This task is to make it easier for 3rd-party > libraries to have PipelineStages written in Java and then to use pyspark.ml > abstractions to create wrappers around those Java classes. This is currently > possible, except that users hit bugs around persistence. > I spent a bit thinking about this and wrote up thoughts and a proposal in the > doc linked below. Summary of proposal: > Require that 3rd-party libraries with Java classes with Python wrappers > implement a trait which provides the corresponding Python classpath in some > field: > {code} > trait PythonWrappable { > def pythonClassPath: String = … > } > MyJavaType extends PythonWrappable > {code} > This will not be required for MLlib wrappers, which we can handle specially. > One issue for this task will be that we may have trouble writing unit tests. > They would ideally test a Java class + Python wrapper class pair sitting > outside of pyspark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24632) Allow 3rd-party libraries to use pyspark.ml abstractions for Java wrappers for persistence
[ https://issues.apache.org/jira/browse/SPARK-24632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley reassigned SPARK-24632: - Assignee: (was: Joseph K. Bradley) > Allow 3rd-party libraries to use pyspark.ml abstractions for Java wrappers > for persistence > -- > > Key: SPARK-24632 > URL: https://issues.apache.org/jira/browse/SPARK-24632 > Project: Spark > Issue Type: Improvement > Components: ML, PySpark >Affects Versions: 2.4.0 >Reporter: Joseph K. Bradley >Priority: Major > > This is a follow-up for [SPARK-17025], which allowed users to implement > Python PipelineStages in 3rd-party libraries, include them in Pipelines, and > use Pipeline persistence. This task is to make it easier for 3rd-party > libraries to have PipelineStages written in Java and then to use pyspark.ml > abstractions to create wrappers around those Java classes. This is currently > possible, except that users hit bugs around persistence. > I spent a bit thinking about this and wrote up thoughts and a proposal in the > doc linked below. Summary of proposal: > Require that 3rd-party libraries with Java classes with Python wrappers > implement a trait which provides the corresponding Python classpath in some > field: > {code} > trait PythonWrappable { > def pythonClassPath: String = … > } > MyJavaType extends PythonWrappable > {code} > This will not be required for MLlib wrappers, which we can handle specially. > One issue for this task will be that we may have trouble writing unit tests. > They would ideally test a Java class + Python wrapper class pair sitting > outside of pyspark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24632) Allow 3rd-party libraries to use pyspark.ml abstractions for Java wrappers for persistence
[ https://issues.apache.org/jira/browse/SPARK-24632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565767#comment-16565767 ] Joseph K. Bradley commented on SPARK-24632: --- That's a good point. Let's do it your way. : ) You're right that putting this knowledge of wrapper classpaths on the Python side is better organized. That will allow users to wrap Scala classes later without breaking APIs (by adding new mix-ins). > Allow 3rd-party libraries to use pyspark.ml abstractions for Java wrappers > for persistence > -- > > Key: SPARK-24632 > URL: https://issues.apache.org/jira/browse/SPARK-24632 > Project: Spark > Issue Type: Improvement > Components: ML, PySpark >Affects Versions: 2.4.0 >Reporter: Joseph K. Bradley >Assignee: Joseph K. Bradley >Priority: Major > > This is a follow-up for [SPARK-17025], which allowed users to implement > Python PipelineStages in 3rd-party libraries, include them in Pipelines, and > use Pipeline persistence. This task is to make it easier for 3rd-party > libraries to have PipelineStages written in Java and then to use pyspark.ml > abstractions to create wrappers around those Java classes. This is currently > possible, except that users hit bugs around persistence. > I spent a bit thinking about this and wrote up thoughts and a proposal in the > doc linked below. Summary of proposal: > Require that 3rd-party libraries with Java classes with Python wrappers > implement a trait which provides the corresponding Python classpath in some > field: > {code} > trait PythonWrappable { > def pythonClassPath: String = … > } > MyJavaType extends PythonWrappable > {code} > This will not be required for MLlib wrappers, which we can handle specially. > One issue for this task will be that we may have trouble writing unit tests. > They would ideally test a Java class + Python wrapper class pair sitting > outside of pyspark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
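To make the quoted proposal concrete, a slightly fuller hedged sketch; the trait name comes from the issue, while the example class and Python path are hypothetical:
{code:scala}
trait PythonWrappable {
  // Fully qualified path of the Python wrapper class for this JVM stage
  def pythonClassPath: String
}

class MyJavaType extends PythonWrappable {
  override def pythonClassPath: String = "mylib.ml.MyJavaTypeWrapper"
}
{code}
Per the comment above, the direction later shifted toward keeping this classpath mapping on the Python side instead, so new wrappers can be added without changing the JVM classes.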
[jira] [Commented] (SPARK-17861) Store data source partitions in metastore and push partition pruning into metastore
[ https://issues.apache.org/jira/browse/SPARK-17861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565749#comment-16565749 ] nirav patel commented on SPARK-17861: - [~rxin] can this also be supported via DataFrame, so the following will also give the same behavior? `df.write.mode(SaveMode.Overwrite).partitionBy(partitionCols : _*).parquet(tableLocation)` Currently it overwrites all partitions with Spark 2.2.1. > Store data source partitions in metastore and push partition pruning into > metastore > --- > > Key: SPARK-17861 > URL: https://issues.apache.org/jira/browse/SPARK-17861 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Reynold Xin >Assignee: Eric Liang >Priority: Critical > Fix For: 2.1.0 > > > Initially, Spark SQL does not store any partition information in the catalog > for data source tables, because it was initially designed to work with > arbitrary files. This, however, has a few issues for catalog tables: > 1. Listing partitions for a large table (with millions of partitions) can be > very slow during cold start. > 2. Does not support heterogeneous partition naming schemes. > 3. Cannot leverage pushing partition pruning into the metastore. > This ticket tracks the work required to push the tracking of partitions into > the metastore. This change should be feature flagged. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
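One hedged note on the question above: since Spark 2.3 the overwrite granularity is controlled by spark.sql.sources.partitionOverwriteMode (SPARK-20236); in "dynamic" mode an overwrite replaces only the partitions present in the written data. A minimal sketch, reusing the names from the comment:
{code:scala}
import org.apache.spark.sql.SaveMode

// Sketch only: df, partitionCols and tableLocation are the comment's names.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy(partitionCols: _*)
  .parquet(tableLocation)
{code}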
[jira] [Commented] (SPARK-23874) Upgrade apache/arrow to 0.10.0
[ https://issues.apache.org/jira/browse/SPARK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565728#comment-16565728 ] shane knapp commented on SPARK-23874: - i've got a PR ready to go on my end for our ansible to deploy this. > Upgrade apache/arrow to 0.10.0 > -- > > Key: SPARK-23874 > URL: https://issues.apache.org/jira/browse/SPARK-23874 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Xiao Li >Assignee: Bryan Cutler >Priority: Major > > Version 0.10.0 will allow for the following improvements and bug fixes: > * Allow for adding BinaryType support > * Bug fix related to array serialization ARROW-1973 > * Python2 str will be made into an Arrow string instead of bytes ARROW-2101 > * Python bytearrays are supported in as input to pyarrow ARROW-2141 > * Java has common interface for reset to cleanup complex vectors in Spark > ArrowWriter ARROW-1962 > * Cleanup pyarrow type equality checks ARROW-2423 > * ArrowStreamWriter should not hold references to ArrowBlocks ARROW-2632, > ARROW-2645 > * Improved low level handling of messages for RecordBatch ARROW-2704 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24990) merge ReadSupport and ReadSupportWithSchema
[ https://issues.apache.org/jira/browse/SPARK-24990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24990: Assignee: Wenchen Fan (was: Apache Spark) > merge ReadSupport and ReadSupportWithSchema > --- > > Key: SPARK-24990 > URL: https://issues.apache.org/jira/browse/SPARK-24990 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24990) merge ReadSupport and ReadSupportWithSchema
[ https://issues.apache.org/jira/browse/SPARK-24990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565688#comment-16565688 ] Apache Spark commented on SPARK-24990: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/21946 > merge ReadSupport and ReadSupportWithSchema > --- > > Key: SPARK-24990 > URL: https://issues.apache.org/jira/browse/SPARK-24990 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24990) merge ReadSupport and ReadSupportWithSchema
[ https://issues.apache.org/jira/browse/SPARK-24990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24990: Assignee: Apache Spark (was: Wenchen Fan) > merge ReadSupport and ReadSupportWithSchema > --- > > Key: SPARK-24990 > URL: https://issues.apache.org/jira/browse/SPARK-24990 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24990) merge ReadSupport and ReadSupportWithSchema
Wenchen Fan created SPARK-24990: --- Summary: merge ReadSupport and ReadSupportWithSchema Key: SPARK-24990 URL: https://issues.apache.org/jira/browse/SPARK-24990 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: Wenchen Fan Assignee: Wenchen Fan -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
[ https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24989: Assignee: (was: Apache Spark) > BlockFetcher should retry while getting OutOfDirectMemoryError > -- > > Key: SPARK-24989 > URL: https://issues.apache.org/jira/browse/SPARK-24989 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.2.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: FailedStage.png > > > h3. Description > This problem can be reproduced stably by a large parallelism job migrate from > map reduce to Spark in our practice, some metrics list below: > ||Item||Value|| > |spark.executor.instances|1000| > |spark.executor.cores|5| > |task number of shuffle writer stage|18038| > |task number of shuffle reader stage|8| > While the shuffle writer stage successful ended, the shuffle reader stage > starting and keep failing by FetchFail. Each fetch request need the netty > sever allocate a buffer in 16MB(detailed stack attached below), the huge > amount of fetch request will use up default maxDirectMemory rapidly, even > though we bump up io.netty.maxDirectMemory to 50GB! > {code:java} > org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 > byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) > at > org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119) > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate > 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependen
[jira] [Assigned] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
[ https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24989: Assignee: Apache Spark > BlockFetcher should retry while getting OutOfDirectMemoryError > -- > > Key: SPARK-24989 > URL: https://issues.apache.org/jira/browse/SPARK-24989 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.2.0 >Reporter: Li Yuanjian >Assignee: Apache Spark >Priority: Major > Attachments: FailedStage.png > > > h3. Description > This problem can be reproduced stably by a large parallelism job migrate from > map reduce to Spark in our practice, some metrics list below: > ||Item||Value|| > |spark.executor.instances|1000| > |spark.executor.cores|5| > |task number of shuffle writer stage|18038| > |task number of shuffle reader stage|8| > While the shuffle writer stage successful ended, the shuffle reader stage > starting and keep failing by FetchFail. Each fetch request need the netty > sever allocate a buffer in 16MB(detailed stack attached below), the huge > amount of fetch request will use up default maxDirectMemory rapidly, even > though we bump up io.netty.maxDirectMemory to 50GB! > {code:java} > org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 > byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) > at > org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119) > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate > 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > io.netty.util.internal.PlatformDependent.incrementMemor
[jira] [Commented] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
[ https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565577#comment-16565577 ] Apache Spark commented on SPARK-24989: -- User 'xuanyuanking' has created a pull request for this issue: https://github.com/apache/spark/pull/21945 > BlockFetcher should retry while getting OutOfDirectMemoryError > -- > > Key: SPARK-24989 > URL: https://issues.apache.org/jira/browse/SPARK-24989 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.2.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: FailedStage.png > > > h3. Description > This problem can be reproduced stably by a large parallelism job migrate from > map reduce to Spark in our practice, some metrics list below: > ||Item||Value|| > |spark.executor.instances|1000| > |spark.executor.cores|5| > |task number of shuffle writer stage|18038| > |task number of shuffle reader stage|8| > While the shuffle writer stage successful ended, the shuffle reader stage > starting and keep failing by FetchFail. Each fetch request need the netty > sever allocate a buffer in 16MB(detailed stack attached below), the huge > amount of fetch request will use up default maxDirectMemory rapidly, even > though we bump up io.netty.maxDirectMemory to 50GB! > {code:java} > org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 > byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) > at > org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119) > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate > 16777216 byte(s) of direct memory (used: 214748
[jira] [Updated] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
[ https://issues.apache.org/jira/browse/SPARK-24989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Yuanjian updated SPARK-24989: Attachment: FailedStage.png > BlockFetcher should retry while getting OutOfDirectMemoryError > -- > > Key: SPARK-24989 > URL: https://issues.apache.org/jira/browse/SPARK-24989 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.2.0 >Reporter: Li Yuanjian >Priority: Major > Attachments: FailedStage.png > > > h3. Description > This problem can be reproduced stably by a large parallelism job migrate from > map reduce to Spark in our practice, some metrics list below: > ||Item||Value|| > |spark.executor.instances|1000| > |spark.executor.cores|5| > |task number of shuffle writer stage|18038| > |task number of shuffle reader stage|8| > While the shuffle writer stage successful ended, the shuffle reader stage > starting and keep failing by FetchFail. Each fetch request need the netty > sever allocate a buffer in 16MB(detailed stack attached below), the huge > amount of fetch request will use up default maxDirectMemory rapidly, even > though we bump up io.netty.maxDirectMemory to 50GB! > {code:java} > org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 > byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) > at > org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119) > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate > 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480) > at > io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530) >
[jira] [Created] (SPARK-24989) BlockFetcher should retry while getting OutOfDirectMemoryError
Li Yuanjian created SPARK-24989: --- Summary: BlockFetcher should retry while getting OutOfDirectMemoryError Key: SPARK-24989 URL: https://issues.apache.org/jira/browse/SPARK-24989 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 2.2.0 Reporter: Li Yuanjian h3. Description This problem can be reproduced stably by a high-parallelism job migrated from MapReduce to Spark in our practice; some metrics are listed below: ||Item||Value|| |spark.executor.instances|1000| |spark.executor.cores|5| |task number of shuffle writer stage|18038| |task number of shuffle reader stage|8| While the shuffle writer stage ended successfully, the shuffle reader stage kept failing with FetchFailed. Each fetch request needs the netty server to allocate a 16MB buffer (detailed stack attached below), so the huge number of fetch requests uses up the default maxDirectMemory rapidly, even though we bumped io.netty.maxDirectMemory up to 50GB! {code:java} org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480) at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:514) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:445) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:119) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 21474836480, max: 21474836480) at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530) at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484) at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711) at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700) at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237) at io
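For context on the proposed improvement: the retry would treat Netty's OutOfDirectMemoryError as transient, the way IO errors already are. A minimal Scala sketch of that idea follows; isRetryable and fetchWithRetry are illustrative helpers, not Spark's actual RetryingBlockFetcher code.
{code:java}
import io.netty.util.internal.OutOfDirectMemoryError

// Illustrative only: treat direct-memory exhaustion as a transient,
// retryable failure instead of surfacing it as a FetchFailedException.
def isRetryable(t: Throwable): Boolean = t match {
  case _: java.io.IOException    => true  // already retried today
  case _: OutOfDirectMemoryError => true  // the proposed addition
  case _                         => false
}

def fetchWithRetry[T](maxRetries: Int, retryWaitMs: Long)(fetch: => T): T = {
  var attempt = 0
  var result: Option[T] = None
  while (result.isEmpty) {
    try {
      result = Some(fetch)
    } catch {
      case t: Throwable if isRetryable(t) && attempt < maxRetries =>
        attempt += 1
        Thread.sleep(retryWaitMs) // back off so in-flight buffers can be freed
    }
  }
  result.get
}
{code}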
[jira] [Commented] (SPARK-24909) Spark scheduler can hang when fetch failures, executor lost, task running on lost executor, and multiple stage attempts
[ https://issues.apache.org/jira/browse/SPARK-24909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565513#comment-16565513 ] Thomas Graves commented on SPARK-24909: --- Looking more, I think the fix may actually just be to revert the change from SPARK-19263, so that it always does shuffleStage.pendingPartitions -= task.partitionId. The change in SPARK-23433 should fix the issue originally reported in SPARK-19263. If we always remove the partition from pendingPartitions and the map output isn't there, it will resubmit the stage. SPARK-23433, since it marks all tasks in other stage attempts as complete, should make sure no other active stages for that shuffle are running. Need to investigate more and run some tests.
> Spark scheduler can hang when fetch failures, executor lost, task running on
> lost executor, and multiple stage attempts
> ---
>
> Key: SPARK-24909
> URL: https://issues.apache.org/jira/browse/SPARK-24909
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.3.1
> Reporter: Thomas Graves
> Priority: Critical
>
> The DAGScheduler can hang if the executor was lost (due to fetch failure) and
> all the tasks in the task sets are marked as completed.
> ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1265)]
> It never creates new task attempts in the task scheduler but the DAG
> scheduler still has pendingPartitions.
> {code:java}
> 18/07/22 08:30:00 INFO scheduler.TaskSetManager: Starting task 55769.0 in stage 44.0 (TID 970752, host1.com, executor 33, partition 55769, PROCESS_LOCAL, 7874 bytes)
> 18/07/22 08:30:29 INFO scheduler.DAGScheduler: Marking ShuffleMapStage 44 (repartition at Lift.scala:191) as failed due to a fetch failure from ShuffleMapStage 42 (map at foo.scala:27)
> 18/07/22 08:30:29 INFO scheduler.DAGScheduler: Resubmitting ShuffleMapStage 42 (map at foo.scala:27) and ShuffleMapStage 44 (repartition at bar.scala:191) due to fetch failure
> 18/07/22 08:30:56 INFO scheduler.DAGScheduler: Executor lost: 33 (epoch 18)
> 18/07/22 08:30:56 INFO scheduler.DAGScheduler: Shuffle files lost for executor: 33 (epoch 18)
> 18/07/22 08:31:20 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 44 (MapPartitionsRDD[70] at repartition at bar.scala:191), which has no missing parents
> 18/07/22 08:31:21 INFO cluster.YarnClusterScheduler: Adding task set 44.1 with 59955 tasks
> 18/07/22 08:31:41 INFO scheduler.TaskSetManager: Finished task 55769.0 in stage 44.0 (TID 970752) in 101505 ms on host1.com (executor 33) (15081/73320)
> 18/07/22 08:31:41 INFO scheduler.DAGScheduler: Ignoring possibly bogus ShuffleMapTask(44, 55769) completion from executor 33{code}
>
> In the logs above you will see that task 55769.0 finished after the executor
> was lost and a new task set was started. The DAG scheduler says "Ignoring
> possibly bogus"... but on the TaskSetManager side those tasks have been marked
> as completed for all stage attempts. The DAGScheduler gets hung here. I did a
> heap dump on the process and can see that 55769 is still in the DAGScheduler
> pendingPartitions list but the TaskSetManagers are all complete.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
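A toy model of the bookkeeping the proposed revert restores (illustrative only, not actual DAGScheduler code): the stage always drops a finished partition from pendingPartitions, and a missing map output is caught when the stage completes, triggering a resubmit instead of a hang.
{code:java}
import scala.collection.mutable

// Simplified model of a shuffle map stage's completion bookkeeping.
class ShuffleStageState(numPartitions: Int) {
  val pendingPartitions: mutable.HashSet[Int] =
    mutable.HashSet(0 until numPartitions: _*)
  val mapOutputs: mutable.HashMap[Int, String] = mutable.HashMap.empty // partitionId -> executorId

  // Proposed behavior: always remove the partition on task success, even if
  // the completion comes from an executor that has since been lost.
  def onTaskSuccess(partitionId: Int, executorId: String, executorAlive: Boolean): Unit = {
    pendingPartitions -= partitionId
    if (executorAlive) mapOutputs(partitionId) = executorId
    // If the executor was lost, its map output is not registered; when the
    // stage finishes, the gap below is detected and the stage is resubmitted
    // rather than leaving pendingPartitions non-empty forever.
  }

  def isComplete: Boolean = pendingPartitions.isEmpty
  def missingPartitions: Set[Int] = (0 until numPartitions).toSet -- mapOutputs.keySet
}
{code}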
[jira] [Commented] (SPARK-24988) Add a castBySchema method which casts all the values of a DataFrame based on the DataTypes of a StructType
[ https://issues.apache.org/jira/browse/SPARK-24988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565457#comment-16565457 ] Apache Spark commented on SPARK-24988: -- User 'mahmoudmahdi24' has created a pull request for this issue: https://github.com/apache/spark/pull/21944 > Add a castBySchema method which casts all the values of a DataFrame based on > the DataTypes of a StructType > -- > > Key: SPARK-24988 > URL: https://issues.apache.org/jira/browse/SPARK-24988 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.0 >Reporter: mahmoud mehdi >Priority: Minor > > The main goal of this User Story is to extend the Dataframe methods in order > to add a method which casts all the values of a Dataframe, based on the > DataTypes of a StructType. > This feature can be useful when we have a large dataframe, and that we need > to make multiple casts. In that case, we won't have to cast each value > independently, all we have to do is to pass a StructType to the method > castBySchema with the types we need (In real world examples, this schema is > generally provided by the client, which was my case). > I'll explain the new feature via an example, let's create a dataframe of > strings : > {code:java} > val df = Seq(("test1", "0"), ("test2", "1")).toDF("name", "id") > {code} > Let's suppose that we want to cast the second column's values of the > dataframe to integers, all we have to do is the following : > {code:java} > val schema = StructType( Seq( StructField("name", StringType, true), > StructField("id", IntegerType, true))){code} > {code:java} > df.castBySchema(schema) > {code} > I made sure that castBySchema works also with nested StructTypes by adding > several tests. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24988) Add a castBySchema method which casts all the values of a DataFrame based on the DataTypes of a StructType
[ https://issues.apache.org/jira/browse/SPARK-24988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24988: Assignee: Apache Spark > Add a castBySchema method which casts all the values of a DataFrame based on > the DataTypes of a StructType > -- > > Key: SPARK-24988 > URL: https://issues.apache.org/jira/browse/SPARK-24988 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.0 >Reporter: mahmoud mehdi >Assignee: Apache Spark >Priority: Minor > > The main goal of this User Story is to extend the Dataframe methods in order > to add a method which casts all the values of a Dataframe, based on the > DataTypes of a StructType. > This feature can be useful when we have a large dataframe, and that we need > to make multiple casts. In that case, we won't have to cast each value > independently, all we have to do is to pass a StructType to the method > castBySchema with the types we need (In real world examples, this schema is > generally provided by the client, which was my case). > I'll explain the new feature via an example, let's create a dataframe of > strings : > {code:java} > val df = Seq(("test1", "0"), ("test2", "1")).toDF("name", "id") > {code} > Let's suppose that we want to cast the second column's values of the > dataframe to integers, all we have to do is the following : > {code:java} > val schema = StructType( Seq( StructField("name", StringType, true), > StructField("id", IntegerType, true))){code} > {code:java} > df.castBySchema(schema) > {code} > I made sure that castBySchema works also with nested StructTypes by adding > several tests. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24988) Add a castBySchema method which casts all the values of a DataFrame based on the DataTypes of a StructType
[ https://issues.apache.org/jira/browse/SPARK-24988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24988: Assignee: (was: Apache Spark) > Add a castBySchema method which casts all the values of a DataFrame based on > the DataTypes of a StructType > -- > > Key: SPARK-24988 > URL: https://issues.apache.org/jira/browse/SPARK-24988 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.0 >Reporter: mahmoud mehdi >Priority: Minor > > The main goal of this User Story is to extend the Dataframe methods in order > to add a method which casts all the values of a Dataframe, based on the > DataTypes of a StructType. > This feature can be useful when we have a large dataframe, and that we need > to make multiple casts. In that case, we won't have to cast each value > independently, all we have to do is to pass a StructType to the method > castBySchema with the types we need (In real world examples, this schema is > generally provided by the client, which was my case). > I'll explain the new feature via an example, let's create a dataframe of > strings : > {code:java} > val df = Seq(("test1", "0"), ("test2", "1")).toDF("name", "id") > {code} > Let's suppose that we want to cast the second column's values of the > dataframe to integers, all we have to do is the following : > {code:java} > val schema = StructType( Seq( StructField("name", StringType, true), > StructField("id", IntegerType, true))){code} > {code:java} > df.castBySchema(schema) > {code} > I made sure that castBySchema works also with nested StructTypes by adding > several tests. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24909) Spark scheduler can hang when fetch failures, executor lost, task running on lost executor, and multiple stage attempts
[ https://issues.apache.org/jira/browse/SPARK-24909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565437#comment-16565437 ] Thomas Graves commented on SPARK-24909: --- This is unfortunately not a straightforward fix: the DAGScheduler doesn't have control over the fine-grained state in the TaskScheduler/TaskSetManager, so undoing some of that is not possible with the existing API. One possible approach would be to undo the change from SPARK-19263 ([https://github.com/apache/spark/commit/729ce3703257aa34c00c5c8253e6971faf6a0c8d]) and fix that issue another way. Still working on a fix, just making a few notes.
> Spark scheduler can hang when fetch failures, executor lost, task running on
> lost executor, and multiple stage attempts
> ---
>
> Key: SPARK-24909
> URL: https://issues.apache.org/jira/browse/SPARK-24909
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.3.1
> Reporter: Thomas Graves
> Priority: Critical
>
> The DAGScheduler can hang if the executor was lost (due to fetch failure) and
> all the tasks in the task sets are marked as completed.
> ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1265)]
> It never creates new task attempts in the task scheduler but the DAG
> scheduler still has pendingPartitions.
> {code:java}
> 18/07/22 08:30:00 INFO scheduler.TaskSetManager: Starting task 55769.0 in stage 44.0 (TID 970752, host1.com, executor 33, partition 55769, PROCESS_LOCAL, 7874 bytes)
> 18/07/22 08:30:29 INFO scheduler.DAGScheduler: Marking ShuffleMapStage 44 (repartition at Lift.scala:191) as failed due to a fetch failure from ShuffleMapStage 42 (map at foo.scala:27)
> 18/07/22 08:30:29 INFO scheduler.DAGScheduler: Resubmitting ShuffleMapStage 42 (map at foo.scala:27) and ShuffleMapStage 44 (repartition at bar.scala:191) due to fetch failure
> 18/07/22 08:30:56 INFO scheduler.DAGScheduler: Executor lost: 33 (epoch 18)
> 18/07/22 08:30:56 INFO scheduler.DAGScheduler: Shuffle files lost for executor: 33 (epoch 18)
> 18/07/22 08:31:20 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 44 (MapPartitionsRDD[70] at repartition at bar.scala:191), which has no missing parents
> 18/07/22 08:31:21 INFO cluster.YarnClusterScheduler: Adding task set 44.1 with 59955 tasks
> 18/07/22 08:31:41 INFO scheduler.TaskSetManager: Finished task 55769.0 in stage 44.0 (TID 970752) in 101505 ms on host1.com (executor 33) (15081/73320)
> 18/07/22 08:31:41 INFO scheduler.DAGScheduler: Ignoring possibly bogus ShuffleMapTask(44, 55769) completion from executor 33{code}
>
> In the logs above you will see that task 55769.0 finished after the executor
> was lost and a new task set was started. The DAG scheduler says "Ignoring
> possibly bogus"... but on the TaskSetManager side those tasks have been marked
> as completed for all stage attempts. The DAGScheduler gets hung here. I did a
> heap dump on the process and can see that 55769 is still in the DAGScheduler
> pendingPartitions list but the TaskSetManagers are all complete.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24795) Implement barrier execution mode
[ https://issues.apache.org/jira/browse/SPARK-24795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565423#comment-16565423 ] Apache Spark commented on SPARK-24795: -- User 'jiangxb1987' has created a pull request for this issue: https://github.com/apache/spark/pull/21943 > Implement barrier execution mode > > > Key: SPARK-24795 > URL: https://issues.apache.org/jira/browse/SPARK-24795 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jiang Xingbo >Assignee: Jiang Xingbo >Priority: Major > Fix For: 2.4.0 > > > Implement barrier execution mode, as described in SPARK-24582 > Include all the API changes and basic implementation (except for > BarrierTaskContext.barrier()) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
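For reference, a minimal usage sketch of the barrier API this ticket introduces, as described in the SPIP (assumes an existing SparkContext; BarrierTaskContext.barrier() is the piece this ticket leaves unimplemented):
{code:java}
import org.apache.spark.{BarrierTaskContext, SparkContext}

val sc: SparkContext = SparkContext.getOrCreate()

val doubled = sc
  .parallelize(1 to 100, 4)
  .barrier()                      // mark the stage as a barrier stage:
                                  // all 4 tasks are launched together
  .mapPartitions { iter =>
    val ctx = BarrierTaskContext.get()
    ctx.barrier()                 // global sync point within the stage
    iter.map(_ * 2)
  }
  .collect()
{code}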
[jira] [Commented] (SPARK-24821) Fail fast when submitted job compute on a subset of all the partitions for a barrier stage
[ https://issues.apache.org/jira/browse/SPARK-24821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565377#comment-16565377 ] Apache Spark commented on SPARK-24821: -- User 'jiangxb1987' has created a pull request for this issue: https://github.com/apache/spark/pull/21927
> Fail fast when submitted job compute on a subset of all the partitions for a
> barrier stage
> --
>
> Key: SPARK-24821
> URL: https://issues.apache.org/jira/browse/SPARK-24821
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core
> Affects Versions: 2.4.0
> Reporter: Jiang Xingbo
> Priority: Major
>
> Detect when SparkContext.runJob() launches a barrier stage with a subset of
> all the partitions; one example is the `first()` operation.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
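A rough sketch of the kind of fail-fast check this implies inside SparkContext.runJob (simplified; not necessarily the exact code in the PR):
{code:java}
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD

// Illustrative check: reject partial-partition jobs on barrier RDDs before
// any tasks are launched, instead of letting the job hang or fail later.
def assertBarrierJobRunsOnAllPartitions(rdd: RDD[_], partitions: Seq[Int]): Unit = {
  if (rdd.isBarrier() && partitions.size != rdd.getNumPartitions) {
    throw new SparkException(
      s"Barrier stage requires all ${rdd.getNumPartitions} partitions, but the " +
      s"submitted job computes only ${partitions.size} (e.g. first()/take() " +
      "launch such partial jobs).")
  }
}
{code}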
[jira] [Commented] (SPARK-24988) Add a castBySchema method which casts all the values of a DataFrame based on the DataTypes of a StructType
[ https://issues.apache.org/jira/browse/SPARK-24988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565358#comment-16565358 ] mahmoud mehdi commented on SPARK-24988: --- I am working on it. > Add a castBySchema method which casts all the values of a DataFrame based on > the DataTypes of a StructType > -- > > Key: SPARK-24988 > URL: https://issues.apache.org/jira/browse/SPARK-24988 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.0 >Reporter: mahmoud mehdi >Priority: Minor > > The main goal of this User Story is to extend the Dataframe methods in order > to add a method which casts all the values of a Dataframe, based on the > DataTypes of a StructType. > This feature can be useful when we have a large dataframe, and that we need > to make multiple casts. In that case, we won't have to cast each value > independently, all we have to do is to pass a StructType to the method > castBySchema with the types we need (In real world examples, this schema is > generally provided by the client, which was my case). > I'll explain the new feature via an example, let's create a dataframe of > strings : > {code:java} > val df = Seq(("test1", "0"), ("test2", "1")).toDF("name", "id") > {code} > Let's suppose that we want to cast the second column's values of the > dataframe to integers, all we have to do is the following : > {code:java} > val schema = StructType( Seq( StructField("name", StringType, true), > StructField("id", IntegerType, true))){code} > {code:java} > df.castBySchema(schema) > {code} > I made sure that castBySchema works also with nested StructTypes by adding > several tests. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24988) Add a castBySchema method which casts all the values of a DataFrame based on the DataTypes of a StructType
mahmoud mehdi created SPARK-24988: - Summary: Add a castBySchema method which casts all the values of a DataFrame based on the DataTypes of a StructType Key: SPARK-24988 URL: https://issues.apache.org/jira/browse/SPARK-24988 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.0 Reporter: mahmoud mehdi
The main goal of this user story is to extend the DataFrame methods by adding a method which casts all the values of a DataFrame based on the DataTypes of a StructType.
This feature can be useful when we have a large DataFrame and need to perform multiple casts. In that case, we don't have to cast each value independently; all we have to do is pass a StructType with the needed types to the castBySchema method (in real-world examples, this schema is generally provided by the client, which was my case).
I'll explain the new feature via an example. Let's create a DataFrame of strings:
{code:java}
val df = Seq(("test1", "0"), ("test2", "1")).toDF("name", "id")
{code}
Let's suppose that we want to cast the second column's values to integers; all we have to do is the following:
{code:java}
val schema = StructType(Seq(StructField("name", StringType, true), StructField("id", IntegerType, true)))
{code}
{code:java}
df.castBySchema(schema)
{code}
I made sure that castBySchema also works with nested StructTypes by adding several tests.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
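For illustration, here is one way castBySchema could be implemented as an extension method; this is a sketch, not necessarily the PR's implementation. Since Column.cast accepts any DataType, including StructType, the same approach covers the nested case mentioned above.
{code:java}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

object CastBySchema {
  // Sketch: cast each column to the DataType of the schema field that has
  // the same name, leaving column order as given by the schema.
  implicit class CastBySchemaOps(val df: DataFrame) extends AnyVal {
    def castBySchema(schema: StructType): DataFrame =
      df.select(schema.fields.map(f => col(f.name).cast(f.dataType)): _*)
  }
}

// Usage, matching the example above:
//   import CastBySchema._
//   df.castBySchema(schema)   // "id" becomes IntegerType
{code}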
[jira] [Resolved] (SPARK-24971) remove SupportsDeprecatedScanRow
[ https://issues.apache.org/jira/browse/SPARK-24971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-24971. - Resolution: Fixed Fix Version/s: 2.4.0 Issue resolved by pull request 21921 [https://github.com/apache/spark/pull/21921] > remove SupportsDeprecatedScanRow > > > Key: SPARK-24971 > URL: https://issues.apache.org/jira/browse/SPARK-24971 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 2.4.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24986) OOM in BufferHolder during writes to a stream
[ https://issues.apache.org/jira/browse/SPARK-24986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565296#comment-16565296 ] Thomas Graves commented on SPARK-24986: --- FYI [~irashid], I know you were looking at memory-related things.
> OOM in BufferHolder during writes to a stream
> -
>
> Key: SPARK-24986
> URL: https://issues.apache.org/jira/browse/SPARK-24986
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0, 2.2.0, 2.3.0
> Reporter: Sanket Reddy
> Priority: Major
>
> We have seen an out-of-memory exception while running one of our prod jobs. We expect the memory allocation to be managed by the unified memory manager at run time.
> The buffer that grows during writes behaves roughly like this: if the row length is constant, the buffer does not grow; it keeps getting reset and the values written into it. If the rows are variable-length and skewed, with huge values to be written, this OOM happens, and I think the estimator which requests the initial execution memory does not account for it.
> Checking the underlying heap before growing the global buffer might be a viable option.
> java.lang.OutOfMemoryError: Java heap space
> at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
> at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter.initialize(UnsafeArrayWriter.java:61)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:232)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1075)
> at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
> at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
> at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:513)
> at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:329)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1966)
> at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:270)
> 18/06/11 21:18:41 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[stdout writer for Python/bin/python3.6,5,main]
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
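A hedged sketch of the "check the heap before growing" mitigation suggested in the description (illustrative only; Spark's BufferHolder.grow does not currently do this, and the doubling policy and thresholds here are assumptions):
{code:java}
// Illustrative heap-aware grow() for a BufferHolder-like row writer.
def growChecked(buffer: Array[Byte], cursor: Int, neededSize: Int): Array[Byte] = {
  val required = cursor.toLong + neededSize
  if (required <= buffer.length) {
    buffer // enough room already; BufferHolder-style writers just reset
  } else {
    // Grow by at least doubling, like typical buffer holders do.
    val newLength = math.max(required, buffer.length * 2L)
    val rt = Runtime.getRuntime
    val headroom = rt.maxMemory - (rt.totalMemory - rt.freeMemory)
    if (newLength > Int.MaxValue - 15 || newLength > headroom) {
      // Fail with a clear error instead of letting the copy OOM the JVM.
      throw new UnsupportedOperationException(
        s"Cannot grow buffer to $newLength bytes: exceeds available heap")
    }
    java.util.Arrays.copyOf(buffer, newLength.toInt)
  }
}
{code}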
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565252#comment-16565252 ] Genmao Yu commented on SPARK-24630: --- [~Jackey Lee] Pretty good! We also have a need for SQL streaming. Is there a more detailed design doc? Alternatively, we could first list the streaming SQL syntax that differs from batch.
> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.2.0, 2.2.1
> Reporter: Jackey Lee
> Priority: Minor
> Labels: SQLStreaming
> Attachments: SQLStreaming SPIP.pdf
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite), SQLStream, and StormSQL all provide a stream-oriented SQL interface, with which users with little knowledge about streaming can easily develop a stream processing model. In Spark, we can also support a SQL API based on Structured Streaming.
> To support SQL streaming, there are two key points:
> 1. The analysis layer should be able to parse streaming-type SQL.
> 2. The analyzer should be able to map metadata information to the corresponding Relation.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
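Worth noting: part of this can already be expressed today by registering a streaming DataFrame as a temp view and writing the query body in SQL. A minimal sketch follows (assuming a Kafka broker at localhost:9092 and a topic named events; this is existing Structured Streaming behavior, not the SPIP's proposed syntax):
{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sql-on-stream").getOrCreate()

// A streaming source; only the sink/trigger side still needs the DSL.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Register the streaming DataFrame as a view, then query it with plain SQL.
stream.selectExpr("CAST(value AS STRING) AS value").createOrReplaceTempView("events")
val counts = spark.sql("SELECT value, count(*) AS cnt FROM events GROUP BY value")

counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
{code}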
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987: Shepherd: Shixiong Zhu (was: Tathagata Das) > Kafka Cached Consumer Leaking File Descriptors > -- > > Key: SPARK-24987 > URL: https://issues.apache.org/jira/browse/SPARK-24987 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0, 2.3.1 > Environment: Spark 2.3.1 > Java(TM) SE Runtime Environment (build 1.8.0_112-b15) > Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode) > >Reporter: Yuval Itzchakov >Priority: Critical > > Setup: > * Spark 2.3.1 > * Java 1.8.0 (112) > * Standalone Cluster Manager > * 3 Nodes, 1 Executor per node. > Spark 2.3.0 introduced a new mechanism for caching Kafka consumers > (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) > via KafkaDataConsumer.acquire. > It seems that there are situations (I've been trying to debug it, haven't > been able to find the root cause as of yet) where cached consumers remain "in > use" throughout the life time of the task and are never released. This can be > identified by the following line of the stack trace: > at > org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460) > Which points to: > {code:java} > } else if (existingInternalConsumer.inUse) { > // If consumer is already cached but is currently in use, then return a new > consumer > NonCachedKafkaDataConsumer(newInternalConsumer) > {code} > Meaning the existing consumer created for that `TopicPartition` is still in > use for some reason. The weird thing is that you can see this for very old > tasks which have already finished successfully. > I've traced down this leak using file leak detector, attaching it to the > running Executor JVM process. I've emitted the list of open file descriptors > which [you can find > here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], > and you can see that the majority of them are epoll FD used by Kafka > Consumers, indicating that they aren't closing. > Spark graph: > {code:java} > kafkaStream > .load() > .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") > .as[(String, String)] > .flatMap {...} > .groupByKey(...) > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...) > .foreach(...) > .outputMode(OutputMode.Update) > .option("checkpointLocation", > sparkConfiguration.properties.checkpointDirectory) > .start() > .awaitTermination(){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987: Shepherd: Tathagata Das (was: Shixiong Zhu) > Kafka Cached Consumer Leaking File Descriptors > -- > > Key: SPARK-24987 > URL: https://issues.apache.org/jira/browse/SPARK-24987 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0, 2.3.1 > Environment: Spark 2.3.1 > Java(TM) SE Runtime Environment (build 1.8.0_112-b15) > Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode) > >Reporter: Yuval Itzchakov >Priority: Critical > > Setup: > * Spark 2.3.1 > * Java 1.8.0 (112) > * Standalone Cluster Manager > * 3 Nodes, 1 Executor per node. > Spark 2.3.0 introduced a new mechanism for caching Kafka consumers > (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) > via KafkaDataConsumer.acquire. > It seems that there are situations (I've been trying to debug it, haven't > been able to find the root cause as of yet) where cached consumers remain "in > use" throughout the life time of the task and are never released. This can be > identified by the following line of the stack trace: > at > org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460) > Which points to: > {code:java} > } else if (existingInternalConsumer.inUse) { > // If consumer is already cached but is currently in use, then return a new > consumer > NonCachedKafkaDataConsumer(newInternalConsumer) > {code} > Meaning the existing consumer created for that `TopicPartition` is still in > use for some reason. The weird thing is that you can see this for very old > tasks which have already finished successfully. > I've traced down this leak using file leak detector, attaching it to the > running Executor JVM process. I've emitted the list of open file descriptors > which [you can find > here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], > and you can see that the majority of them are epoll FD used by Kafka > Consumers, indicating that they aren't closing. > Spark graph: > {code:java} > kafkaStream > .load() > .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") > .as[(String, String)] > .flatMap {...} > .groupByKey(...) > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...) > .foreach(...) > .outputMode(OutputMode.Update) > .option("checkpointLocation", > sparkConfiguration.properties.checkpointDirectory) > .start() > .awaitTermination(){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987: Summary: Kafka Cached Consumer Leaking File Descriptors (was: Kafka Cached Consumer Leaking Consumers) > Kafka Cached Consumer Leaking File Descriptors > -- > > Key: SPARK-24987 > URL: https://issues.apache.org/jira/browse/SPARK-24987 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0, 2.3.1 > Environment: Spark 2.3.1 > Java(TM) SE Runtime Environment (build 1.8.0_112-b15) > Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode) > >Reporter: Yuval Itzchakov >Priority: Critical > > Setup: > * Spark 2.3.1 > * Java 1.8.0 (112) > * Standalone Cluster Manager > * 3 Nodes, 1 Executor per node. > Spark 2.3.0 introduced a new mechanism for caching Kafka consumers > (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) > via KafkaDataConsumer.acquire. > It seems that there are situations (I've been trying to debug it, haven't > been able to find the root cause as of yet) where cached consumers remain "in > use" throughout the life time of the task and are never released. This can be > identified by the following line of the stack trace: > at > org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460) > Which points to: > {code:java} > } else if (existingInternalConsumer.inUse) { > // If consumer is already cached but is currently in use, then return a new > consumer > NonCachedKafkaDataConsumer(newInternalConsumer) > {code} > Meaning the existing consumer created for that `TopicPartition` is still in > use for some reason. The weird thing is that you can see this for very old > tasks which have already finished successfully. > I've traced down this leak using file leak detector, attaching it to the > running Executor JVM process. I've emitted the list of open file descriptors > which [you can find > here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], > and you can see that the majority of them are epoll FD used by Kafka > Consumers, indicating that they aren't closing. > Spark graph: > {code:java} > kafkaStream > .load() > .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") > .as[(String, String)] > .flatMap {...} > .groupByKey(...) > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...) > .foreach(...) > .outputMode(OutputMode.Update) > .option("checkpointLocation", > sparkConfiguration.properties.checkpointDirectory) > .start() > .awaitTermination(){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
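For context, the intended consumer lifecycle is that a task releases its consumer via a task-completion listener (the pattern quoted later in this issue's edit history as context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }). The sketch below illustrates that pattern; acquireForTask and release are hypothetical names, not the actual KafkaDataConsumer code:
{code:java}
import org.apache.spark.TaskContext

// Simplified sketch: whoever acquires a consumer for a task registers a
// completion listener so the consumer is released when the task ends. The
// leak reported here suggests some consumers never leave the "in use"
// state, so this release path is apparently not always reached.
def acquireForTask[C <: AutoCloseable](context: TaskContext, consumer: C)
                                      (release: C => Unit): C = {
  context.addTaskCompletionListener { _ =>
    release(consumer) // e.g. mark not-in-use, return to cache, or close
  }
  consumer
}
{code}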
[jira] [Comment Edited] (SPARK-4300) Race condition during SparkWorker shutdown
[ https://issues.apache.org/jira/browse/SPARK-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565104#comment-16565104 ] liqingan edited comment on SPARK-4300 at 8/1/18 10:35 AM: -- i feel upset for this issue ! (n)
|Uncaught fatal error from thread [sparkWorker-akka.actor.default-dispatcher-56] shutting down ActorSystem [sparkWorker] java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.lang.StringBuilder.toString(StringBuilder.java:405)
at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3068)
at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
at java.io.ObjectInputStream.readString(ObjectInputStream.java:1638)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1341)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)|
|2:12:12.751 AM|ERROR|org.apache.spark.util.logging.FileAppender|Error writing stream to file /hadoop/var/run/spark/work/app-20180727141925-0019/38075/stderr java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1468)
at org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)|
|2:12:12.752 AM|ERROR|org.apache.spark.util.logging.FileAppender|Error writing stream to file /hadoop/var/run/spark/work/app-20180727142159-0032/30823/stderr java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
at org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at org.apache.spark.util.logging.FileAppend
[jira] [Comment Edited] (SPARK-4300) Race condition during SparkWorker shutdown
[ https://issues.apache.org/jira/browse/SPARK-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565104#comment-16565104 ] liqingan edited comment on SPARK-4300 at 8/1/18 10:29 AM: -- i feel upset for this issue ! was (Author: liqingan): i feel upset for this issue ! !file:///C:\Users\admin\AppData\Roaming\Tencent\Users\546061117\QQ\WinTemp\RichOle\B\{~TP2PW~}1TYA2AG{CA41H.png! > Race condition during SparkWorker shutdown > -- > > Key: SPARK-4300 > URL: https://issues.apache.org/jira/browse/SPARK-4300 > Project: Spark > Issue Type: Bug > Components: Spark Shell >Affects Versions: 1.1.0 >Reporter: Alex Liu >Assignee: Sean Owen >Priority: Minor > Fix For: 1.2.2, 1.3.1, 1.4.0 > > > When a shark job is done. there are some error message as following show in > the log > {code} > INFO 22:10:41,635 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. > INFO 22:10:41,640 SparkMaster: Removing app app-20141106221014- > INFO 22:10:41,687 SparkMaster: Removing application > Shark::ip-172-31-11-204.us-west-1.compute.internal > INFO 22:10:41,710 SparkWorker: Asked to kill executor > app-20141106221014-/0 > INFO 22:10:41,712 SparkWorker: Runner thread for executor > app-20141106221014-/0 interrupted > INFO 22:10:41,714 SparkWorker: Killing process! > ERROR 22:10:41,738 SparkWorker: Error writing stream to file > /var/lib/spark/work/app-20141106221014-/0/stdout > ERROR 22:10:41,739 SparkWorker: java.io.IOException: Stream closed > ERROR 22:10:41,739 SparkWorker: at > java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162) > ERROR 22:10:41,740 SparkWorker: at > java.io.BufferedInputStream.read1(BufferedInputStream.java:272) > ERROR 22:10:41,740 SparkWorker: at > java.io.BufferedInputStream.read(BufferedInputStream.java:334) > ERROR 22:10:41,740 SparkWorker: at > java.io.FilterInputStream.read(FilterInputStream.java:107) > ERROR 22:10:41,741 SparkWorker: at > org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70) > ERROR 22:10:41,741 SparkWorker: at > org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39) > ERROR 22:10:41,741 SparkWorker: at > org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39) > ERROR 22:10:41,742 SparkWorker: at > org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39) > ERROR 22:10:41,742 SparkWorker: at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) > ERROR 22:10:41,742 SparkWorker: at > org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38) > INFO 22:10:41,838 SparkMaster: Connected to Cassandra cluster: 4299 > INFO 22:10:41,839 SparkMaster: Adding host 172.31.11.204 (Analytics) > INFO 22:10:41,840 SparkMaster: New Cassandra host /172.31.11.204:9042 added > INFO 22:10:41,841 SparkMaster: Adding host 172.31.11.204 (Analytics) > INFO 22:10:41,842 SparkMaster: Adding host 172.31.11.204 (Analytics) > INFO 22:10:41,852 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. > INFO 22:10:41,853 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. > INFO 22:10:41,853 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. 
> INFO 22:10:41,857 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. > INFO 22:10:41,862 SparkMaster: Adding host 172.31.11.204 (Analytics) > WARN 22:10:42,200 SparkMaster: Got status update for unknown executor > app-20141106221014-/0 > INFO 22:10:42,211 SparkWorker: Executor app-20141106221014-/0 finished > with state KILLED exitStatus 143 > {code} > /var/lib/spark/work/app-20141106221014-/0/stdout is on the disk. It is > trying to write to a close IO stream. > Spark worker shuts down by {code} > private def killProcess(message: Option[String]) { > var exitCode: Option[Int] = None > logInfo("Killing process!") > process.destroy() > process.waitFor() > if (stdoutAppender != null) { > stdoutAppender.stop() > } > if (stderrAppender != null) { > stderrAppender.stop() > } > if (process != null) { > exitCode = Some(process.waitFor()) > } > worker ! ExecutorStateChanged(appId, execId, state, message, exitCode) > > {code} > But stdoutAppender concurrently writes to output log file,
[jira] [Commented] (SPARK-4300) Race condition during SparkWorker shutdown
[ https://issues.apache.org/jira/browse/SPARK-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565104#comment-16565104 ] liqingan commented on SPARK-4300: - i feel upset for this issue ! !file:///C:\Users\admin\AppData\Roaming\Tencent\Users\546061117\QQ\WinTemp\RichOle\B\{~TP2PW~}1TYA2AG{CA41H.png! > Race condition during SparkWorker shutdown > -- > > Key: SPARK-4300 > URL: https://issues.apache.org/jira/browse/SPARK-4300 > Project: Spark > Issue Type: Bug > Components: Spark Shell >Affects Versions: 1.1.0 >Reporter: Alex Liu >Assignee: Sean Owen >Priority: Minor > Fix For: 1.2.2, 1.3.1, 1.4.0 > > > When a shark job is done. there are some error message as following show in > the log > {code} > INFO 22:10:41,635 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. > INFO 22:10:41,640 SparkMaster: Removing app app-20141106221014- > INFO 22:10:41,687 SparkMaster: Removing application > Shark::ip-172-31-11-204.us-west-1.compute.internal > INFO 22:10:41,710 SparkWorker: Asked to kill executor > app-20141106221014-/0 > INFO 22:10:41,712 SparkWorker: Runner thread for executor > app-20141106221014-/0 interrupted > INFO 22:10:41,714 SparkWorker: Killing process! > ERROR 22:10:41,738 SparkWorker: Error writing stream to file > /var/lib/spark/work/app-20141106221014-/0/stdout > ERROR 22:10:41,739 SparkWorker: java.io.IOException: Stream closed > ERROR 22:10:41,739 SparkWorker: at > java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162) > ERROR 22:10:41,740 SparkWorker: at > java.io.BufferedInputStream.read1(BufferedInputStream.java:272) > ERROR 22:10:41,740 SparkWorker: at > java.io.BufferedInputStream.read(BufferedInputStream.java:334) > ERROR 22:10:41,740 SparkWorker: at > java.io.FilterInputStream.read(FilterInputStream.java:107) > ERROR 22:10:41,741 SparkWorker: at > org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70) > ERROR 22:10:41,741 SparkWorker: at > org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39) > ERROR 22:10:41,741 SparkWorker: at > org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39) > ERROR 22:10:41,742 SparkWorker: at > org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39) > ERROR 22:10:41,742 SparkWorker: at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) > ERROR 22:10:41,742 SparkWorker: at > org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38) > INFO 22:10:41,838 SparkMaster: Connected to Cassandra cluster: 4299 > INFO 22:10:41,839 SparkMaster: Adding host 172.31.11.204 (Analytics) > INFO 22:10:41,840 SparkMaster: New Cassandra host /172.31.11.204:9042 added > INFO 22:10:41,841 SparkMaster: Adding host 172.31.11.204 (Analytics) > INFO 22:10:41,842 SparkMaster: Adding host 172.31.11.204 (Analytics) > INFO 22:10:41,852 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. > INFO 22:10:41,853 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. > INFO 22:10:41,853 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. 
> INFO 22:10:41,857 SparkMaster: > akka.tcp://sparkdri...@ip-172-31-11-204.us-west-1.compute.internal:57641 got > disassociated, removing it. > INFO 22:10:41,862 SparkMaster: Adding host 172.31.11.204 (Analytics) > WARN 22:10:42,200 SparkMaster: Got status update for unknown executor > app-20141106221014-/0 > INFO 22:10:42,211 SparkWorker: Executor app-20141106221014-/0 finished > with state KILLED exitStatus 143 > {code} > /var/lib/spark/work/app-20141106221014-/0/stdout is on the disk. It is > trying to write to a close IO stream. > Spark worker shuts down by {code} > private def killProcess(message: Option[String]) { > var exitCode: Option[Int] = None > logInfo("Killing process!") > process.destroy() > process.waitFor() > if (stdoutAppender != null) { > stdoutAppender.stop() > } > if (stderrAppender != null) { > stderrAppender.stop() > } > if (process != null) { > exitCode = Some(process.waitFor()) > } > worker ! ExecutorStateChanged(appId, execId, state, message, exitCode) > > {code} > But stdoutAppender concurrently writes to output log file, which creates race > condition. -- This message was sent by Atlassian JIRA (v7.6.3#76005) ---
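One way to narrow the race described above is to wait for the process to exit before stopping the appenders. The sketch below simply reorders the quoted killProcess fragment (illustrative only; the actual fix shipped in 1.2.2/1.3.1/1.4.0 may well have taken a different approach, such as tolerating the closed-stream IOException inside FileAppender):
{code:java}
// Illustrative reordering of the quoted killProcess: make sure the child
// process is fully dead, so its stdout/stderr streams are drained and
// closed, before the appenders reading those streams are stopped.
private def killProcess(message: Option[String]): Unit = {
  logInfo("Killing process!")
  process.destroy()
  val exitCode = Some(process.waitFor())              // wait first...
  if (stdoutAppender != null) stdoutAppender.stop()   // ...then stop appenders
  if (stderrAppender != null) stderrAppender.stop()
  worker ! ExecutorStateChanged(appId, execId, state, message, exitCode)
}
{code}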
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987: Description: Setup: * Spark 2.3.1 * Java 1.8.0 (112) * Standalone Cluster Manager * 3 Nodes, 1 Executor per node. Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire. It seems that there are situations (I've been trying to debug it, haven't been able to find the root cause as of yet) where cached consumers remain "in use" throughout the life time of the task and are never released. This can be identified by the following line of the stack trace: at org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460) Which points to: {code:java} } else if (existingInternalConsumer.inUse) { // If consumer is already cached but is currently in use, then return a new consumer NonCachedKafkaDataConsumer(newInternalConsumer) {code} Meaning the existing consumer created for that `TopicPartition` is still in use for some reason. The weird thing is that you can see this for very old tasks which have already finished successfully. I've traced down this leak using file leak detector, attaching it to the running Executor JVM process. I've emitted the list of open file descriptors which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], and you can see that the majority of them are epoll FD used by Kafka Consumers, indicating that they aren't closing. Spark graph: {code:java} kafkaStream .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] .flatMap {...} .groupByKey(...) .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...) .foreach(...) .outputMode(OutputMode.Update) .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory) .start() .awaitTermination(){code} was: Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire. It seems that there are situations (I've been trying to debug it, haven't been able to find the root cause as of yet) where cached consumers remain "in use" throughout the life time of the task and are never released. This can be identified by the following line of the stack trace: at org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460) Which points to: {code:java} } else if (existingInternalConsumer.inUse) { // If consumer is already cached but is currently in use, then return a new consumer NonCachedKafkaDataConsumer(newInternalConsumer) {code} Meaning the existing consumer created for that `TopicPartition` is still in use for some reason. The weird thing is that you can see this for very old tasks which have already finished successfully. I've traced down this leak using file leak detector, attaching it to the running Executor JVM process. I've emitted the list of open file descriptors which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], and you can see that the majority of them are epoll FD used by Kafka Consumers, indicating that they aren't closing. 
Spark graph: {code:java} kafkaStream .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] .flatMap {...} .groupByKey(...) .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...) .foreach(...) .outputMode(OutputMode.Update) .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory) .start() .awaitTermination(){code} > Kafka Cached Consumer Leaking Consumers > --- > > Key: SPARK-24987 > URL: https://issues.apache.org/jira/browse/SPARK-24987 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0, 2.3.1 > Environment: Spark 2.3.1 > Java(TM) SE Runtime Environment (build 1.8.0_112-b15) > Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode) > >Reporter: Yuval Itzchakov >Priority: Critical > > Setup: > * Spark 2.3.1 > * Java 1.8.0 (112) > * Standalone Cluster Manager > * 3 Nodes, 1 Executor per node. > Spark 2.3.0 introduced a new mechanism for caching Kafka consumers > (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) > via KafkaDataConsumer.acquire. > It seems that there are situations (I've been trying to debug it, haven't > been able to find the root cause as of yet) where cached consumers
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987: Description: Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire. It seems that there are situations (I've been trying to debug it, haven't been able to find the root cause as of yet) where cached consumers remain "in use" throughout the life time of the task and are never released. This can be identified by the following line of the stack trace: at org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460) Which points to: {code:java} } else if (existingInternalConsumer.inUse) { // If consumer is already cached but is currently in use, then return a new consumer NonCachedKafkaDataConsumer(newInternalConsumer) {code} Meaning the existing consumer created for that `TopicPartition` is still in use for some reason. The weird thing is that you can see this for very old tasks which have already finished successfully. I've traced down this leak using file leak detector, attaching it to the running Executor JVM process. I've emitted the list of open file descriptors which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], and you can see that the majority of them are epoll FD used by Kafka Consumers, indicating that they aren't closing. Spark graph: {code:java} kafkaStream .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] .flatMap {...} .groupByKey(...) .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...) .foreach(...) .outputMode(OutputMode.Update) .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory) .start() .awaitTermination(){code} was: Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire. It seems that there are situations (I've been trying to debug it, haven't been able to find the root cause as of yet) where cached consumers remain "in use" throughout the life time of the task and are never released. This can be identified by the following line of the stack trace: I've traced down this leak using file leak detector, attaching it to the running Executor JVM process. I've emitted the list of open file descriptors which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], and you can see that the majority of them are epoll FD used by Kafka Consumers, indicating that they aren't closing. Spark graph: {code:java} kafkaStream .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] .flatMap {...} .groupByKey(...) .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...) .foreach(...) 
.outputMode(OutputMode.Update) .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory) .start() .awaitTermination(){code} > Kafka Cached Consumer Leaking Consumers > --- > > Key: SPARK-24987 > URL: https://issues.apache.org/jira/browse/SPARK-24987 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0, 2.3.1 > Environment: Spark 2.3.1 > Java(TM) SE Runtime Environment (build 1.8.0_112-b15) > Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode) > >Reporter: Yuval Itzchakov >Priority: Critical > > Spark 2.3.0 introduced a new mechanism for caching Kafka consumers > (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) > via KafkaDataConsumer.acquire. > It seems that there are situations (I've been trying to debug it, haven't > been able to find the root cause as of yet) where cached consumers remain "in > use" throughout the life time of the task and are never released. This can be > identified by the following line of the stack trace: > at > org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460) > Which points to: > {code:java} > } else if (existingInternalConsumer.inUse) { > // If consumer is already cached but is currently in use, then return a new > consumer > NonCachedKafkaDataConsumer(newInternalConsumer) > {code} > Meaning the existing consumer created for that `TopicPartition` is still in > use for some reason. The weird thing is that you can see this for very old > tasks which have already finished successfully. > I've traced down this leak using file leak
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987:
Description:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task and are never released.
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.
Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}

was:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task; perhaps the registered callback on the context is never called (just a theory, no hard evidence):
{code:java}
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
{code}
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.
Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}

> Kafka Cached Consumer Leaking Consumers
> ---
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>
> Reporter: Yuval Itzchakov
> Priority: Critical
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, but haven't
> been able to find the root cause yet) where cached consumers remain "in use"
> throughout the lifetime of the task and are never released.
> I've traced this leak using a file leak detector, attaching it to the running
> Executor JVM process. I've exported the list of open file descriptors, which
> [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
> and you can see that the majority of them are epoll FDs used by Kafka
> consumers, indicating that they aren't closing.
> Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination()
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987:
Shepherd: Tathagata Das

> Kafka Cached Consumer Leaking Consumers
> ---
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>
> Reporter: Yuval Itzchakov
> Priority: Critical
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, but haven't
> been able to find the root cause yet) where cached consumers remain "in use"
> throughout the lifetime of the task; perhaps the registered callback on the
> context is never called (just a theory, no hard evidence):
> {code:java}
> context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
> {code}
> I've traced this leak using a file leak detector, attaching it to the running
> Executor JVM process. I've exported the list of open file descriptors, which
> [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
> and you can see that the majority of them are epoll FDs used by Kafka
> consumers, indicating that they aren't closing.
> Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination()
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987:
Description:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task; perhaps the registered callback on the context is never called (just a theory, no hard evidence):
{code:java}
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
{code}
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.
Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}

was:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task; perhaps the registered callback on the context is never called (just a theory, no hard evidence):
{code:java}
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
{code}
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here]([https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d]), and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.

> Kafka Cached Consumer Leaking Consumers
> ---
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>
> Reporter: Yuval Itzchakov
> Priority: Critical
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, but haven't
> been able to find the root cause yet) where cached consumers remain "in use"
> throughout the lifetime of the task; perhaps the registered callback on the
> context is never called (just a theory, no hard evidence):
> {code:java}
> context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
> {code}
> I've traced this leak using a file leak detector, attaching it to the running
> Executor JVM process. I've exported the list of open file descriptors, which
> [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
> and you can see that the majority of them are epoll FDs used by Kafka
> consumers, indicating that they aren't closing.
> Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination()
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
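To make the release-path theory above concrete: the quoted listener is what is supposed to hand a cached consumer back when its task ends. Here is a minimal sketch of that pattern; UnderlyingConsumer and closeIfNeeded below are hypothetical stand-ins for Spark's internal Kafka consumer wrapper, and only TaskContext.addTaskCompletionListener is the real Spark API. If the listener is never registered on some code path, or never fires, inUse stays true forever and acquire keeps creating fresh consumers.
{code:java}
// Sketch of the cleanup pattern the quoted snippet implements.
// UnderlyingConsumer is a hypothetical stand-in; only the
// TaskContext API below is actual Spark.
import org.apache.spark.TaskContext

class UnderlyingConsumer {
  @volatile var inUse: Boolean = true

  def closeIfNeeded(): Unit = {
    // In the real code this releases the consumer back to the pool or
    // closes it, freeing its sockets and epoll file descriptors.
    inUse = false
  }
}

object ConsumerCleanup {
  def register(context: TaskContext, underlying: UnderlyingConsumer): Unit = {
    // Runs when the task completes, successfully or not. If this hook
    // never fires, the consumer stays marked "in use" and leaks.
    context.addTaskCompletionListener { _ =>
      underlying.closeIfNeeded()
    }
  }
}
{code}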
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987:
Environment:
Spark 2.3.1
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

was:
Spark 2.3.1
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}

> Kafka Cached Consumer Leaking Consumers
> ---
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>
> Reporter: Yuval Itzchakov
> Priority: Critical
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, but haven't
> been able to find the root cause yet) where cached consumers remain "in use"
> throughout the lifetime of the task; perhaps the registered callback on the
> context is never called (just a theory, no hard evidence):
> {code:java}
> context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
> {code}
> I've traced this leak using a file leak detector, attaching it to the running
> Executor JVM process. I've exported the list of open file descriptors, which
> [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
> and you can see that the majority of them are epoll FDs used by Kafka
> consumers, indicating that they aren't closing.
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987:
Description:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task; perhaps the registered callback on the context is never called (just a theory, no hard evidence):
{code:java}
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
{code}
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d], and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.

was:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task; perhaps the registered callback on the context is never called (just a theory, no hard evidence):
{code:java}
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
{code}
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here]([https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d]), and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.

> Kafka Cached Consumer Leaking Consumers
> ---
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
> Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination()
> {code}
>
> Reporter: Yuval Itzchakov
> Priority: Critical
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, but haven't
> been able to find the root cause yet) where cached consumers remain "in use"
> throughout the lifetime of the task; perhaps the registered callback on the
> context is never called (just a theory, no hard evidence):
> {code:java}
> context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
> {code}
> I've traced this leak using a file leak detector, attaching it to the running
> Executor JVM process. I've exported the list of open file descriptors, which
> [you can find here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
> and you can see that the majority of them are epoll FDs used by Kafka
> consumers, indicating that they aren't closing.
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuval Itzchakov updated SPARK-24987:
Environment:
Spark 2.3.1
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
Spark graph:
{code:java}
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap {...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
{code}

was:
Spark 2.3.1
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
Spark graph:
```scala
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap \{...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
```

Description:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task; perhaps the registered callback on the context is never called (just a theory, no hard evidence):
{code:java}
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
{code}
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here]([https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d]), and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.

was:
Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task; perhaps the registered callback on the context is never called (just a theory, no hard evidence):
```scala
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
```
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here](https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d), and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.
The number of open FDs does not rise immediately, but you can clearly see the descriptor count grow over time. This is a snapshot after running the load test for about 5 hours:
!image-2018-08-01-13-13-16-339.png!

> Kafka Cached Consumer Leaking Consumers
> ---
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
> Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination()
> {code}
>
> Reporter: Yuval Itzchakov
> Priority: Critical
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
> It seems that there are situations (I've been trying to debug it, but haven't
> been able to find the root cause yet) where cached consumers remain "in use"
> throughout the lifetime of the task; perhaps the registered callback on the
> context is never called (just a theory, no hard evidence):
> {code:java}
> context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
> {code}
> I've traced this leak using a file leak detector, attaching it to the running
> Executor JVM process. I've exported the list of open file descriptors, which
> [you can find here]([https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d]),
> and you can see that the majority of them are epoll FDs used by Kafka
> consumers, indicating that they aren't closing.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24987) Kafka Cached Consumer Leaking Consumers
Yuval Itzchakov created SPARK-24987:
---
Summary: Kafka Cached Consumer Leaking Consumers
Key: SPARK-24987
URL: https://issues.apache.org/jira/browse/SPARK-24987
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 2.3.1, 2.3.0
Environment: Spark 2.3.1
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
Spark graph:
```scala
kafkaStream
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .flatMap \{...}
  .groupByKey(...)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
  .foreach(...)
  .outputMode(OutputMode.Update)
  .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
  .start()
  .awaitTermination()
```
Reporter: Yuval Itzchakov

Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel) via KafkaDataConsumer.acquire.
It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task; perhaps the registered callback on the context is never called (just a theory, no hard evidence):
```scala
context.addTaskCompletionListener { _ => underlying.closeIfNeeded() }
```
I've traced this leak using a file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which [you can find here](https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d), and you can see that the majority of them are epoll FDs used by Kafka consumers, indicating that they aren't closing.
The number of open FDs does not rise immediately, but you can clearly see the descriptor count grow over time. This is a snapshot after running the load test for about 5 hours:
!image-2018-08-01-13-13-16-339.png!

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
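The load-test observation above was gathered with an external file-leak-detector agent. A lighter-weight way to watch the same symptom is to sample the executor JVM's own descriptor count; this is an illustrative, Linux-only sketch (it just lists /proc/self/fd with plain java.io and involves no Spark or Kafka APIs):
{code:java}
// Linux-only sketch: log the current JVM's open file descriptor count
// once a minute by listing /proc/self/fd.
import java.io.File

object FdSampler {
  def openFdCount(): Int = {
    val fds = new File("/proc/self/fd").listFiles()
    if (fds == null) -1 else fds.length
  }

  def start(intervalMs: Long = 60000L): Unit = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          // A steadily rising count here mirrors the growth that the
          // file leak detector showed in the attached gist.
          println(s"open fds: ${openFdCount()}")
          Thread.sleep(intervalMs)
        }
      }
    })
    t.setDaemon(true)
    t.start()
  }
}
{code}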
[jira] [Assigned] (SPARK-24283) Make standard scaler work without legacy MLlib
[ https://issues.apache.org/jira/browse/SPARK-24283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24283:
Assignee: (was: Apache Spark)

> Make standard scaler work without legacy MLlib
> --
>
> Key: SPARK-24283
> URL: https://issues.apache.org/jira/browse/SPARK-24283
> Project: Spark
> Issue Type: Improvement
> Components: ML
> Affects Versions: 2.4.0
> Reporter: holdenk
> Priority: Trivial
> Labels: starter
>
> Currently StandardScaler converts Spark ML vectors to MLlib vectors during
> prediction; we should skip that step.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24283) Make standard scaler work without legacy MLlib
[ https://issues.apache.org/jira/browse/SPARK-24283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565020#comment-16565020 ] Apache Spark commented on SPARK-24283:
--
User 'sujithjay' has created a pull request for this issue: https://github.com/apache/spark/pull/21942

> Make standard scaler work without legacy MLlib
> --
>
> Key: SPARK-24283
> URL: https://issues.apache.org/jira/browse/SPARK-24283
> Project: Spark
> Issue Type: Improvement
> Components: ML
> Affects Versions: 2.4.0
> Reporter: holdenk
> Priority: Trivial
> Labels: starter
>
> Currently StandardScaler converts Spark ML vectors to MLlib vectors during
> prediction; we should skip that step.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24283) Make standard scaler work without legacy MLlib
[ https://issues.apache.org/jira/browse/SPARK-24283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24283:
Assignee: Apache Spark

> Make standard scaler work without legacy MLlib
> --
>
> Key: SPARK-24283
> URL: https://issues.apache.org/jira/browse/SPARK-24283
> Project: Spark
> Issue Type: Improvement
> Components: ML
> Affects Versions: 2.4.0
> Reporter: holdenk
> Assignee: Apache Spark
> Priority: Trivial
> Labels: starter
>
> Currently StandardScaler converts Spark ML vectors to MLlib vectors during
> prediction; we should skip that step.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
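For context on the change itself: the ticket's point is that StandardScalerModel currently round-trips each row through legacy MLlib vectors (ML to MLlib and back) just to apply the scaling. Below is a dense-only, illustrative sketch of the scaling done directly on ML vectors; the scale function is a hypothetical stand-in, not the code from the linked pull request, and it assumes the usual semantics where withMean subtracts the column mean and withStd divides by the column standard deviation when it is non-zero.
{code:java}
// Dense-only sketch of standard scaling applied directly to ML vectors,
// skipping the ML -> MLlib -> ML conversion. Illustrative stand-in; the
// actual change lives in the pull request referenced above.
import org.apache.spark.ml.linalg.{Vector, Vectors}

object MlStandardScaler {
  def scale(v: Vector, mean: Vector, std: Vector,
            withMean: Boolean, withStd: Boolean): Vector = {
    // toArray may return the vector's backing array, so copy before mutating.
    val values = v.toArray.clone()
    var i = 0
    while (i < values.length) {
      if (withMean) values(i) -= mean(i)
      if (withStd && std(i) != 0.0) values(i) /= std(i)
      i += 1
    }
    Vectors.dense(values)
  }
}
{code}
Applied per row inside the model's transform, a direct path like this avoids allocating two extra vector objects per record, which appears to be the overhead the ticket wants to remove.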