[jira] [Updated] (SPARK-27637) If an exception occurs while fetching blocks via the Netty block transfer service, check whether the corresponding executor is alive before retrying
[ https://issues.apache.org/jira/browse/SPARK-27637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] feiwang updated SPARK-27637: Component/s: (was: Block Manager)

> If an exception occurs while fetching blocks via the Netty block transfer service, check whether the corresponding executor is alive before retrying
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27637
>                 URL: https://issues.apache.org/jira/browse/SPARK-27637
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 2.3.2, 2.4.2
>            Reporter: feiwang
>            Priority: Major
>
> There are two kinds of shuffle client: blockTransferService and externalShuffleClient.
> For the externalShuffleClient, there is a corresponding external shuffle service, which serves shuffle block data regardless of the state of executors.
> The blockTransferService is used to fetch broadcast blocks, and to fetch shuffle data when the external shuffle service is not enabled.
> When fetching data through the blockTransferService, the shuffle client connects to the corresponding executor's BlockManager, so if that executor is dead, the fetch can never succeed.
> When spark.shuffle.service.enabled and spark.dynamicAllocation.enabled are both true, an executor is removed once it has been idle for more than idleTimeout.
> If a blockTransferService successfully creates a connection to an executor, but that executor is removed just as the broadcast-block fetch begins, the fetcher retries (see RetryingBlockFetcher), which is ineffective.
> If spark.shuffle.io.retryWait and spark.shuffle.io.maxRetries are large, e.g. 30s and 10 retries, this wastes 5 minutes.
> So, I think we should check whether the corresponding executor is alive before retrying.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
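The ineffective-retry path described in the ticket can be sketched as follows. This is an illustrative Python sketch, not Spark code: the names fetch_with_retries, is_executor_alive, and executor_id are hypothetical stand-ins for the RetryingBlockFetcher loop and the proposed liveness check.

```python
# Hypothetical sketch of the proposed behavior, not Spark's actual code.
# Today, RetryingBlockFetcher keeps retrying (maxRetries times, waiting
# retryWait between attempts) even when the target executor is dead.
# The proposal: check executor liveness before scheduling each retry.
def fetch_with_retries(fetch, is_executor_alive, executor_id, max_retries=3):
    last_err = None
    for _ in range(max_retries + 1):
        try:
            return fetch()
        except IOError as err:
            last_err = err
            # Proposed check: a dead executor can never serve the block,
            # so fail fast instead of burning maxRetries * retryWait.
            if not is_executor_alive(executor_id):
                raise RuntimeError(
                    f"executor {executor_id} is dead; not retrying") from err
            # (retryWait sleep omitted in this sketch)
    raise last_err
```

With spark.shuffle.io.retryWait=30s and spark.shuffle.io.maxRetries=10, the current behavior spends 10 x 30s = 5 minutes per dead peer; a liveness check like the one above would give up after the first failed attempt.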
[jira] [Updated] (SPARK-27637) If an exception occurs while fetching blocks via the Netty block transfer service, check whether the corresponding executor is alive before retrying
[ https://issues.apache.org/jira/browse/SPARK-27637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] feiwang updated SPARK-27637: Affects Version/s: (was: 2.4.2) (was: 2.3.2) 2.3.3 2.4.3
[jira] [Updated] (SPARK-27637) If an exception occurs while fetching blocks via the Netty block transfer service, check whether the corresponding executor is alive before retrying
[ https://issues.apache.org/jira/browse/SPARK-27637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] feiwang updated SPARK-27637: Component/s: Shuffle
[jira] [Updated] (SPARK-27685) `union` doesn't promote non-nullable columns of struct to nullable
[ https://issues.apache.org/jira/browse/SPARK-27685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-27685: --- Labels: correctness (was: )

> `union` doesn't promote non-nullable columns of struct to nullable
> ------------------------------------------------------------------
>
>                 Key: SPARK-27685
>                 URL: https://issues.apache.org/jira/browse/SPARK-27685
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Huon Wilson
>            Priority: Major
>              Labels: correctness
>
> When doing a {{union}} of two dataframes, a column that is nullable in one of the dataframes will be nullable in the union, promoting the non-nullable one to be nullable.
> This doesn't happen properly for columns nested as subcolumns of a {{struct}}. It seems to just take the nullability of the first dataframe in the union, meaning a nullable column will become non-nullable, resulting in invalid values.
> {code:scala}
> case class X(x: Option[Long])
> case class Nested(nested: X)
>
> // First, just work with normal columns
> val df1 = Seq(1L, 2L).toDF("x")
> val df2 = Seq(Some(3L), None).toDF("x")
> df1.printSchema
> // root
> //  |-- x: long (nullable = false)
> df2.printSchema
> // root
> //  |-- x: long (nullable = true)
> (df1 union df2).printSchema
> // root
> //  |-- x: long (nullable = true)
> (df1 union df2).as[X].collect
> // res19: Array[X] = Array(X(Some(1)), X(Some(2)), X(Some(3)), X(None))
> (df1 union df2).select("*").show
> // +----+
> // |   x|
> // +----+
> // |   1|
> // |   2|
> // |   3|
> // |null|
> // +----+
>
> // Now, the same with the 'x' column within a struct:
> val struct1 = df1.select(struct('x) as "nested")
> val struct2 = df2.select(struct('x) as "nested")
> struct1.printSchema
> // root
> //  |-- nested: struct (nullable = false)
> //  |    |-- x: long (nullable = false)
> struct2.printSchema
> // root
> //  |-- nested: struct (nullable = false)
> //  |    |-- x: long (nullable = true)
>
> // BAD: the x column is not nullable
> (struct1 union struct2).printSchema
> // root
> //  |-- nested: struct (nullable = false)
> //  |    |-- x: long (nullable = false)
>
> // BAD: the last x value became "Some(0)", instead of "None"
> (struct1 union struct2).as[Nested].collect
> // res23: Array[Nested] = Array(Nested(X(Some(1))), Nested(X(Some(2))), Nested(X(Some(3))), Nested(X(Some(0
>
> // BAD: showing just the nested columns throws an NPE
> (struct1 union struct2).select("nested.*").show
> // java.lang.NullPointerException
> //   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
> //   at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collectFromPlan$1.apply(Dataset.scala:3387)
> //   ...
> //   at org.apache.spark.sql.Dataset.show(Dataset.scala:714)
> //   ... 49 elided
>
> // Flipping the order makes x nullable as desired
> (struct2 union struct1).printSchema
> // root
> //  |-- nested: struct (nullable = false)
> //  |    |-- x: long (nullable = true)
> (struct2 union struct1).as[Y].collect
> // res26: Array[Y] = Array(Y(X(Some(3))), Y(X(None)), Y(X(Some(1))), Y(X(Some(2
> (struct2 union struct1).select("nested.*").show
> // +----+
> // |   x|
> // +----+
> // |   3|
> // |null|
> // |   1|
> // |   2|
> // +----+
> {code}
> Note the three {{BAD}} lines, where the union of structs became non-nullable and resulted in invalid values and exceptions.
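The merge rule the report expects can be modeled independently of Spark. In this illustrative Python sketch (merge_nullability and the dict-based schema encoding are inventions for this sketch, not Spark APIs), a unioned field is nullable if it is nullable in either input, applied recursively to struct fields:

```python
# Illustrative model of the expected rule: union output nullability is the
# OR of the two inputs' nullability, applied recursively to struct fields.
# The bug report shows Spark instead taking only the first input's
# nullability for nested fields.
def merge_nullability(a, b):
    # Schemas here are plain dicts: {"type", "nullable", and, for structs,
    # a "fields" mapping of name -> sub-schema}.
    merged = {"type": a["type"], "nullable": a["nullable"] or b["nullable"]}
    if a["type"] == "struct":
        merged["fields"] = {
            name: merge_nullability(sub, b["fields"][name])
            for name, sub in a["fields"].items()
        }
    return merged

# Mirror of struct1/struct2 from the report: nested x non-nullable vs nullable.
struct1 = {"type": "struct", "nullable": False,
           "fields": {"x": {"type": "long", "nullable": False}}}
struct2 = {"type": "struct", "nullable": False,
           "fields": {"x": {"type": "long", "nullable": True}}}
print(merge_nullability(struct1, struct2)["fields"]["x"]["nullable"])  # True
```

Unlike the buggy behavior in the report, this rule gives the same answer whichever side of the union comes first.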
[jira] [Updated] (SPARK-27686) Update migration guide
[ https://issues.apache.org/jira/browse/SPARK-27686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang updated SPARK-27686: Summary: Update migration guide (was: Update)

> Update migration guide
> ----------------------
>
>                 Key: SPARK-27686
>                 URL: https://issues.apache.org/jira/browse/SPARK-27686
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Documentation, SQL
>    Affects Versions: 3.0.0
>            Reporter: Yuming Wang
>            Priority: Major
[jira] [Updated] (SPARK-27686) Update migration guide
[ https://issues.apache.org/jira/browse/SPARK-27686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang updated SPARK-27686:

Description:
The built-in Hive 2.3 fixes the following issues:
 * HIVE-6727: Table level stats for external tables are set incorrectly.
 * HIVE-15653: Some ALTER TABLE commands drop table stats.
 * SPARK-12014: Spark SQL query containing semicolon is broken in Beeline.
 * SPARK-25193: insert overwrite doesn't throw exception when drop old data fails.
 * SPARK-25919: Date value corrupts when tables are "ParquetHiveSerDe" formatted and target table is Partitioned.
 * SPARK-26332: Spark sql write orc table on viewFS throws exception.
 * SPARK-26437: Decimal data becomes bigint to query, unable to query.

We need to update the migration guide.

> Update migration guide
> ----------------------
>
>                 Key: SPARK-27686
>                 URL: https://issues.apache.org/jira/browse/SPARK-27686
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Documentation, SQL
>    Affects Versions: 3.0.0
>            Reporter: Yuming Wang
>            Priority: Major
[jira] [Updated] (SPARK-27686) Update migration guide
[ https://issues.apache.org/jira/browse/SPARK-27686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang updated SPARK-27686:

Description:
The built-in Hive 2.3 fixes the following issues:
 * HIVE-6727: Table level stats for external tables are set incorrectly.
 * HIVE-15653: Some ALTER TABLE commands drop table stats.
 * SPARK-12014: Spark SQL query containing semicolon is broken in Beeline.
 * SPARK-25193: insert overwrite doesn't throw exception when drop old data fails.
 * SPARK-25919: Date value corrupts when tables are "ParquetHiveSerDe" formatted and target table is Partitioned.
 * SPARK-26332: Spark sql write orc table on viewFS throws exception.
 * SPARK-26437: Decimal data becomes bigint to query, unable to query.

We need to update the migration guide.

> Update migration guide
> ----------------------
>
>                 Key: SPARK-27686
>                 URL: https://issues.apache.org/jira/browse/SPARK-27686
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Documentation, SQL
>    Affects Versions: 3.0.0
>            Reporter: Yuming Wang
>            Priority: Major
[jira] [Created] (SPARK-27686) Update
Yuming Wang created SPARK-27686:
---
             Summary: Update
                 Key: SPARK-27686
                 URL: https://issues.apache.org/jira/browse/SPARK-27686
             Project: Spark
          Issue Type: Sub-task
          Components: Documentation, SQL
    Affects Versions: 3.0.0
            Reporter: Yuming Wang
[jira] [Updated] (SPARK-27685) `union` doesn't promote non-nullable columns of struct to nullable
[ https://issues.apache.org/jira/browse/SPARK-27685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huon Wilson updated SPARK-27685:

Description:
When doing a {{union}} of two dataframes, a column that is nullable in one of the dataframes will be nullable in the union, promoting the non-nullable one to be nullable.

This doesn't happen properly for columns nested as subcolumns of a {{struct}}. It seems to just take the nullability of the first dataframe in the union, meaning a nullable column will become non-nullable, resulting in invalid values.

{code:scala}
case class X(x: Option[Long])
case class Nested(nested: X)

// First, just work with normal columns
val df1 = Seq(1L, 2L).toDF("x")
val df2 = Seq(Some(3L), None).toDF("x")
df1.printSchema
// root
//  |-- x: long (nullable = false)
df2.printSchema
// root
//  |-- x: long (nullable = true)
(df1 union df2).printSchema
// root
//  |-- x: long (nullable = true)
(df1 union df2).as[X].collect
// res19: Array[X] = Array(X(Some(1)), X(Some(2)), X(Some(3)), X(None))
(df1 union df2).select("*").show
// +----+
// |   x|
// +----+
// |   1|
// |   2|
// |   3|
// |null|
// +----+

// Now, the same with the 'x' column within a struct:
val struct1 = df1.select(struct('x) as "nested")
val struct2 = df2.select(struct('x) as "nested")
struct1.printSchema
// root
//  |-- nested: struct (nullable = false)
//  |    |-- x: long (nullable = false)
struct2.printSchema
// root
//  |-- nested: struct (nullable = false)
//  |    |-- x: long (nullable = true)

// BAD: the x column is not nullable
(struct1 union struct2).printSchema
// root
//  |-- nested: struct (nullable = false)
//  |    |-- x: long (nullable = false)

// BAD: the last x value became "Some(0)", instead of "None"
(struct1 union struct2).as[Nested].collect
// res23: Array[Nested] = Array(Nested(X(Some(1))), Nested(X(Some(2))), Nested(X(Some(3))), Nested(X(Some(0

// BAD: showing just the nested columns throws an NPE
(struct1 union struct2).select("nested.*").show
// java.lang.NullPointerException
//   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
//   at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collectFromPlan$1.apply(Dataset.scala:3387)
//   ...
//   at org.apache.spark.sql.Dataset.show(Dataset.scala:714)
//   ... 49 elided

// Flipping the order makes x nullable as desired
(struct2 union struct1).printSchema
// root
//  |-- nested: struct (nullable = false)
//  |    |-- x: long (nullable = true)
(struct2 union struct1).as[Y].collect
// res26: Array[Y] = Array(Y(X(Some(3))), Y(X(None)), Y(X(Some(1))), Y(X(Some(2
(struct2 union struct1).select("nested.*").show
// +----+
// |   x|
// +----+
// |   3|
// |null|
// |   1|
// |   2|
// +----+
{code}

Note the three {{BAD}} lines, where the union of structs became non-nullable and resulted in invalid values and exceptions.
[jira] [Created] (SPARK-27685) `union` doesn't promote non-nullable columns of struct to nullable
Huon Wilson created SPARK-27685:
---
             Summary: `union` doesn't promote non-nullable columns of struct to nullable
                 Key: SPARK-27685
                 URL: https://issues.apache.org/jira/browse/SPARK-27685
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: Huon Wilson

When doing a {{union}} of two dataframes, a column that is nullable in one of the dataframes will be nullable in the union, promoting the non-nullable one to be nullable.

This doesn't happen properly for columns nested as subcolumns of a {{struct}}. It seems to just take the nullability of the first dataframe in the union, meaning a nullable column will become non-nullable, resulting in invalid values.

{code:scala}
case class X(x: Option[Long])
case class Nested(nested: X)

// First, just work with normal columns
val df1 = Seq(1L, 2L).toDF("x")
val df2 = Seq(Some(3L), None).toDF("x")
df1.printSchema
// root
//  |-- x: long (nullable = false)
df2.printSchema
// root
//  |-- x: long (nullable = true)
(df1 union df2).printSchema
// root
//  |-- x: long (nullable = true)
(df1 union df2).as[X].collect
// res19: Array[X] = Array(X(Some(1)), X(Some(2)), X(Some(3)), X(None))

// Now, the same with the 'x' column within a struct:
val struct1 = df1.select(struct('x) as "nested")
val struct2 = df2.select(struct('x) as "nested")
struct1.printSchema
// root
//  |-- nested: struct (nullable = false)
//  |    |-- x: long (nullable = false)
struct2.printSchema
// root
//  |-- nested: struct (nullable = false)
//  |    |-- x: long (nullable = true)

// BAD: the x column is not nullable
(struct1 union struct2).printSchema
// root
//  |-- nested: struct (nullable = false)
//  |    |-- x: long (nullable = false)

// BAD: the last x value became "Some(0)", instead of "None"
(struct1 union struct2).as[Nested].collect
// res23: Array[Nested] = Array(Nested(X(Some(1))), Nested(X(Some(2))), Nested(X(Some(3))), Nested(X(Some(0

// Flipping the order makes x nullable as desired
(struct2 union struct1).printSchema
// root
//  |-- nested: struct (nullable = false)
//  |    |-- x: long (nullable = true)
(struct2 union struct1).as[Y].collect
// res26: Array[Y] = Array(Y(X(Some(3))), Y(X(None)), Y(X(Some(1))), Y(X(Some(2
{code}

Note the two {{BAD}} lines, where the union of structs became non-nullable and resulted in invalid values.
[jira] [Commented] (SPARK-27335) cannot collect() from Correlation.corr
[ https://issues.apache.org/jira/browse/SPARK-27335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838241#comment-16838241 ] Michael Chirico commented on SPARK-27335:
--
Also came across this in another context on Spark 2.3.1, and the suggested fix worked in our case as well. Sorry that I am unable to help narrow down the root cause...

> cannot collect() from Correlation.corr
> --------------------------------------
>
>                 Key: SPARK-27335
>                 URL: https://issues.apache.org/jira/browse/SPARK-27335
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.4.0
>            Reporter: Natalino Busa
>            Priority: Major
>
> Reproducing the bug from the example in the documentation:
>
> {code:java}
> import pyspark
> from pyspark.ml.linalg import Vectors
> from pyspark.ml.stat import Correlation
>
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> dataset = [[Vectors.dense([1, 0, 0, -2])],
>            [Vectors.dense([4, 5, 0, 3])],
>            [Vectors.dense([6, 7, 0, 8])],
>            [Vectors.dense([9, 0, 0, 1])]]
> dataset = spark.createDataFrame(dataset, ['features'])
> df = Correlation.corr(dataset, 'features', 'pearson')
> df.collect()
> {code}
> This produces the following stack trace:
>
> {code:java}
> ---------------------------------------------------------------------------
> AttributeError                            Traceback (most recent call last)
>  in ()
>      11 dataset = spark.createDataFrame(dataset, ['features'])
>      12 df = Correlation.corr(dataset, 'features', 'pearson')
> ---> 13 df.collect()
>
> /opt/spark/python/pyspark/sql/dataframe.py in collect(self)
>     530         [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>     531         """
> --> 532         with SCCallSiteSync(self._sc) as css:
>     533             sock_info = self._jdf.collectToPython()
>     534             return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer(
>
> /opt/spark/python/pyspark/traceback_utils.py in __enter__(self)
>      70     def __enter__(self):
>      71         if SCCallSiteSync._spark_stack_depth == 0:
> ---> 72             self._context._jsc.setCallSite(self._call_site)
>      73         SCCallSiteSync._spark_stack_depth += 1
>      74
>
> AttributeError: 'NoneType' object has no attribute 'setCallSite'
> {code}
>
> Analysis:
> Somehow the dataframe properties `df.sql_ctx.sparkSession._jsparkSession` and `spark._jsparkSession` do not match the ones available in the spark session.
> The following code fixes the problem (I hope this helps you narrow down the root cause):
>
> {code:java}
> df.sql_ctx.sparkSession._jsparkSession = spark._jsparkSession
> df._sc = spark._sc
> df.collect()
> >>> [Row(pearson(features)=DenseMatrix(4, 4, [1.0, 0.0556, nan, 0.4005, 0.0556, 1.0, nan, 0.9136, nan, nan, 1.0, nan, 0.4005, 0.9136, nan, 1.0], False))]
> {code}
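The failure mode in this ticket can be reduced to a Spark-free sketch: a DataFrame holding a reference to a context whose Java-side handle is None will hit exactly this AttributeError, and re-pointing the reference at the live context makes the call succeed. All class and attribute names below (FakeJsc, FakeContext, collect) are illustrative stand-ins, not PySpark APIs.

```python
# Illustrative reduction of the reported failure, not PySpark code.
class FakeJsc:
    """Stands in for the live JVM-side SparkContext handle."""
    def setCallSite(self, site):
        self.site = site

class FakeContext:
    """Stands in for the Python SparkContext; _jsc may be stale (None)."""
    def __init__(self, jsc):
        self._jsc = jsc

def collect(ctx):
    # Mirrors SCCallSiteSync.__enter__: blows up if the context's _jsc is None.
    ctx._jsc.setCallSite("demo")
    return ["row"]

stale = FakeContext(None)      # what the DataFrame's _sc points at in the report
live = FakeContext(FakeJsc())  # the session's real, working context

try:
    collect(stale)             # raises AttributeError, as in the ticket
except AttributeError as err:
    print("broken:", err)

print("fixed:", collect(live))  # the workaround: point at the live context
```

This mirrors why the reporter's two-line reassignment works: it swaps the stale reference for the session's live one without touching the query itself.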
[jira] [Commented] (SPARK-23191) Worker registration fails in case of network drop
[ https://issues.apache.org/jira/browse/SPARK-23191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838237#comment-16838237 ] wuyi commented on SPARK-23191:
--
Hi [~neeraj20gupta] Can you explain more about the part about _running concurrently for one app_? In my understanding, if the worker shipped with drivers exited due to a duplicate register, those drivers would also be killed. So, how does _running concurrently for one app_ happen?

> Worker registration fails in case of network drop
> --------------------------------------------------
>
>                 Key: SPARK-23191
>                 URL: https://issues.apache.org/jira/browse/SPARK-23191
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.3, 2.2.1, 2.3.0
>         Environment: OS: CentOS 6.9 (64 bit)
>            Reporter: Neeraj Gupta
>            Priority: Critical
>
> We have a 3 node cluster. We were facing issues with multiple drivers running in some scenarios in production.
> On further investigation we were able to reproduce the scenario, in both the 1.6.3 and 2.2.1 versions, with the following steps:
> # Set up a 3 node cluster. Start master and slaves.
> # On any node where the worker process is running, block the connections on port 7077 using iptables.
> {code:java}
> iptables -A OUTPUT -p tcp --dport 7077 -j DROP
> {code}
> # After about 10-15 secs we get the error on the node that it is unable to connect to the master.
> {code:java} > 2018-01-23 12:08:51,639 [rpc-client-1-1] WARN > org.apache.spark.network.server.TransportChannelHandler - Exception in > connection from > java.io.IOException: Connection timed out > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > at sun.nio.ch.IOUtil.read(IOUtil.java:192) > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) > at > io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) > at > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) > at > io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) > at java.lang.Thread.run(Thread.java:745) > 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR > org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting > for master to reconnect... > 2018-01-23 12:08:51,647 [dispatcher-event-loop-0] ERROR > org.apache.spark.deploy.worker.Worker - Connection to master failed! Waiting > for master to reconnect... > {code} > # Once we get this exception we renable the connections to port 7077 using > {code:java} > iptables -D OUTPUT -p tcp --dport 7077 -j DROP > {code} > # Worker tries to register again with master but is unable to do so. 
It > gives following error > {code:java} > 2018-01-23 12:08:58,657 [worker-register-master-threadpool-2] WARN > org.apache.spark.deploy.worker.Worker - Failed to connect to master > :7077 > org.apache.spark.SparkException: Exception thrown in awaitResult: > at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) > at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100) > at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108) > at > org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deploy$worker$Worker$$tryRegisterAllMasters$1$$anon$1.run(Worker.scala:241) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Failed to connect to :7077 > at >
[jira] [Commented] (SPARK-27684) Reduce ScalaUDF conversion overheads for primitives
[ https://issues.apache.org/jira/browse/SPARK-27684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838229#comment-16838229 ] Kazuaki Ishizaki commented on SPARK-27684: -- Interesting idea > Reduce ScalaUDF conversion overheads for primitives > --- > > Key: SPARK-27684 > URL: https://issues.apache.org/jira/browse/SPARK-27684 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Josh Rosen >Priority: Major > > I believe that we can reduce ScalaUDF overheads when operating over primitive > types. > In [ScalaUDF's > doGenCode|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala#L991] > we have logic to convert UDF function input types from Catalyst internal > types to Scala types (for example, this is used to convert UTF8Strings to > Java Strings). Similarly, we convert UDF return types. > However, UDF input argument conversion is effectively a no-op for primitive > types because {{CatalystTypeConverters.createToScalaConverter()}} returns > {{identity}} in those cases. UDF result conversion is a little trickier > because {{createToCatalystConverter()}} returns [a > function|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L413] > that handles {{Option[Primitive]}}, but it might be the case that the > Option-boxing is unusable via ScalaUDF (in which case the conversion truly is > an {{identity}} no-op). > These unnecessary no-op conversions could be quite expensive because each > call involves an index into the {{references}} array to get the converters, a > second index into the converters array to get the correct converter for the > nth input argument, and, finally, the converter invocation itself: > {code:java} > Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* > converters */)[0].apply(project_value_3);{code} > In these cases, I believe that we can reduce lookup / invocation overheads by > modifying the ScalaUDF code generation to eliminate the conversion calls for > primitives and directly assign the unconverted result, e.g. > {code:java} > Object project_arg_0 = false ? null : project_value_3;{code} > To cleanly handle the case where we have a multi-argument UDF accepting a > mixture of primitive and non-primitive types, we might be able to keep the > {{converters}} array the same size (so indexes stay the same) but omit the > invocation of the converters for the primitive arguments (e.g. {{converters}} > is sparse / contains unused entries in case of primitives). > I spotted this optimization while trying to construct some quick benchmarks > to measure UDF invocation overheads. For example: > {code:java} > spark.udf.register("identity", (x: Int) => x) > sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() > // ~52 seconds > sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 > * 1000 * 1000)").rdd.count() // ~84 seconds{code} > I'm curious to see whether the optimization suggested here can close this > performance gap. It'd also be a good idea to construct more principled > microbenchmarks covering multi-argument UDFs, projections involving multiple > UDFs over different input and output types, etc. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
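The converter indirection described in the issue can be sketched outside of Spark. The following is a plain-Java illustration only, not Spark's actual generated code; the `references`, `converters`, and `project_value_3` names simply mirror the quoted codegen snippet:

```java
import java.util.function.Function;

public class ConverterIndirection {
    public static void main(String[] args) {
        // Stand-in for the generated `references` array; slot 1 holds the
        // per-argument converter array, mirroring the quoted codegen snippet.
        Object[] references = new Object[2];
        Function[] converters = new Function[] { Function.identity() };
        references[1] = converters;

        Object project_value_3 = 42; // an already-boxed primitive input

        // Current shape: two array indexes plus a virtual call, even though
        // the converter is `identity` for primitive types.
        Object converted = ((Function[]) references[1])[0].apply(project_value_3);

        // Proposed shape for primitives: direct assignment, no lookup at all.
        Object direct = project_value_3;

        System.out.println(converted.equals(direct)); // prints "true"
    }
}
```

The two shapes produce the same value; the point of the proposal is that the second avoids the array lookups and the virtual `apply` call per row.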
[jira] [Commented] (SPARK-21367) R older version of Roxygen2 on Jenkins
[ https://issues.apache.org/jira/browse/SPARK-21367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838188#comment-16838188 ] Felix Cheung commented on SPARK-21367: -- great thx! > R older version of Roxygen2 on Jenkins > -- > > Key: SPARK-21367 > URL: https://issues.apache.org/jira/browse/SPARK-21367 > Project: Spark > Issue Type: Bug > Components: SparkR >Affects Versions: 2.3.0 >Reporter: Felix Cheung >Assignee: shane knapp >Priority: Major > Attachments: R.paks > > > Getting this message from a recent build. > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79461/console > Warning messages: > 1: In check_dep_version(pkg, version, compare) : > Need roxygen2 >= 5.0.0 but loaded version is 4.1.1 > 2: In check_dep_version(pkg, version, compare) : > Need roxygen2 >= 5.0.0 but loaded version is 4.1.1 > * installing *source* package 'SparkR' ... > ** R > We have been running with 5.0.1 and haven't changed for a year. > NOTE: Roxygen 6.x has some big changes and IMO we should not move to that yet. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27684) Reduce ScalaUDF conversion overheads for primitives
[ https://issues.apache.org/jira/browse/SPARK-27684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838187#comment-16838187 ] Felix Cheung commented on SPARK-27684: -- definitely could be interesting.. > Reduce ScalaUDF conversion overheads for primitives > --- > > Key: SPARK-27684 > URL: https://issues.apache.org/jira/browse/SPARK-27684 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Josh Rosen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27684) Reduce ScalaUDF conversion overheads for primitives
[ https://issues.apache.org/jira/browse/SPARK-27684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josh Rosen updated SPARK-27684: --- Description: I believe that we can reduce ScalaUDF overheads when operating over primitive types. In [ScalaUDF's doGenCode|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala#L991] we have logic to convert UDF function input types from Catalyst internal types to Scala types (for example, this is used to convert UTF8Strings to Java Strings). Similarly, we convert UDF return types. However, UDF input argument conversion is effectively a no-op for primitive types because {{CatalystTypeConverters.createToScalaConverter()}} returns {{identity}} in those cases. UDF result conversion is a little trickier because {{createToCatalystConverter()}} returns [a function|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L413] that handles {{Option[Primitive]}}, but it might be the case that the Option-boxing is unusable via ScalaUDF (in which case the conversion truly is an {{identity}} no-op). These unnecessary no-op conversions could be quite expensive because each call involves an index into the {{references}} array to get the converters, a second index into the converters array to get the correct converter for the nth input argument, and, finally, the converter invocation itself: {code:java} Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* converters */)[0].apply(project_value_3);{code} In these cases, I believe that we can reduce lookup / invocation overheads by modifying the ScalaUDF code generation to eliminate the conversion calls for primitives and directly assign the unconverted result, e.g. {code:java} Object project_arg_0 = false ? null : project_value_3;{code} To cleanly handle the case where we have a multi-argument UDF accepting a mixture of primitive and non-primitive types, we might be able to keep the {{converters}} array the same size (so indexes stay the same) but omit the invocation of the converters for the primitive arguments (e.g. {{converters}} is sparse / contains unused entries in case of primitives). I spotted this optimization while trying to construct some quick benchmarks to measure UDF invocation overheads. For example: {code:java} spark.udf.register("identity", (x: Int) => x) sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() // ~52 seconds sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 * 1000 * 1000)").rdd.count() // ~84 seconds{code} I'm curious to see whether the optimization suggested here can close this performance gap. It'd also be a good idea to construct more principled microbenchmarks covering multi-argument UDFs, projections involving multiple UDFs over different input and output types, etc.
[jira] [Created] (SPARK-27684) Reduce ScalaUDF conversion overheads for primitives
Josh Rosen created SPARK-27684: -- Summary: Reduce ScalaUDF conversion overheads for primitives Key: SPARK-27684 URL: https://issues.apache.org/jira/browse/SPARK-27684 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Josh Rosen I believe that we can reduce ScalaUDF overheads when operating over primitive types. In [ScalaUDF's doGenCode|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala#L991] we have logic to convert UDF function input types from Catalyst internal types to Scala types (for example, this is used to convert UTF8Strings to Java Strings). Similarly, we convert UDF return types. However, UDF input argument conversion is effectively a no-op for primitive types because {{CatalystTypeConverters.createToScalaConverter()}} returns {{identity}} in those cases. UDF return argument conversion is a little trickier because {{createToCatalystConverter()}} returns [a function|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L413] that handles {{Option[Primitive]}}, but it might be the case that the Option-boxing is unusable via ScalaUDF. These unnecessary no-op conversions could be quite expensive because each call involves an index into the {{references}} array to get the converters, a second index into the converters array to get the correct converter for a specific input argument, and, finally, the converter invocation itself: {code:java} Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* converters */)[0].apply(project_value_3);{code} In these cases, I believe that we can reduce lookup / invocation overheads by modifying the ScalaUDF code generation to eliminate the conversion calls for primitives and directly assign the unconverted result, e.g. {code:java} Object project_arg_0 = false ? null : project_value_3;{code} To cleanly handle the case where we have a multi-argument UDF accepting a mixture of primitive and non-primitive types, we might be able to keep the {{converters}} array the same size (so indexes stay the same) but omit the use of the converters (e.g. {{converters}} is sparse / contains unused entries in case of primitives). I spotted this optimization while trying to construct some quick benchmarks to measure UDF invocation overheads. For example: {code:java} spark.udf.register("identity", (x: Int) => x) sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() // ~52 seconds sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 * 1000 * 1000)").rdd.count() // ~84 seconds{code} I'm curious to see whether the optimization suggested here can close this performance gap. It'd also be a good idea to construct more principled microbenchmarks covering multi-argument UDFs, projections involving multiple UDFs over different input and output types, etc. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
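The "sparse {{converters}}" idea from the description above can be sketched in plain Java. This is an illustrative sketch only, under the assumption that primitive argument slots hold null converters; it is not Spark's actual codegen or converter types:

```java
import java.util.function.Function;

public class SparseConverters {
    // Apply a converter only if one is present for this argument slot.
    // Primitive arguments get a null entry, so indexes stay aligned with
    // argument positions and the array keeps its original size.
    static Object convert(Function[] converters, int i, Object value) {
        Function c = converters[i];
        return (c == null) ? value : c.apply(value);
    }

    public static void main(String[] args) {
        Function[] converters = new Function[] {
            null,                            // arg 0: primitive, no conversion needed
            v -> v.toString().toUpperCase()  // arg 1: hypothetical non-primitive conversion
        };
        System.out.println(convert(converters, 0, 42));    // prints "42" (passed through)
        System.out.println(convert(converters, 1, "abc")); // prints "ABC"
    }
}
```

In the actual proposal the null check would happen at code-generation time rather than per row, so generated code for primitive slots would contain the direct assignment and no branch at all.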
[jira] [Resolved] (SPARK-27343) Use ConfigEntry for hardcoded configs for spark-sql-kafka
[ https://issues.apache.org/jira/browse/SPARK-27343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-27343. --- Resolution: Fixed Fix Version/s: 3.1.0 Issue resolved by pull request 24574 [https://github.com/apache/spark/pull/24574] > Use ConfigEntry for hardcoded configs for spark-sql-kafka > - > > Key: SPARK-27343 > URL: https://issues.apache.org/jira/browse/SPARK-27343 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: hehuiyuan >Assignee: hehuiyuan >Priority: Minor > Fix For: 3.1.0 > > > Extracting parameters, building the objects of ConfigEntry -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27343) Use ConfigEntry for hardcoded configs for spark-sql-kafka
[ https://issues.apache.org/jira/browse/SPARK-27343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-27343: -- Fix Version/s: (was: 3.1.0) 3.0.0 > Use ConfigEntry for hardcoded configs for spark-sql-kafka > - > > Key: SPARK-27343 > URL: https://issues.apache.org/jira/browse/SPARK-27343 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: hehuiyuan >Assignee: hehuiyuan >Priority: Minor > Fix For: 3.0.0 > > > Extracting parameters, building the objects of ConfigEntry -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-27343) Use ConfigEntry for hardcoded configs for spark-sql-kafka
[ https://issues.apache.org/jira/browse/SPARK-27343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-27343: - Assignee: hehuiyuan > Use ConfigEntry for hardcoded configs for spark-sql-kafka > - > > Key: SPARK-27343 > URL: https://issues.apache.org/jira/browse/SPARK-27343 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: hehuiyuan >Assignee: hehuiyuan >Priority: Minor > > Extracting parameters, building the objects of ConfigEntry -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27683) Remove usage of TraversableOnce
[ https://issues.apache.org/jira/browse/SPARK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838092#comment-16838092 ]

Sean Owen commented on SPARK-27683:
-----------------------------------

Yeah I mentioned that above -- I don't think it helps as it still leaves you without a way to produce a version that can cross-compile to 2.12 and 2.13. The import is different, and the resulting binary signature is different.

> Remove usage of TraversableOnce
> -------------------------------
>
>                 Key: SPARK-27683
>                 URL: https://issues.apache.org/jira/browse/SPARK-27683
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML, Spark Core, SQL, Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Sean Owen
>            Assignee: Sean Owen
>            Priority: Major
>
> As with {{Traversable}}, {{TraversableOnce}} is going away in Scala 2.13. We should use {{IterableOnce}} instead. This one is a bigger change as there are more API methods with the existing signature.
[jira] [Updated] (SPARK-27074) Hive 3.1 metastore support HiveClientImpl.runHive
[ https://issues.apache.org/jira/browse/SPARK-27074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang updated SPARK-27074:
--------------------------------
    Summary: Hive 3.1 metastore support HiveClientImpl.runHive  (was: Refactor HiveClientImpl runHive)

> Hive 3.1 metastore support HiveClientImpl.runHive
> -------------------------------------------------
>
>                 Key: SPARK-27074
>                 URL: https://issues.apache.org/jira/browse/SPARK-27074
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> Hive 3.1.1's {{CommandProcessor}} has two changes:
> # HIVE-17626 (Hive 3.0.0) added ReExecDriver, so the current code path at https://github.com/apache/spark/blob/02bbe977abaf7006b845a7e99d612b0235aa0025/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L736-L742 is incorrect.
> # HIVE-18238 (Hive 3.0.0) changed the return type of the {{Driver.close()}} function. This change is not compatible with the built-in Hive.
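The second incompatibility above comes from how the JVM links methods: the return type is part of a compiled call site's method descriptor, so code compiled against a `void close()` fails with NoSuchMethodError when run against a Hive where HIVE-18238 made `close()` return `int`. A hedged sketch of the reflection technique commonly used to tolerate such changes (the stand-in classes below are illustrative, not Hive's real API, and this is not necessarily how Spark's actual fix works):

```scala
object DriverCloseShim {
  // Hypothetical stand-ins for the two Hive Driver generations:
  // the old close() returned void, the new one returns int.
  class OldDriver { var closed = false; def close(): Unit = { closed = true } }
  class NewDriver { var closed = false; def close(): Int = { closed = true; 0 } }

  // Class.getMethod matches by name and parameter types only; the return
  // type is not part of the lookup, so one binary handles both versions
  // without ever linking against a specific close() signature.
  def closeDriver(driver: AnyRef): Unit = {
    driver.getClass.getMethod("close").invoke(driver)
    ()
  }
}

object DriverCloseShimDemo {
  def main(args: Array[String]): Unit = {
    DriverCloseShim.closeDriver(new DriverCloseShim.OldDriver)
    DriverCloseShim.closeDriver(new DriverCloseShim.NewDriver)
    println("both drivers closed without NoSuchMethodError")
  }
}
```

A direct call `driver.close()` compiled against one class would break against the other; routing through reflection trades a small per-call cost for binary tolerance of the signature change.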
[jira] [Comment Edited] (SPARK-27683) Remove usage of TraversableOnce
[ https://issues.apache.org/jira/browse/SPARK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838086#comment-16838086 ]

PJ Fanning edited comment on SPARK-27683 at 5/12/19 3:01 PM:
-------------------------------------------------------------

[~srowen] would it be possible to use the scala-collection-compat lib? It has a type alias `IterableOnce` that maps to `TraversableOnce` in the Scala 2.11 and 2.12 versions of the lib but to the core IterableOnce in 2.13.

[https://github.com/scala/scala-collection-compat/blob/master/compat/src/main/scala-2.11_2.12/scala/collection/compat/PackageShared.scala#L156]
[https://github.com/scala/scala-collection-compat/blob/master/compat/src/main/scala-2.13/scala/collection/compat/package.scala#L22]

The akka team created equivalent type aliases to avoid the dependency on scala-collection-compat, and this approach could be used to add additional type aliases that suit Spark's requirements.

was (Author: pj.fanning):

[~srowen] would it be possible to use the scala-collection-compat lib? It has a type alias `IterableOnce` that maps to `TraversableOnce` in the Scala 2.11 and 2.12 versions of the lib but to the core IterableOnce in 2.13.

[https://github.com/scala/scala-collection-compat/blob/master/compat/src/main/scala-2.11_2.12/scala/collection/compat/PackageShared.scala#L156]
[https://github.com/scala/scala-collection-compat/blob/master/compat/src/main/scala-2.13/scala/collection/compat/package.scala#L22]

> Remove usage of TraversableOnce
> -------------------------------
>
>                 Key: SPARK-27683
>                 URL: https://issues.apache.org/jira/browse/SPARK-27683
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML, Spark Core, SQL, Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Sean Owen
>            Assignee: Sean Owen
>            Priority: Major
>
> As with {{Traversable}}, {{TraversableOnce}} is going away in Scala 2.13. We should use {{IterableOnce}} instead. This one is a bigger change as there are more API methods with the existing signature.
[jira] [Commented] (SPARK-27683) Remove usage of TraversableOnce
[ https://issues.apache.org/jira/browse/SPARK-27683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838086#comment-16838086 ]

PJ Fanning commented on SPARK-27683:
------------------------------------

[~srowen] would it be possible to use the scala-collection-compat lib? It has a type alias `IterableOnce` that maps to `TraversableOnce` in the Scala 2.11 and 2.12 versions of the lib but to the core IterableOnce in 2.13.

[https://github.com/scala/scala-collection-compat/blob/master/compat/src/main/scala-2.11_2.12/scala/collection/compat/PackageShared.scala#L156]
[https://github.com/scala/scala-collection-compat/blob/master/compat/src/main/scala-2.13/scala/collection/compat/package.scala#L22]

> Remove usage of TraversableOnce
> -------------------------------
>
>                 Key: SPARK-27683
>                 URL: https://issues.apache.org/jira/browse/SPARK-27683
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML, Spark Core, SQL, Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Sean Owen
>            Assignee: Sean Owen
>            Priority: Major
>
> As with {{Traversable}}, {{TraversableOnce}} is going away in Scala 2.13. We should use {{IterableOnce}} instead. This one is a bigger change as there are more API methods with the existing signature.
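The alias approach discussed in the comments above can be sketched in a few lines of plain Scala. The names and layout below are illustrative, not scala-collection-compat's actual code: on 2.11/2.12 an alias named `IterableOnce` points at `TraversableOnce`, and a 2.13-specific source tree would omit the alias and use the real `scala.collection.IterableOnce`. This compiles on 2.12; note that, per Sean Owen's objection, source-level aliasing does not make the *compiled binary* signatures identical across 2.12 and 2.13.

```scala
object Compat {
  // 2.11/2.12 side of the alias; a 2.13 source tree would drop this and
  // import scala.collection.IterableOnce instead.
  type IterableOnce[+A] = scala.collection.TraversableOnce[A]
}

object AliasDemo {
  import Compat.IterableOnce

  // Source code written once against the alias compiles on 2.12...
  def total(xs: IterableOnce[Int]): Int = xs.sum

  def main(args: Array[String]): Unit = {
    println(total(List(1, 2, 3)))  // 6
    println(total(Iterator(4, 5))) // 9
  }
}
```

The alias keeps *source* compatibility: the same `total` text compiles under either Scala version with the appropriate alias file. The *binary* signature of `total` still mentions whichever trait the alias resolved to, which is exactly the cross-compilation gap the thread is debating.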
[jira] [Resolved] (SPARK-27675) do not use MutableColumnarRow in ColumnarBatch
[ https://issues.apache.org/jira/browse/SPARK-27675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-27675.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 24581
[https://github.com/apache/spark/pull/24581]

> do not use MutableColumnarRow in ColumnarBatch
> ----------------------------------------------
>
>                 Key: SPARK-27675
>                 URL: https://issues.apache.org/jira/browse/SPARK-27675
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Assignee: Wenchen Fan
>            Priority: Major
>             Fix For: 3.0.0