[spark] branch master updated (7f6a8ab -> eeb8120)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 7f6a8ab  [SPARK-31777][ML][PYSPARK] Add user-specified fold column to CrossValidator
     add eeb8120  [SPARK-31337][SQL] Support MS SQL Kerberos login in JDBC connector

No new revisions were added by this update.

Summary of changes:
 external/docker-integration-tests/pom.xml          |  1 -
 .../sql/jdbc/MsSqlServerIntegrationSuite.scala     |  2 +-
 pom.xml                                            |  6 ++
 sql/core/pom.xml                                   |  5 ++
 .../jdbc/connection/ConnectionProvider.scala       |  4 +
 .../jdbc/connection/MSSQLConnectionProvider.scala  | 97 ++
 .../connection/MariaDBConnectionProvider.scala     |  2 +-
 .../connection/MSSQLConnectionProviderSuite.scala  | 51
 8 files changed, 165 insertions(+), 3 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProvider.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MSSQLConnectionProviderSuite.scala

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9469831  [SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available

9469831 is described below

commit 9469831c3751e898ebe78cb642266b50ea167f22
Author: Jungtaek Lim (HeartSaVioR)
AuthorDate: Mon May 11 17:25:41 2020 -0700

    [SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available

    ### What changes were proposed in this pull request?

    This patch re-obtains tokens at the start of the AM in yarn cluster mode, if a principal and keytab are available. It essentially transfers the credentials from the original user, so the patch puts the new tokens into the original user's credentials by overwriting. To obtain tokens from providers in the user application, the patch sets the user classloader as the context classloader while initializing the token manager during AM startup.

    ### Why are the changes needed?

    The submitter obtains delegation tokens for yarn-cluster mode and adds these credentials to the launch context. The AM is launched with these credentials, and both the AM and the driver can leverage them. In yarn cluster mode, the driver is launched in the AM, which in turn initializes the token manager (while initializing SparkContext) and obtains delegation tokens (and schedules their renewal) if both principal and keytab are available.

    That said, even if we provide a principal and keytab when running an application in yarn-cluster mode, the AM always starts with the initial tokens from the launch context until the token manager runs and obtains delegation tokens. So there is a "gap": if user code (the driver) accesses an external system that requires delegation tokens (e.g. HDFS) before initializing SparkContext, it cannot leverage the tokens the token manager will obtain. This makes the application fail if the AM is killed "after" the initial tokens have expired and is then relaunched.

    This is even a regression; see the code below in branch-2.4:

    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
    https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala

    In Spark 2.4.x, the AM runs AMCredentialRenewer at initialization; AMCredentialRenewer obtains tokens and merges them with the credentials provided in the AM's launch context. This guarantees fresh tokens when the driver runs.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Manually tested with a specifically crafted application (simple reproducer):
    https://github.com/HeartSaVioR/spark-delegation-token-experiment/blob/master/src/main/scala/net/heartsavior/spark/example/LongRunningAppWithHDFSConfig.scala

    Before this patch, the new AM attempt failed when I killed the AM after the expiration of the tokens. After this patch, the new AM attempt runs fine.

    Closes #28336 from HeartSaVioR/SPARK-31559.

    Authored-by: Jungtaek Lim (HeartSaVioR)
    Signed-off-by: Marcelo Vanzin
    (cherry picked from commit 842b1dcdff0ecab4af9f292c2ff7b2b9ae1ac40a)
    Signed-off-by: Marcelo Vanzin
---
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1e8f408..862acd8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -860,10 +861,22 @@ object ApplicationMaster extends Logging {
     val ugi = sparkConf.get(PRINCIPAL) match {
       // We only need to log in with the keytab in cluster mode. In client mode, the driver
       // handles the user keytab.
-      case Some(principal) if amArgs.userClass != null =>
+      case Some(principal) if master.isClusterMode =>
         val originalCreds = UserGroupInformation.
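The "gap" described in the commit message can be illustrated with a toy model. Everything below is a hypothetical sketch in Python, not Spark's actual API: the launch-context tokens may already be expired by the time a second AM attempt starts, and eagerly re-obtaining tokens at AM startup (overwriting the stale ones in the original credentials) is what closes the gap.

```python
class Credentials:
    """Toy stand-in for Hadoop Credentials: a token-name -> expiry-time map."""
    def __init__(self):
        self.tokens = {}

    def merge_overwrite(self, other):
        # Fresh tokens replace stale tokens with the same name, mirroring how
        # the patch copies new tokens into the original user's credentials.
        self.tokens.update(other.tokens)


def obtain_fresh_tokens(now, lifetime=3600):
    """Model of the token manager obtaining a delegation token valid for `lifetime`."""
    creds = Credentials()
    creds.tokens["HDFS_DELEGATION_TOKEN"] = now + lifetime
    return creds


def am_startup(launch_context_creds, now, principal_and_keytab_available):
    """Model of the fixed behavior: re-obtain tokens before user code runs."""
    if principal_and_keytab_available:
        launch_context_creds.merge_overwrite(obtain_fresh_tokens(now))
    return launch_context_creds


# First AM attempt at t=0 is launched with tokens valid until t=3600.
launch_creds = obtain_fresh_tokens(now=0)

# A second AM attempt is launched at t=5000, after the initial tokens expired.
relaunch_time = 5000
assert launch_creds.tokens["HDFS_DELEGATION_TOKEN"] < relaunch_time  # stale!

# With the patch, the AM refreshes the tokens at startup, so user code that
# touches HDFS before SparkContext initialization sees valid tokens.
fixed = am_startup(launch_creds, now=relaunch_time,
                   principal_and_keytab_available=True)
assert fixed.tokens["HDFS_DELEGATION_TOKEN"] > relaunch_time
```

Under this model, without the eager refresh the second attempt would start with an expired token and fail on its first HDFS access; with it, the credentials are valid before any user code runs.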
[spark] branch master updated (64fb358 -> 842b1dc)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 64fb358  [SPARK-31671][ML] Wrong error message in VectorAssembler
     add 842b1dc  [SPARK-31559][YARN] Re-obtain tokens at the startup of AM for yarn cluster mode if principal and keytab are available

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 15 ++-
 1 file changed, 14 insertions(+), 1 deletion(-)
[spark] branch master updated (54b97b2 -> c619990)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 54b97b2  [MINOR][DOCS] Fix a typo in ContainerPlacementStrategy's class comment
     add c619990  [SPARK-31272][SQL] Support DB2 Kerberos login in JDBC connector

No new revisions were added by this update.

Summary of changes:
 external/docker-integration-tests/pom.xml          | 13 ++--
 .../src/test/resources/db2_krb_setup.sh            | 23 ++
 .../spark/sql/jdbc/DB2IntegrationSuite.scala       |  1 -
 .../spark/sql/jdbc/DB2KrbIntegrationSuite.scala    | 89 ++
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala      | 28 +--
 .../sql/jdbc/DockerKrbJDBCIntegrationSuite.scala   |  4 +-
 .../sql/jdbc/MariaDBKrbIntegrationSuite.scala      |  1 -
 .../sql/jdbc/PostgresKrbIntegrationSuite.scala     |  1 -
 pom.xml                                            |  6 ++
 sql/core/pom.xml                                   |  5 ++
 .../jdbc/connection/BasicConnectionProvider.scala  |  8 +-
 .../jdbc/connection/ConnectionProvider.scala       | 10 +++
 ...nProvider.scala => DB2ConnectionProvider.scala} | 43 +++
 .../connection/MariaDBConnectionProvider.scala     | 11 +--
 .../connection/PostgresConnectionProvider.scala    |  9 +--
 .../jdbc/connection/SecureConnectionProvider.scala | 12 +++
 ...uite.scala => DB2ConnectionProviderSuite.scala} |  6 +-
 17 files changed, 204 insertions(+), 66 deletions(-)
 copy dev/sbt-checkstyle => external/docker-integration-tests/src/test/resources/db2_krb_setup.sh (58%)
 create mode 100644 external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2KrbIntegrationSuite.scala
 copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/{PostgresConnectionProvider.scala => DB2ConnectionProvider.scala} (51%)
 copy sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/{MariaDBConnectionProviderSuite.scala => DB2ConnectionProviderSuite.scala} (80%)
[spark] branch master updated (014d335 -> 1354d2d)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 014d335  [SPARK-31291][SQL][TEST] SQLQueryTestSuite: Sharing test data and test tables among multiple test cases
     add 1354d2d  [SPARK-31021][SQL] Support MariaDB Kerberos login in JDBC connector

No new revisions were added by this update.

Summary of changes:
 external/docker-integration-tests/pom.xml          |  4 +-
 .../test/resources/mariadb_docker_entrypoint.sh    | 20 ++
 .../src/test/resources/mariadb_krb_setup.sh        |  6 +-
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala      | 22 +--
 .../sql/jdbc/DockerKrbJDBCIntegrationSuite.scala   | 75 +++--
 .../sql/jdbc/MariaDBKrbIntegrationSuite.scala      | 67 +++
 .../sql/jdbc/MsSqlServerIntegrationSuite.scala     |  2 -
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     |  1 -
 .../spark/sql/jdbc/OracleIntegrationSuite.scala    |  1 -
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala  |  1 -
 .../sql/jdbc/PostgresKrbIntegrationSuite.scala     | 76 ++
 pom.xml                                            |  6 ++
 sql/core/pom.xml                                   |  4 +-
 .../jdbc/connection/ConnectionProvider.scala       |  7 ++
 .../connection/MariaDBConnectionProvider.scala     | 54 +++
 .../connection/PostgresConnectionProvider.scala    | 50 +++---
 .../jdbc/connection/SecureConnectionProvider.scala | 75 +
 .../connection/ConnectionProviderSuiteBase.scala   | 69 
 .../MariaDBConnectionProviderSuite.scala}          | 12 ++--
 .../PostgresConnectionProviderSuite.scala          | 61 ++---
 20 files changed, 399 insertions(+), 214 deletions(-)
 copy bin/beeline => external/docker-integration-tests/src/test/resources/mariadb_docker_entrypoint.sh (67%)
 copy dev/scalafmt => external/docker-integration-tests/src/test/resources/mariadb_krb_setup.sh (77%)
 create mode 100644 external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProvider.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/SecureConnectionProvider.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuiteBase.scala
 copy sql/core/src/{main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala => test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/MariaDBConnectionProviderSuite.scala} (70%)
[spark] branch master updated: [SPARK-30874][SQL] Support Postgres Kerberos login in JDBC connector
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 231e650  [SPARK-30874][SQL] Support Postgres Kerberos login in JDBC connector

231e650 is described below

commit 231e65092fa97516e30c4ef12e635bfe3e97c7f0
Author: Gabor Somogyi
AuthorDate: Thu Mar 12 19:04:35 2020 -0700

    [SPARK-30874][SQL] Support Postgres Kerberos login in JDBC connector

    ### What changes were proposed in this pull request?

    When loading DataFrames from a JDBC datasource with Kerberos authentication, remote executors (yarn-client/cluster etc. modes) fail to establish a connection due to the lack of a Kerberos ticket or the ability to generate one. This is a real issue when trying to ingest data from kerberized data sources (SQL Server, Oracle) in enterprise environments where exposing simple authentication access is not an option due to IT policy.

    In this PR I've added Postgres support (other supported databases will come in later PRs). What this PR contains:
    * Added `keytab` and `principal` JDBC options
    * Added `ConnectionProvider` trait and its implementations:
      * `BasicConnectionProvider` => insecure connection
      * `PostgresConnectionProvider` => Postgres secure connection
    * Added `ConnectionProvider` tests
    * Added `PostgresKrbIntegrationSuite` docker integration test
    * Created `SecurityUtils` to collect reusable security-related functionality
    * Documentation

    ### Why are the changes needed?

    Missing JDBC Kerberos support.

    ### Does this PR introduce any user-facing change?

    Yes, 2 additional JDBC options are added:
    * keytab
    * principal

    If both are provided, Spark performs Kerberos authentication.

    ### How was this patch tested?

    To demonstrate the functionality with a standalone application I've created this repository: https://github.com/gaborgsomogyi/docker-kerberos
    * Additional + existing unit tests
    * Additional docker integration test
    * Test on cluster manually
    * `SKIP_API=1 jekyll build`

    Closes #27637 from gaborgsomogyi/SPARK-30874.

    Authored-by: Gabor Somogyi
    Signed-off-by: Marcelo Vanzin
---
 .../org/apache/spark/util/SecurityUtils.scala      |  69 +++
 docs/sql-data-sources-jdbc.md                      |  14 +++
 external/docker-integration-tests/pom.xml          |   5 +
 .../src/test/resources/log4j.properties            |  36 ++
 .../src/test/resources/postgres_krb_setup.sh       |  21 
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala      |  15 ++-
 .../sql/jdbc/DockerKrbJDBCIntegrationSuite.scala   |  94 +++
 .../sql/jdbc/PostgresKrbIntegrationSuite.scala     | 129 +
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  30 +
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala |  32 +
 .../execution/datasources/jdbc/JDBCOptions.scala   |  25 +++-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |   3 +-
 .../jdbc/connection/BasicConnectionProvider.scala  |  29 +
 .../jdbc/connection/ConnectionProvider.scala       |  52 +
 .../connection/PostgresConnectionProvider.scala    |  82 +
 .../PostgresConnectionProviderSuite.scala          |  85 ++
 16 files changed, 663 insertions(+), 58 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/SecurityUtils.scala b/core/src/main/scala/org/apache/spark/util/SecurityUtils.scala
new file mode 100644
index 000..7831f66
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SecurityUtils.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Various utility methods used by Spark Security.
+ */
+private[spark] object SecurityUtils {
+  private val JAVA_VENDOR = "java.vendor"
+  private val IBM_KRB_DEBUG_CONFIG = "com.ibm.security.krb5.Krb5Debug"
+  private val SUN_KRB_DEBUG_CONFIG = "sun.security.krb5.debug"
+
+  def setG
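The provider-selection behavior the PR describes (use a secure, driver-specific provider when both the `keytab` and `principal` JDBC options are set, fall back to the basic, non-Kerberos provider otherwise) can be sketched with a toy model. This is an illustrative Python sketch, not Spark's actual Scala class structure; the function and provider names are hypothetical.

```python
def choose_connection_provider(driver, options):
    """Pick a connection provider for a JDBC driver class.

    Hypothetical model of the dispatch the PR describes: Kerberos-capable
    providers are registered per driver class, and are only used when the
    `keytab` and `principal` options are both present.
    """
    # Illustrative registry; the PR only adds the Postgres provider, with
    # other databases following in later PRs.
    secure_providers = {
        "org.postgresql.Driver": "PostgresConnectionProvider",
    }
    if "keytab" in options and "principal" in options:
        provider = secure_providers.get(driver)
        if provider is None:
            raise ValueError(f"No secure connection provider for {driver}")
        return provider
    return "BasicConnectionProvider"


# With keytab + principal set, the driver-specific secure provider is chosen.
kerberos_opts = {
    "url": "jdbc:postgresql://db:5432/mydb",
    "keytab": "/etc/security/keytabs/user.keytab",
    "principal": "user@EXAMPLE.COM",
}
assert choose_connection_provider("org.postgresql.Driver",
                                  kerberos_opts) == "PostgresConnectionProvider"

# Without them, the basic (non-Kerberos) provider handles the connection.
assert choose_connection_provider("org.postgresql.Driver",
                                  {"url": "jdbc:postgresql://db:5432/mydb"}) \
    == "BasicConnectionProvider"
```

The design choice this models is that Kerberos handling stays out of the common JDBC path: the basic provider remains the default, and secure providers opt in per driver only when both credential options are supplied.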
[spark] branch story/PLAT-2902/create-spark-gr-with-ci updated: Bootstrapping CircleCI.
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch story/PLAT-2902/create-spark-gr-with-ci
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/story/PLAT-2902/create-spark-gr-with-ci by this push:
     new bb81672  Bootstrapping CircleCI.

bb81672 is described below

commit bb81672a7eccd3419c162792b276eacd1fc366ed
Author: Marcelo Vanzin
AuthorDate: Wed Feb 19 16:39:12 2020 -0800

    Bootstrapping CircleCI.
---
 config.yml  | 39 +++
 gr/build.sh | 12 
 2 files changed, 51 insertions(+)

diff --git a/config.yml b/config.yml
new file mode 100644
index 000..360ad5b
--- /dev/null
+++ b/config.yml
@@ -0,0 +1,39 @@
+version: 2.1
+
+orbs:
+  aws-ecr: circleci/aws-ecr@6.5.0
+
+jobs:
+  build:
+    machine:
+      image: ubuntu-1604:201903-01
+    steps:
+      - checkout
+      - restore_cache:
+          key: gr-spark-mvn-cache
+          paths:
+            - ~/.m2
+      - run:
+          name: "Build Spark"
+          command: gr/build.sh
+      - save_cache:
+          key: gr-spark-mvn-cache
+          paths:
+            - ~/.m2
+
+# workflows:
+#   build_and_push:
+#     jobs:
+#       - run_tests
+#       - aws-ecr/build-and-push-image:
+#           account-url: ECR_ENDPOINT
+#           aws-access-key-id: AWS_ACCESS_KEY_ID
+#           aws-secret-access-key: AWS_SECRET_ACCESS_KEY
+#           create-repo: false
+#           dockerfile: Dockerfile
+#           profile-name: circleci
+#           region: AWS_DEFAULT_REGION
+#           repo: protean-operator
+#           tag: ${CIRCLE_SHA1}
+#           requires:
+#             - run_tests
diff --git a/gr/build.sh b/gr/build.sh
new file mode 100755
index 000..700165e
--- /dev/null
+++ b/gr/build.sh
@@ -0,0 +1,12 @@
+#!/bin/sh
+#
+# Build Spark with all the needed options for GR.
+#
+
+cd $(dirname $0)/..
+./dev/make-distribution.sh \
+  --tgz \
+  -Pkubernetes \
+  -Phive \
+  -Phadoop-cloud \
+  -Dhadoop.version=2.10.0
[spark] 01/01: Package AWS SDK.
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch story/PLAT-2902/create-spark-gr-with-ci in repository https://gitbox.apache.org/repos/asf/spark.git

commit ff7ba54865a35c5fe1f8de92b4e52202931598e1
Author: Marcelo Vanzin
AuthorDate: Thu Feb 13 16:14:09 2020 -0800

    Package AWS SDK.
---
 hadoop-cloud/pom.xml | 5 -
 1 file changed, 5 deletions(-)

diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml
index 42941b9..b3e 100644
--- a/hadoop-cloud/pom.xml
+++ b/hadoop-cloud/pom.xml
@@ -102,11 +102,6 @@
       com.fasterxml.jackson.core
       jackson-annotations
-
-      com.amazonaws
-      aws-java-sdk
-
-
[spark] branch story/PLAT-2902/create-spark-gr-with-ci created (now ff7ba54)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch story/PLAT-2902/create-spark-gr-with-ci in repository https://gitbox.apache.org/repos/asf/spark.git.

      at ff7ba54  Package AWS SDK.

This branch includes the following new commits:

     new ff7ba54  Package AWS SDK.

The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[spark] branch master updated (580c2b7 -> a2fe73b)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 580c2b7  [SPARK-27166][SQL][FOLLOWUP] Refactor to build string once
  add a2fe73b  [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

No new revisions were added by this update.

Summary of changes:
 .../deploy/history/EventLogFileCompactor.scala   |   9 +-
 .../spark/deploy/history/FsHistoryProvider.scala | 173 +++--
 .../org/apache/spark/internal/config/History.scala | 16 ++
 .../org/apache/spark/internal/config/package.scala | 18 ---
 .../history/EventLogFileCompactorSuite.scala     |  49 +++---
 .../deploy/history/FsHistoryProviderSuite.scala  | 126 +--
 docs/monitoring.md                               |  21 ++-
 7 files changed, 313 insertions(+), 99 deletions(-)
[spark] branch master updated: [SPARK-29876][SS] Delete/archive file source completed files in separate thread
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new abf759a  [SPARK-29876][SS] Delete/archive file source completed files in separate thread
abf759a is described below

commit abf759a91e01497586b8bb6b7a314dd28fd6cff1
Author: Gabor Somogyi
AuthorDate: Fri Jan 17 10:45:36 2020 -0800

    [SPARK-29876][SS] Delete/archive file source completed files in separate thread

    ### What changes were proposed in this pull request?
    [SPARK-20568](https://issues.apache.org/jira/browse/SPARK-20568) added the possibility to clean up completed files in streaming query. Deleting/archiving uses the main thread which can slow down processing. In this PR I've created a thread pool to handle file delete/archival. The number of threads can be configured with `spark.sql.streaming.fileSource.cleaner.numThreads`.

    ### Why are the changes needed?
    Do file delete/archival in separate thread.

    ### Does this PR introduce any user-facing change?
    No.

    ### How was this patch tested?
    Existing unit tests.

    Closes #26502 from gaborgsomogyi/SPARK-29876.

    Authored-by: Gabor Somogyi
    Signed-off-by: Marcelo Vanzin
---
 docs/structured-streaming-programming-guide.md     |  5 +--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 
 .../sql/execution/streaming/FileStreamSource.scala | 40 +++---
 .../sql/streaming/FileStreamSourceSuite.scala      |  9 +++--
 4 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 306d688..429d456 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -551,9 +551,10 @@ Here are the details of all the sources in Spark.
         When "archive" is provided, additional option sourceArchiveDir must be provided as well.
         The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files. For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.
         Spark will move source files respecting their own path. For example, if the path of source file is /a/b/dataset.txt and the path of archive directory is /archived/here, file will be moved to /archived/here/a/b/dataset.txt.
-        NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.
+        NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down, even if it's happening in separate thread) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.
+        Number of threads used in completed file cleaner can be configured with spark.sql.streaming.fileSource.cleaner.numThreads (default: 1).
         NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.
-        NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
+        NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances - e.g. the application doesn't shut down gracefully, too many files are queued to clean up.
         For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/ 0) {
+    logDebug(s"Cleaning file source on $numThreads separate thread(s)")
+    Some(ThreadUtils.newDaemonCachedThreadPool("file-source-c
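The "sourceArchiveDir" rule quoted in the documentation diff above (compare the archive path against the source glob, component by component, up to the minimum depth of the two) can be sketched as a small standalone check. This is an illustrative Python model of the documented rule, not Spark's actual implementation:

```python
import fnmatch

def archive_dir_conflicts(source_glob: str, archive_dir: str) -> bool:
    """Return True if archive_dir could be matched by source_glob.

    Compares the two paths component by component up to the minimum
    depth of both, as the Structured Streaming docs describe: a
    conflict means archived files could be picked up again as new
    source files.
    """
    glob_parts = source_glob.strip("/").split("/")
    dir_parts = archive_dir.strip("/").split("/")
    depth = min(len(glob_parts), len(dir_parts))
    # Every component of the archive path must match the corresponding
    # glob component for the archive dir to be "inside" the pattern.
    return all(
        fnmatch.fnmatch(d, g)
        for d, g in zip(dir_parts[:depth], glob_parts[:depth])
    )
```

Running the documentation's own examples through this check: with source pattern `/hello?/spark/*`, both `/hello1/spark/archive/dir` and `/hello1/spark` conflict, while `/archived/here` does not.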
[spark] branch master updated (fd308ad -> 830e635)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from fd308ad  [SPARK-30041][SQL][WEBUI] Add Codegen Stage Id to Stage DAG visualization in Web UI
  add 830e635  [SPARK-27868][CORE][FOLLOWUP] Recover the default value to -1 again

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/spark/network/util/TransportConf.java | 8 ++--
 docs/configuration.md                                          | 5 +++--
 2 files changed, 9 insertions(+), 4 deletions(-)
[spark] branch master updated: [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new dca8380  [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
dca8380 is described below

commit dca838058ffd0e2c01591fd9ab0f192de446d606
Author: Marcelo Vanzin
AuthorDate: Thu Jan 16 13:37:11 2020 -0800

    [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation

    The issue here is that when Spark is downscaling the application and deletes a few pod requests that aren't needed anymore, it may actually race with the K8S scheduler, who may be bringing up those executors. So they may have enough time to connect back to the driver, register, to just be deleted soon after. This wastes resources and causes misleading entries in the driver log.

    The change (ab)uses the blacklisting mechanism to consider the deleted excess pods as blacklisted, so that if they try to connect back, the driver will deny it.

    It also changes the executor registration slightly, since even with the above change there were misleading logs. That was because the executor registration message was an RPC that always succeeded (bar network issues), so the executor would always try to send an unregistration message to the driver, which would then log several messages about not knowing anything about the executor. The change makes the registration RPC succeed or fail directly, instead of using the separate failure message that would lead to this issue.

    Note the last change required some changes in a standalone test suite related to dynamic allocation, since it relied on the driver not throwing exceptions when a duplicate executor registration happened.

    Tested with existing unit tests, and with live cluster with dyn alloc on.

    Closes #26586 from vanzin/SPARK-29950.

    Authored-by: Marcelo Vanzin
    Signed-off-by: Marcelo Vanzin
---
 .../executor/CoarseGrainedExecutorBackend.scala    | 14 +++--
 .../cluster/CoarseGrainedClusterMessage.scala      |  7 ---
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 19 +--
 .../deploy/StandaloneDynamicAllocationSuite.scala  | 65 ++
 .../CoarseGrainedSchedulerBackendSuite.scala       |  1 +
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 18 ++
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  4 ++
 .../DeterministicExecutorPodsSnapshotsStore.scala  |  9 +++
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 11 
 9 files changed, 105 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b1837c9..1fe901a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -54,6 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     resourcesFileOpt: Option[String])
   extends IsolatedRpcEndpoint with ExecutorBackend with Logging {

+  import CoarseGrainedExecutorBackend._
+
   private implicit val formats = DefaultFormats

   private[this] val stopping = new AtomicBoolean(false)
@@ -80,9 +82,8 @@
       ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls, extractAttributes, resources))
     }(ThreadUtils.sameThread).onComplete {
-      // This is a very fast action so we can use "ThreadUtils.sameThread"
-      case Success(msg) =>
-        // Always receive `true`. Just ignore it
+      case Success(_) =>
+        self.send(RegisteredExecutor)
       case Failure(e) =>
         exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
@@ -133,9 +134,6 @@
         exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
       }

-    case RegisterExecutorFailed(message) =>
-      exitExecutor(1, "Slave registration failed: " + message)
-
     case LaunchTask(data) =>
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
@@ -226,6 +224,10 @@
 private[spark] object CoarseGrainedExecutorBackend extends Logging {

+  // Message used internally to start the executor when the driver successfully accepted the
+  // registration request.
+  case object RegisteredExecutor
+
   case class Arguments(
       driverUrl: String,
       executorId: String
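The registration change in the commit above (make the RegisterExecutor RPC succeed or fail directly, instead of always replying true and following up with a separate RegisterExecutorFailed message) can be modeled with a tiny sketch. This Python model is hypothetical and only illustrates the control flow; names like `Driver.register` are not Spark's API:

```python
class Driver:
    """Toy model of the driver side of executor registration."""

    def __init__(self):
        # SPARK-29950: pod requests deleted during downscaling are
        # treated as blacklisted so a racing executor cannot register.
        self.deleted = set()
        self.registered = set()

    def delete_excess_pod(self, executor_id):
        """Record a pod request removed while downscaling."""
        self.deleted.add(executor_id)

    def register(self, executor_id):
        # The RPC now fails directly: a rejected executor sees the
        # failure on its ask-future instead of receiving a separate
        # failure message later.
        if executor_id in self.deleted:
            raise RuntimeError(
                f"executor {executor_id} was deleted, denying registration")
        self.registered.add(executor_id)
        return True
```

In the real code, the executor backend reacts to the failed future the way `exitExecutor` is invoked in the `Failure(e)` branch of the diff above.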
[spark] branch master updated (6c178a5 -> d42cf45)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 6c178a5  [SPARK-30495][SS] Consider spark.security.credentials.kafka.enabled and cluster configuration when checking latest delegation token
  add d42cf45  [SPARK-30246][CORE] OneForOneStreamManager might leak memory in connectionTerminated

No new revisions were added by this update.

Summary of changes:
 .../network/server/OneForOneStreamManager.java | 24 ++---
 .../server/OneForOneStreamManagerSuite.java    | 39 ++
 2 files changed, 58 insertions(+), 5 deletions(-)
[spark] branch master updated (e751bc6 -> 6c178a5)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from e751bc6  [SPARK-30479][SQL] Apply compaction of event log to SQL events
  add 6c178a5  [SPARK-30495][SS] Consider spark.security.credentials.kafka.enabled and cluster configuration when checking latest delegation token

No new revisions were added by this update.

Summary of changes:
 .../security/HadoopDelegationTokenManager.scala   | 61 ++-
 .../sql/kafka010/consumer/KafkaDataConsumer.scala |  2 +-
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala | 14 +++--
 .../spark/kafka010/KafkaTokenUtilSuite.scala      | 69 +-
 4 files changed, 83 insertions(+), 63 deletions(-)
[spark] branch master updated (990a2be -> e751bc6)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 990a2be  [SPARK-30378][ML][PYSPARK][FOLLOWUP] Remove Param fields provided by _FactorizationMachinesParams
  add e751bc6  [SPARK-30479][SQL] Apply compaction of event log to SQL events

No new revisions were added by this update.

Summary of changes:
 .../spark/status/ListenerEventsTestHelper.scala   |  47 +++
 apache.spark.deploy.history.EventFilterBuilder    |   1 +
 .../execution/history/SQLEventFilterBuilder.scala | 147 +
 .../history/SQLEventFilterBuilderSuite.scala      | 107 +++
 .../history/SQLLiveEntitiesEventFilterSuite.scala | 135 +++
 5 files changed, 437 insertions(+)
 create mode 100644 sql/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilder.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
[spark] branch master updated (9320011 -> 0c6bd3b)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 9320011  [SPARK-9478][ML][PYSPARK] Add sample weights to Random Forest
  add 0c6bd3b  [SPARK-27142][SQL] Provide REST API for SQL information

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/execution/ui/SQLAppStatusStore.scala |   4 +
 .../status/api/v1/sql/ApiSqlRootResource.scala     |  16 ++--
 .../spark/status/api/v1/sql/SqlResource.scala      | 101 +
 .../org/apache/spark/status/api/v1/sql/api.scala   |  20 +++-
 4 files changed, 127 insertions(+), 14 deletions(-)
 copy common/unsafe/src/main/java/org/apache/spark/unsafe/KVIterator.java => sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala (73%)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
 copy core/src/main/scala/org/apache/spark/metrics/sink/package.scala => sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala (65%)
[spark] branch master updated (2bd8731 -> 7fb17f59)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 2bd8731  [SPARK-30468][SQL] Use multiple lines to display data columns for show create table command
  add 7fb17f59 [SPARK-29779][CORE] Compact old event log files and cleanup

No new revisions were added by this update.

Summary of changes:
 apache.spark.deploy.history.EventFilterBuilder     |   1 +
 .../deploy/history/BasicEventFilterBuilder.scala   | 176 +++
 .../apache/spark/deploy/history/EventFilter.scala  | 109 +++
 .../deploy/history/EventLogFileCompactor.scala     | 224 ++
 .../spark/deploy/history/EventLogFileReaders.scala |  28 +-
 .../spark/deploy/history/EventLogFileWriters.scala |  28 +-
 .../org/apache/spark/internal/config/package.scala |  18 ++
 .../history/BasicEventFilterBuilderSuite.scala     | 228 ++
 .../deploy/history/BasicEventFilterSuite.scala     | 208 +
 .../history/EventLogFileCompactorSuite.scala       | 326 +
 .../deploy/history/EventLogFileReadersSuite.scala  |   6 +-
 .../deploy/history/EventLogFileWritersSuite.scala  |   4 +-
 .../spark/deploy/history/EventLogTestHelper.scala  |  55 +++-
 .../spark/status/AppStatusListenerSuite.scala      |  38 +--
 .../spark/status/ListenerEventsTestHelper.scala    | 154 ++
 15 files changed, 1545 insertions(+), 58 deletions(-)
 create mode 100644 core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala
 create mode 100644 core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala
[spark] branch master updated (0a72dba -> bd7510b)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 0a72dba  [SPARK-30445][CORE] Accelerator aware scheduling handle setting configs to 0
  add bd7510b  [SPARK-30281][SS] Consider partitioned/recursive option while verifying archive path on FileStreamSource

No new revisions were added by this update.

Summary of changes:
 docs/structured-streaming-programming-guide.md     |  3 +-
 .../sql/execution/streaming/FileStreamSource.scala | 73 +-
 .../sql/streaming/FileStreamSourceSuite.scala      | 26 ++--
 3 files changed, 80 insertions(+), 22 deletions(-)
[spark] branch master updated (604d679 -> 895e572)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 604d679  [SPARK-30226][SQL] Remove withXXX functions in WriteBuilder
  add 895e572  [SPARK-30313][CORE] Ensure EndpointRef is available MasterWebUI/WorkerPage

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/rpc/netty/Dispatcher.scala | 35 ++
 1 file changed, 23 insertions(+), 12 deletions(-)
[spark] branch branch-2.4 updated: [SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 16f8fae  [SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError
16f8fae is described below

commit 16f8fae01f329d4ba5786176c3c8dc4e648a8c22
Author: Wang Shuo
AuthorDate: Thu Jan 2 16:40:22 2020 -0800

    [SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError

    There is a deadlock between `LiveListenerBus#stop` and `AsyncEventQueue#removeListenerOnError`. We can reproduce as follows:

    1. Post some events to `LiveListenerBus`
    2. Call `LiveListenerBus#stop` and hold the synchronized lock of `bus` (https://github.com/apache/spark/blob/5e92301723464d0876b5a7eec59c15fed0c5b98c/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L229), waiting until all the events are processed by listeners, then remove all the queues
    3. Event queue would drain out events by posting to its listeners. If a listener is interrupted, it will call `AsyncEventQueue#removeListenerOnError`, inside it will call `bus.removeListener` (https://github.com/apache/spark/blob/7b1b60c7583faca70aeab2659f06d4e491efa5c0/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala#L207), trying to acquire synchronized lock of bus, resulting in deadlock

    This PR removes the `synchronized` from `LiveListenerBus.stop` because underlying data structures themselves are thread-safe. To fix deadlock.

    No.

    New UT.

    Closes #26924 from wangshuo128/event-queue-race-condition.

    Authored-by: Wang Shuo
    Signed-off-by: Marcelo Vanzin
    (cherry picked from commit 10cae04108c375a7f5ca7685fea593bd7f49f7a6)
    Signed-off-by: Marcelo Vanzin
---
 .../apache/spark/scheduler/LiveListenerBus.scala |  6 +-
 .../spark/scheduler/SparkListenerSuite.scala     | 70 ++
 2 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index d135190..1f42f09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -215,10 +215,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
       return
     }

-    synchronized {
-      queues.asScala.foreach(_.stop())
-      queues.clear()
-    }
+    queues.asScala.foreach(_.stop())
+    queues.clear()
   }

   // For testing only.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6ffd1e8..0b843be 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -531,6 +531,47 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     }
   }

+  Seq(true, false).foreach { throwInterruptedException =>
+    val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
+    test(s"SPARK-30285: Fix deadlock in AsyncEventQueue.removeListenerOnError: $suffix") {
+      val LISTENER_BUS_STOP_WAITING_TIMEOUT_MILLIS = 10 * 1000L // 10 seconds
+      val bus = new LiveListenerBus(new SparkConf(false))
+      val counter1 = new BasicJobCounter()
+      val counter2 = new BasicJobCounter()
+      val interruptingListener = new DelayInterruptingJobCounter(throwInterruptedException, 3)
+      bus.addToSharedQueue(counter1)
+      bus.addToSharedQueue(interruptingListener)
+      bus.addToEventLogQueue(counter2)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, EVENT_LOG_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[DelayInterruptingJobCounter]().size === 1)
+
+      bus.start(mockSparkContext, mockMetricsSystem)
+
+      (0 until 5).foreach { jobId =>
+        bus.post(SparkListenerJobEnd(jobId, jobCompletionTime, JobSucceeded))
+      }
+
+      // Call bus.stop in a separate thread, otherwise we will block here until bus is stopped
+      val stoppingThread = new Thread(new Runnable() {
+        override def run(): Unit = bus.stop()
+      })
+      stoppingThread.start()
+      // Notify interrupting listener starts to work
+      interruptingListener.sleep = false
+      // Wait for bus to stop
+      stoppingThread.join(LISTENER_BUS_STOP_WAITING_TIMEOUT_MILLIS)
+
+      // Stopping has been finished
+      assert(stoppingThread.isAlive === false)
+      // All queues are removed
+      assert(bus.activeQ
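The deadlock above comes from `stop()` holding the bus lock while a draining queue calls back into `removeListener`, which needs the same lock. Below is a minimal Python sketch of the fixed shape: no lock held across queue shutdown, relying on thread-safe collections instead. All names are hypothetical simplifications of the Spark classes, not their real API:

```python
from collections import deque

class ListenerBus:
    """Toy model of LiveListenerBus after the SPARK-30285 fix."""

    def __init__(self):
        # Spark keeps the queues in a thread-safe CopyOnWriteArrayList;
        # here a deque stands in, and each queue is a set of listeners.
        self.queues = deque()

    def add_queue(self, queue):
        self.queues.append(queue)

    def remove_listener(self, listener):
        # May be invoked from a queue's dispatch thread (e.g. when a
        # listener is interrupted) while stop() is running.
        for q in list(self.queues):
            q.discard(listener)

    def stop(self):
        # Fixed version: no `synchronized` block wrapping this, so a
        # listener failing during the drain can still call
        # remove_listener without deadlocking against stop().
        for q in list(self.queues):
            q.clear()
        self.queues.clear()
```

The key design point mirrored here is the one stated in the commit message: once the underlying structures are themselves thread-safe, the outer lock in `stop()` adds nothing except the deadlock opportunity.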
[spark] branch master updated (1b0570c -> 10cae041)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 1b0570c  [SPARK-30387] Improving stop hook log message
  add 10cae041 [SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/scheduler/LiveListenerBus.scala |  6 +-
 .../spark/scheduler/SparkListenerSuite.scala     | 70 ++
 2 files changed, 72 insertions(+), 4 deletions(-)
[spark] branch master updated (c6ab716 -> 7bff2db)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git.

 from c6ab716  [SPARK-29224][ML] Implement Factorization Machines as a ml-pipeline component
  add 7bff2db  [SPARK-21869][SS] Revise Kafka producer pool to implement 'expire' correctly

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/kafka010/CachedKafkaProducer.scala   | 128 -
 .../spark/sql/kafka010/KafkaDataWriter.scala       |  23 +--
 .../apache/spark/sql/kafka010/KafkaWriteTask.scala |  20 +-
 .../org/apache/spark/sql/kafka010/package.scala    |   7 +
 .../kafka010/producer/CachedKafkaProducer.scala    |  27 ++-
 .../producer/InternalKafkaProducerPool.scala       | 206 +
 .../sql/kafka010/CachedKafkaProducerSuite.scala    |  77 
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala |   2 +-
 .../org/apache/spark/sql/kafka010/KafkaTest.scala  |   3 +-
 .../producer/InternalKafkaProducerPoolSuite.scala  | 192 +++
 10 files changed, 449 insertions(+), 236 deletions(-)
 delete mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
 copy sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala => external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/CachedKafkaProducer.scala (58%)
 create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPool.scala
 delete mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
 create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/producer/InternalKafkaProducerPoolSuite.scala
[spark] branch branch-2.4 updated: [SPARK-17398][SQL] Fix ClassCastException when querying partitioned JSON table
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 07caebf [SPARK-17398][SQL] Fix ClassCastException when querying partitioned JSON table 07caebf is described below commit 07caebf2194fc34f1b4aacf2aa6c2d6961587482 Author: Wing Yew Poon AuthorDate: Fri Dec 20 10:39:26 2019 -0800 [SPARK-17398][SQL] Fix ClassCastException when querying partitioned JSON table When querying a partitioned table with format `org.apache.hive.hcatalog.data.JsonSerDe` and more than one task runs in each executor concurrently, the following exception is encountered: `java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.apache.hive.hcatalog.data.HCatRecord` The exception occurs in `HadoopTableReader.fillObject`. `org.apache.hive.hcatalog.data.JsonSerDe#initialize` populates a `cachedObjectInspector` field by calling `HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector`, which is not thread-safe; this `cachedObjectInspector` is returned by `JsonSerDe#getObjectInspector`. We protect against this Hive bug by synchronizing on an object when we need to call `initialize` on `org.apache.hadoop.hive.serde2.Deserializer` instances (which may be `JsonSerDe` instances). By doing so, the `ObjectInspector` for the `Deserializer` of the partitions of the JSON table and that of the table `SerDe` are the same cached `ObjectInspector` and `HadoopTableReader.fillObject` then works correctly. (If the `ObjectInspector`s are different, then a bug in `HCatRecordObjectInsp [...] This works around HIVE-15773 / HIVE-21752. It introduces no user-facing change. Tested manually on a cluster with a partitioned JSON table, running a query using more than one core per executor. Before this change, the ClassCastException happened consistently; with this change, it does not. Closes #26895 from wypoon/SPARK-17398. 
Authored-by: Wing Yew Poon Signed-off-by: Marcelo Vanzin (cherry picked from commit c72f88b0ba20727e831ba9755d9628d0347ee3cb) Signed-off-by: Marcelo Vanzin --- .../org/apache/spark/sql/hive/TableReader.scala| 23 +++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 7d57389..6631073 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -133,7 +133,9 @@ class HadoopTableReader( val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => val hconf = broadcastedHadoopConf.value.value val deserializer = deserializerClass.newInstance() - deserializer.initialize(hconf, localTableDesc.getProperties) + DeserializerLock.synchronized { +deserializer.initialize(hconf, localTableDesc.getProperties) + } HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) } @@ -255,10 +257,14 @@ class HadoopTableReader( partProps.asScala.foreach { case (key, value) => props.setProperty(key, value) } -deserializer.initialize(hconf, props) +DeserializerLock.synchronized { + deserializer.initialize(hconf, props) +} // get the table deserializer val tableSerDe = localTableDesc.getDeserializerClass.newInstance() -tableSerDe.initialize(hconf, localTableDesc.getProperties) +DeserializerLock.synchronized { + tableSerDe.initialize(hconf, localTableDesc.getProperties) +} // fill the non partition key attributes HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, @@ -337,6 +343,17 @@ private[hive] object HiveTableUtil { } } +/** + * Object to synchronize on when calling org.apache.hadoop.hive.serde2.Deserializer#initialize. 
+ * + * [SPARK-17398] org.apache.hive.hcatalog.data.JsonSerDe#initialize calls the non-thread-safe + * HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector, the results of which are + * returned by JsonSerDe#getObjectInspector. + * To protect against this bug in Hive (HIVE-15773/HIVE-21752), we synchronize on this object + * when calling initialize on Deserializer instances that could be JsonSerDe instances. + */ +private[hive] object DeserializerLock + private[hive] object HadoopTableReader extends HiveInspectors with Logging { /** * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands
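The lock-object idiom in the patch above can be sketched in a few lines. This is a minimal illustration, not Spark's actual code: `UnsafeSerDe` and `safeInit` are hypothetical stand-ins for the Hive `Deserializer` and the guarded `initialize` call sites in `HadoopTableReader`.

```scala
// Shared monitor: every initialize() call on a possibly non-thread-safe
// deserializer is funneled through this single object, mirroring
// `private[hive] object DeserializerLock` in the patch.
object DeserializerLock

// Hypothetical stand-in for a SerDe whose initialize() is not thread-safe
// (e.g. it populates a shared cached ObjectInspector non-atomically).
class UnsafeSerDe {
  @volatile var inspector: AnyRef = _
  def initialize(props: Map[String, String]): Unit = {
    if (inspector == null) { // unsafe check-then-act without the lock
      inspector = new Object
    }
  }
}

// All call sites wrap initialize() in the same synchronized block, so two
// tasks running concurrently on one executor can no longer race inside it.
def safeInit(serde: UnsafeSerDe, props: Map[String, String]): Unit = {
  DeserializerLock.synchronized {
    serde.initialize(props)
  }
}
```

Because both the partition deserializers and the table SerDe go through the same monitor, their cached `ObjectInspector` ends up shared and consistent, which is what `fillObject` relies on.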
[spark] branch master updated (7dff3b1 -> c72f88b)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 7dff3b1 [SPARK-30272][SQL][CORE] Remove usage of Guava that breaks in 27; replace with workalikes add c72f88b [SPARK-17398][SQL] Fix ClassCastException when querying partitioned JSON table No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/hive/TableReader.scala| 23 +++--- 1 file changed, 20 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (297f406 -> cdc8fc6)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 297f406 [SPARK-29600][SQL] ArrayContains function may return incorrect result for DecimalType add cdc8fc6 [SPARK-30235][CORE] Switching off host local disk reading of shuffle blocks in case of useOldFetchProtocol No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/internal/config/package.scala | 16 .../scala/org/apache/spark/storage/BlockManager.scala| 3 ++- docs/core-migration-guide.md | 2 ++ docs/sql-migration-guide.md | 2 -- 4 files changed, 12 insertions(+), 11 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-25392][CORE][WEBUI] Prevent error page when accessing pools page from history server
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 6d90298 [SPARK-25392][CORE][WEBUI] Prevent error page when accessing pools page from history server 6d90298 is described below commit 6d90298438e627187088a5d8c53d470646d051f4 Author: shahid AuthorDate: Mon Dec 16 15:02:34 2019 -0800 [SPARK-25392][CORE][WEBUI] Prevent error page when accessing pools page from history server ### What changes were proposed in this pull request? ### Why are the changes needed? Currently, the pool info cannot be accessed from the history server, as no pool information other than the pool name is written to the event log. Spark already hides the pool table when the UI is accessed from the history server, but the pool column in the stage table still links to the pools page, which throws an error when opened. To prevent the error page, we also need to hide the pool column in the stage table. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Manual test Before change: ![Screenshot 2019-11-21 at 6 49 40 AM](https://user-images.githubusercontent.com/23054875/69293868-219b2280-0c30-11ea-9b9a-17140d024d3a.png) ![Screenshot 2019-11-21 at 6 48 51 AM](https://user-images.githubusercontent.com/23054875/69293834-147e3380-0c30-11ea-9dec-d5f67665486d.png) After change: ![Screenshot 2019-11-21 at 7 29 01 AM](https://user-images.githubusercontent.com/23054875/69293991-9cfcd400-0c30-11ea-98a0-7a6268a4e5ab.png) Closes #26616 from shahidki31/poolHistory. 
Authored-by: shahid Signed-off-by: Marcelo Vanzin (cherry picked from commit dd217e10fc0408831c2c658fc3f52d2917f1a6a2) Signed-off-by: Marcelo Vanzin --- core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala | 3 +-- core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala | 2 ++ core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index f672ce0..d8a93ad 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -30,7 +30,6 @@ import org.apache.spark.ui.{UIUtils, WebUIPage} private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { private val sc = parent.sc private val subPath = "stages" - private def isFairScheduler = parent.isFairScheduler def render(request: HttpServletRequest): Seq[Node] = { // For now, pool information is only accessible in live UIs @@ -57,7 +56,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { -val poolsDescription = if (sc.isDefined && isFairScheduler) { +val poolsDescription = if (parent.isFairScheduler) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index ff1b75e..102ec4d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -33,7 +33,9 @@ private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore) val sc = parent.sc val killEnabled = parent.killEnabled + // Show pool information for only live UI. 
def isFairScheduler: Boolean = { +sc.isDefined && store .environmentInfo() .sparkProperties diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 10b0320..b7a8c56 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -36,7 +36,9 @@ private[ui] class StagesTab(val parent: SparkUI, val store: AppStatusStore) attachPage(new StagePage(this, store)) attachPage(new PoolPage(this)) + // Show pool information for only live UI. def isFairScheduler: Boolean = { +sc.isDefined && store .environmentInfo() .sparkProperties - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
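The essence of the guard added to both tabs can be sketched as one hypothetical function (a simplification — the real code reads the scheduler mode from the `AppStatusStore` environment info rather than taking it as a parameter):

```scala
// Hypothetical simplification of JobsTab/StagesTab.isFairScheduler: pool
// (fair-scheduler) UI elements are shown only when a live SparkContext
// exists, so the history server (where sc is None) never renders the pool
// column that would link to the unavailable pools page.
def isFairScheduler(sc: Option[AnyRef], schedulerMode: String): Boolean =
  sc.isDefined && schedulerMode == "FAIR"
```

The `sc.isDefined &&` prefix is exactly what the two-line diff adds: the fair-scheduler check alone was true on the history server too, which is why the broken link appeared there.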
[spark] branch master updated (5954311 -> dd217e1)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 5954311 [SPARK-29043][CORE] Improve the concurrent performance of History Server add dd217e1 [SPARK-25392][CORE][WEBUI] Prevent error page when accessing pools page from history server No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala | 3 +-- core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala | 2 ++ core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (dddfeca -> 5954311)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from dddfeca [SPARK-30209][SQL][WEB-UI] Display stageId, attemptId and taskId for max metrics in Spark UI add 5954311 [SPARK-29043][CORE] Improve the concurrent performance of History Server No new revisions were added by this update. Summary of changes: .../spark/deploy/history/FsHistoryProvider.scala | 121 ++--- .../deploy/history/FsHistoryProviderSuite.scala| 39 ++- 2 files changed, 118 insertions(+), 42 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (23b1312 -> b573f23)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 23b1312 [SPARK-30200][DOCS][FOLLOW-UP] Add documentation for explain(mode: String) add b573f23 [SPARK-29574][K8S] Add SPARK_DIST_CLASSPATH to the executor class path No new revisions were added by this update. Summary of changes: docs/hadoop-provided.md| 22 ++ .../src/main/dockerfiles/spark/entrypoint.sh | 8 +++- 2 files changed, 29 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (ec26dde3 -> 61ebc81)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from ec26dde3 [SPARK-29455][WEBUI] Improve tooltip information for Stages add 61ebc81 [SPARK-30167][REPL] Log4j configuration for REPL can't override the root logger properly No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/internal/Logging.scala | 28 ++--- .../org/apache/spark/internal/LoggingSuite.scala | 12 +-- .../scala/org/apache/spark/repl/ReplSuite.scala| 114 - 3 files changed, 133 insertions(+), 21 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (cfd7ca9 -> d7843dd)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from cfd7ca9 Revert "[SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer" add d7843dd [SPARK-29152][CORE] Executor Plugin shutdown when dynamic allocation is enabled No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/executor/Executor.scala | 47 +- 1 file changed, 27 insertions(+), 20 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (e23c135 -> fd2bf55)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from e23c135 [SPARK-29293][BUILD] Move scalafmt to Scala 2.12 profile; bump to 0.12 add fd2bf55 [SPARK-27651][CORE] Avoid the network when shuffle blocks are fetched from the same host No new revisions were added by this update. Summary of changes: .../network/shuffle/ExternalBlockHandler.java | 6 + .../network/shuffle/ExternalBlockStoreClient.java | 49 +++- .../shuffle/ExternalShuffleBlockResolver.java | 15 ++ .../shuffle/protocol/BlockTransferMessage.java | 4 +- .../network/shuffle/protocol/BlocksRemoved.java| 2 +- .../shuffle/protocol/ExecutorShuffleInfo.java | 2 +- ...veBlocks.java => GetLocalDirsForExecutors.java} | 46 ++-- .../shuffle/protocol/LocalDirsForExecutors.java| 117 ++ .../shuffle/BlockTransferMessagesSuite.java| 24 +- .../main/scala/org/apache/spark/SparkContext.scala | 8 + .../org/apache/spark/internal/config/package.scala | 18 ++ .../apache/spark/network/BlockDataManager.scala| 12 +- .../spark/network/netty/NettyBlockRpcServer.scala | 6 +- .../spark/shuffle/IndexShuffleBlockResolver.scala | 41 +++- .../spark/shuffle/ShuffleBlockResolver.scala | 10 +- .../org/apache/spark/storage/BlockManager.scala| 70 +- .../spark/storage/BlockManagerMasterEndpoint.scala | 26 ++- .../storage/ShuffleBlockFetcherIterator.scala | 248 +++-- .../apache/spark/ExternalShuffleServiceSuite.scala | 49 +++- .../netty/NettyBlockTransferSecuritySuite.scala| 2 +- .../shuffle/BlockStoreShuffleReaderSuite.scala | 3 +- .../spark/storage/BlockManagerInfoSuite.scala | 1 - .../apache/spark/storage/BlockManagerSuite.scala | 4 +- .../storage/ShuffleBlockFetcherIteratorSuite.scala | 220 +- 24 files changed, 794 insertions(+), 189 deletions(-) copy common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/{RemoveBlocks.java => GetLocalDirsForExecutors.java} (57%) create mode 100644 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated (6880ccd -> 94ddc2a)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git. from 6880ccd [MINOR][INFRA] Use GitHub Action Cache for `build` add 94ddc2a [SPARK-29971][CORE][2.4] Fix buffer leaks in `TransportFrameDecoder/TransportCipher` No new revisions were added by this update. Summary of changes: .../spark/network/crypto/TransportCipher.java | 50 +++- .../network/util/ByteArrayReadableChannel.java | 24 +++--- .../spark/network/util/TransportFrameDecoder.java | 18 - .../spark/network/crypto/TransportCipherSuite.java | 91 ++ 4 files changed, 148 insertions(+), 35 deletions(-) create mode 100644 common/network-common/src/test/java/org/apache/spark/network/crypto/TransportCipherSuite.java - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (29ebd93 -> bec2068)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 29ebd93 [SPARK-29979][SQL] Add basic/reserved property key constants in TableCatalog and SupportsNamespaces add bec2068 [SPARK-26260][CORE] For disk store tasks summary table should show only successful tasks summary No new revisions were added by this update. Summary of changes: .../org/apache/spark/status/AppStatusStore.scala | 82 +-- .../scala/org/apache/spark/status/LiveEntity.scala | 102 ++- .../scala/org/apache/spark/status/storeTypes.scala | 76 -- .../apache/spark/status/AppStatusStoreSuite.scala | 112 - 4 files changed, 234 insertions(+), 138 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (6b0e391 -> f28eab2)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 6b0e391 [SPARK-29427][SQL] Add API to convert RelationalGroupedDataset to KeyValueGroupedDataset add f28eab2 [SPARK-29971][CORE] Fix buffer leaks in `TransportFrameDecoder/TransportCipher` No new revisions were added by this update. Summary of changes: .../spark/network/crypto/TransportCipher.java | 50 +++- .../network/util/ByteArrayReadableChannel.java | 24 +++--- .../spark/network/util/TransportFrameDecoder.java | 28 +-- .../spark/network/crypto/TransportCipherSuite.java | 91 ++ 4 files changed, 157 insertions(+), 36 deletions(-) create mode 100644 common/network-common/src/test/java/org/apache/spark/network/crypto/TransportCipherSuite.java - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (848bdfa -> c0507e0)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 848bdfa [SPARK-29829][SQL] SHOW TABLE EXTENDED should do multi-catalog resolution add c0507e0 [SPARK-29833][YARN] Add FileNotFoundException check for spark.yarn.jars No new revisions were added by this update. Summary of changes: .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +- .../scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 14 +- 2 files changed, 18 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (c941362 -> df08e90)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from c941362 [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows add df08e90 [SPARK-29755][CORE] Provide @JsonDeserialize for Option[Long] in LogInfo & AttemptInfoWrapper No new revisions were added by this update. Summary of changes: .../spark/deploy/history/FsHistoryProvider.scala | 4 ++ .../deploy/history/FsHistoryProviderSuite.scala| 50 ++ 2 files changed, 54 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (9753a8e -> c941362)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 9753a8e [SPARK-29766][SQL] Do metrics aggregation asynchronously in SQL listener add c941362 [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows No new revisions were added by this update. Summary of changes: docs/ss-migration-guide.md | 1 + .../org/apache/spark/sql/internal/SQLConf.scala| 10 + .../spark/sql/execution/SparkStrategies.scala | 5 +- .../spark/sql/execution/streaming/OffsetSeq.scala | 9 +- .../streaming/StreamingSymmetricHashJoinExec.scala | 84 +-- .../state/SymmetricHashJoinStateManager.scala | 251 + .../commits/0 | 0 .../metadata | 1 + .../offsets/0 | 4 + .../state/0/0/left-keyToNumValues}/1.delta | Bin .../state/0/0/left-keyWithIndexToValue}/1.delta| Bin .../state/0/0/right-keyToNumValues}/1.delta| Bin .../state/0/0/right-keyWithIndexToValue}/1.delta | Bin .../state/0/1/left-keyToNumValues/1.delta | Bin 0 -> 86 bytes .../state/0/1/left-keyWithIndexToValue/1.delta | Bin 0 -> 92 bytes .../state/0/1/right-keyToNumValues}/1.delta| Bin .../state/0/1/right-keyWithIndexToValue}/1.delta | Bin .../state/0/2/left-keyToNumValues/1.delta | Bin 0 -> 70 bytes .../state/0/2/left-keyWithIndexToValue/1.delta | Bin 0 -> 72 bytes .../state/0/2/right-keyToNumValues/1.delta | Bin 0 -> 70 bytes .../state/0/2/right-keyWithIndexToValue/1.delta| Bin 0 -> 72 bytes .../state/0/3/left-keyToNumValues/1.delta | Bin 0 -> 64 bytes .../state/0/3/left-keyWithIndexToValue/1.delta | Bin 0 -> 73 bytes .../state/0/3/right-keyToNumValues}/1.delta| Bin .../state/0/3/right-keyWithIndexToValue}/1.delta | Bin .../state/0/4/left-keyToNumValues/1.delta | Bin 0 -> 70 bytes .../state/0/4/left-keyWithIndexToValue/1.delta | Bin 0 -> 73 bytes .../state/0/4/right-keyToNumValues/1.delta | Bin 0 -> 70 bytes .../state/0/4/right-keyWithIndexToValue/1.delta| Bin 0 -> 73 bytes 
.../state/SymmetricHashJoinStateManagerSuite.scala | 18 +- .../spark/sql/streaming/StreamingJoinSuite.scala | 225 +- 31 files changed, 526 insertions(+), 82 deletions(-) copy sql/core/src/test/resources/structured-streaming/{escaped-path-2.4.0/chk%252520%252525@%252523chk => checkpoint-version-2.4.0-streaming-join}/commits/0 (100%) create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.0-streaming-join/metadata create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.0-streaming-join/offsets/0 copy sql/core/src/test/resources/structured-streaming/{checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4 => checkpoint-version-2.4.0-streaming-join/state/0/0/left-keyToNumValues}/1.delta (100%) copy sql/core/src/test/resources/structured-streaming/{checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4 => checkpoint-version-2.4.0-streaming-join/state/0/0/left-keyWithIndexToValue}/1.delta (100%) copy sql/core/src/test/resources/structured-streaming/{checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4 => checkpoint-version-2.4.0-streaming-join/state/0/0/right-keyToNumValues}/1.delta (100%) copy sql/core/src/test/resources/structured-streaming/{checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4 => checkpoint-version-2.4.0-streaming-join/state/0/0/right-keyWithIndexToValue}/1.delta (100%) create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.0-streaming-join/state/0/1/left-keyToNumValues/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.0-streaming-join/state/0/1/left-keyWithIndexToValue/1.delta copy sql/core/src/test/resources/structured-streaming/{checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4 => checkpoint-version-2.4.0-streaming-join/state/0/1/right-keyToNumValues}/1.delta (100%) copy 
sql/core/src/test/resources/structured-streaming/{checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4 => checkpoint-version-2.4.0-streaming-join/state/0/1/right-keyWithIndexToValue}/1.delta (100%) create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.0-streaming-join/state/0/2/left-keyToNumValues/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.0-streaming-join/state/0/2/left-keyWithIndexToValue/1.delta create mode 100644 sql/core/src/test/resources/structured-streaming/checkpoint-version-2.4.0-stre
[spark] branch branch-2.4 updated: [SPARK-29790][DOC] Note required port for Kube API
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new bef7c0f [SPARK-29790][DOC] Note required port for Kube API bef7c0f is described below commit bef7c0fddbddf1cc9d22b3ea60153b7e1bf8809d Author: Emil Sandstø AuthorDate: Fri Nov 8 09:32:29 2019 -0800 [SPARK-29790][DOC] Note required port for Kube API This adds a note about the required port in a Kubernetes master URL. Currently, a port must be specified for the Kubernetes API, even when the API is hosted on the standard HTTPS port; otherwise the driver might fail as described in https://medium.com/kidane.weldemariam_75349/thanks-james-on-issuing-spark-submit-i-run-into-this-error-cc507d4f8f0d Yes, a change to the "Running on Kubernetes" guide. None - Documentation change Closes #26426 from Tapped/patch-1. Authored-by: Emil Sandstø Signed-off-by: Marcelo Vanzin (cherry picked from commit 0bdadba5e3810f8e3f5da13e2a598071cbadab94) Signed-off-by: Marcelo Vanzin --- docs/running-on-kubernetes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 0277043..037e1d5 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -103,7 +103,7 @@ $ bin/spark-submit \ ``` The Spark master, specified either via passing the `--master` command line argument to `spark-submit` or by setting -`spark.master` in the application's configuration, must be a URL with the format `k8s://`. Prefixing the +`spark.master` in the application's configuration, must be a URL with the format `k8s://:`. The port must always be specified, even if it's the HTTPS port 443. Prefixing the master string with `k8s://` will cause the Spark application to launch on the Kubernetes cluster, with the API server being contacted at `api_server_url`. 
If no HTTP protocol is specified in the URL, it defaults to `https`. For example, setting the master to `k8s://example.com:443` is equivalent to setting it to `k8s://https://example.com:443`, but to - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
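The rule the doc change describes can be sketched as a submit command; the host, application name, and jar path below are placeholders, not values from this commit:

```shell
# Hypothetical API server endpoint -- the port must be given explicitly,
# even when it is the standard HTTPS port 443.
./bin/spark-submit \
  --master k8s://https://example.com:443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  local:///opt/spark/examples/jars/spark-examples.jar
```

Per the updated guide, `k8s://example.com:443` is treated the same as `k8s://https://example.com:443` (HTTPS is the default scheme), while omitting the port entirely can make the driver fail to reach the API server.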
[spark] branch master updated (e026412 -> 0bdadba)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from e026412 [SPARK-29679][SQL] Make interval type comparable and orderable add 0bdadba [SPARK-29790][DOC] Note required port for Kube API No new revisions were added by this update. Summary of changes: docs/running-on-kubernetes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (4ec04e5 -> 3641c3d)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 4ec04e5 [SPARK-22340][PYTHON] Add a mode to pin Python thread into JVM's add 3641c3d [SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer No new revisions were added by this update. Summary of changes: .../spark/sql/kafka010/CachedKafkaProducer.scala | 118 +--- .../sql/kafka010/InternalKafkaConnectorPool.scala | 210 + .../sql/kafka010/InternalKafkaConsumerPool.scala | 210 +++-- .../sql/kafka010/InternalKafkaProducerPool.scala | 68 +++ .../spark/sql/kafka010/KafkaDataConsumer.scala | 7 +- .../spark/sql/kafka010/KafkaDataWriter.scala | 34 +++- .../apache/spark/sql/kafka010/KafkaWriteTask.scala | 20 +- .../org/apache/spark/sql/kafka010/package.scala| 34 +++- .../sql/kafka010/CachedKafkaProducerSuite.scala| 154 +++ ...scala => InternalKafkaConnectorPoolSuite.scala} | 8 +- .../sql/kafka010/KafkaDataConsumerSuite.scala | 6 +- .../org/apache/spark/sql/kafka010/KafkaTest.scala | 10 +- .../kafka010/KafkaDataConsumerSuite.scala | 7 - 13 files changed, 554 insertions(+), 332 deletions(-) create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConnectorPool.scala create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaProducerPool.scala rename external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/{InternalKafkaConsumerPoolSuite.scala => InternalKafkaConnectorPoolSuite.scala} (96%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
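The pooling approach this commit applies can be sketched with Apache Commons Pool 2's `GenericObjectPool`. This is a minimal illustration of the library, not Spark's actual `InternalKafkaProducerPool` API; `FakeProducer` is a hypothetical stand-in for a Kafka producer keyed by its configuration:

```scala
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}

// Hypothetical stand-in for an expensive-to-create Kafka producer.
class FakeProducer(val params: Map[String, AnyRef]) {
  def close(): Unit = ()
}

class FakeProducerFactory(params: Map[String, AnyRef])
    extends BasePooledObjectFactory[FakeProducer] {
  override def create(): FakeProducer = new FakeProducer(params)
  override def wrap(p: FakeProducer): PooledObject[FakeProducer] =
    new DefaultPooledObject(p)
  // Called when the pool evicts or invalidates an instance.
  override def destroyObject(po: PooledObject[FakeProducer]): Unit =
    po.getObject.close()
}

val config = new GenericObjectPoolConfig[FakeProducer]()
config.setMaxTotal(4) // cap the number of live producers

val pool = new GenericObjectPool(new FakeProducerFactory(Map.empty), config)
val producer = pool.borrowObject() // reuses an idle instance when available
try {
  // ... send records with `producer` ...
} finally {
  pool.returnObject(producer) // hand it back to the pool instead of closing it
}
```

The design gain over a plain cache is lifecycle management: the pool bounds the number of instances and destroys evicted or invalidated ones through the factory, rather than leaking producers.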
[spark] branch master updated: [SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 252ecd3 [SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink 252ecd3 is described below commit 252ecd333ff7fa65c50e72fec25e7f5ee66bc9e7 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Wed Nov 6 17:08:42 2019 -0800 [SPARK-29635][SS] Extract base test suites between Kafka micro-batch sink and Kafka continuous sink ### What changes were proposed in this pull request? This patch leverages the V2 continuous memory stream to extract tests from the Kafka micro-batch sink suite and the continuous sink suite and deduplicate them. These tests are basically the same, differing only in how they run and verify the result. ### Why are the changes needed? The same tests are no longer duplicated in two places, which brings a deletion of roughly 300 lines. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UTs. Closes #26292 from HeartSaVioR/SPARK-29635. 
Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Marcelo Vanzin --- .../main/scala/org/apache/spark/TestUtils.scala| 16 +- .../sql/kafka010/KafkaContinuousSinkSuite.scala| 395 - .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 265 -- 3 files changed, 177 insertions(+), 499 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 5d79394..1f06364 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -24,7 +24,7 @@ import java.nio.file.{Files => JavaFiles} import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE} import java.security.SecureRandom import java.security.cert.X509Certificate -import java.util.{Arrays, EnumSet, Properties} +import java.util.{Arrays, EnumSet, Locale, Properties} import java.util.concurrent.{TimeoutException, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream, Manifest} import javax.net.ssl._ @@ -214,12 +214,20 @@ private[spark] object TestUtils { * Asserts that exception message contains the message. Please note this checks all * exceptions in the tree. 
*/ - def assertExceptionMsg(exception: Throwable, msg: String): Unit = { + def assertExceptionMsg(exception: Throwable, msg: String, ignoreCase: Boolean = false): Unit = { +def contain(msg1: String, msg2: String): Boolean = { + if (ignoreCase) { +msg1.toLowerCase(Locale.ROOT).contains(msg2.toLowerCase(Locale.ROOT)) + } else { +msg1.contains(msg2) + } +} + var e = exception -var contains = e.getMessage.contains(msg) +var contains = contain(e.getMessage, msg) while (e.getCause != null && !contains) { e = e.getCause - contains = e.getMessage.contains(msg) + contains = contain(e.getMessage, msg) } assert(contains, s"Exception tree doesn't contain the expected message: $msg") } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala deleted file mode 100644 index 031f609..000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.kafka010 - -import java.util.Locale - -import scala.reflect.ClassTag - -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.scalatest.time.SpanSugar._ - -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
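The `ignoreCase` flag added to `TestUtils.assertExceptionMsg` in the diff above can be exercised like this (a sketch assuming the helper is in scope; the exception messages are illustrative):

```scala
// The expected message differs in case AND lives on the cause, not the
// outermost exception; the updated helper handles both, since it walks the
// cause chain and compares with Locale.ROOT lower-casing when asked to.
val wrapped = new RuntimeException(
  "outer failure", new IllegalStateException("Topic Deleted"))

TestUtils.assertExceptionMsg(wrapped, "topic deleted", ignoreCase = true)
```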
[spark] branch master updated (4110153 -> 782992c)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 4110153 [SPARK-29752][SQL][TEST] make AdaptiveQueryExecSuite more robust add 782992c [SPARK-29642][SS] Change the element type of underlying array to UnsafeRow for ContinuousRecordEndpoint No new revisions were added by this update. Summary of changes: .../execution/streaming/ContinuousRecordEndpoint.scala | 6 +++--- .../continuous/ContinuousTextSocketSource.scala | 15 +++ .../streaming/sources/ContinuousMemoryStream.scala | 6 -- .../streaming/sources/TextSocketStreamSuite.scala | 7 +-- .../sql/streaming/continuous/ContinuousSuite.scala | 17 + 5 files changed, 40 insertions(+), 11 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (e5c176a -> 4615769)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from e5c176a [MINOR][INFRA] Change the Github Actions build command to `mvn install` add 4615769 [SPARK-29603][YARN] Support application priority for YARN priority scheduling No new revisions were added by this update. Summary of changes: docs/running-on-yarn.md | 9 + .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 .../src/main/scala/org/apache/spark/deploy/yarn/config.scala | 7 +++ .../test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala| 2 ++ 4 files changed, 22 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
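Based on the summary (a new entry in `docs/running-on-yarn.md` and `config.scala`), usage presumably looks like setting the new priority option at submit time. The key shown here is `spark.yarn.priority` as introduced by this change; the application class and jar are placeholders, and the setting only takes effect when the active YARN scheduler supports application priority:

```shell
./bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.priority=10 \
  --class com.example.MyApp \
  myapp.jar
```

In YARN, a higher integer means a higher scheduling priority for the application.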
[spark] branch master updated (4c53ac1 -> 075cd55)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 4c53ac1 [SPARK-29387][SQL] Support `*` and `/` operators for intervals add 075cd55 [SPARK-29763] Fix Stage UI Page not showing all accumulators in Task Table No new revisions were added by this update. Summary of changes: core/src/main/resources/org/apache/spark/ui/static/stagepage.js | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (04536b2 -> ba2bc4b)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 04536b2 [SPARK-28552][SQL] Case-insensitive database URLs in JdbcDialect add ba2bc4b [SPARK-20568][SS] Provide option to clean up completed files in streaming query No new revisions were added by this update. Summary of changes: docs/structured-streaming-programming-guide.md | 7 + .../execution/streaming/FileStreamOptions.scala| 37 .../sql/execution/streaming/FileStreamSource.scala | 111 +- .../sql/streaming/FileStreamSourceSuite.scala | 231 - 4 files changed, 375 insertions(+), 11 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
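A sketch of the new file-source cleanup option documented in the structured-streaming guide change; the option names (`cleanSource`, `sourceArchiveDir`) are as introduced by SPARK-20568, and the paths are placeholders:

```scala
val input = spark.readStream
  .format("text")
  // Move fully processed source files out of the input directory...
  .option("cleanSource", "archive")            // alternatives: "delete", "off" (the default)
  // ...into an archive directory (only needed when cleanSource = "archive").
  .option("sourceArchiveDir", "/data/archive")
  .load("/data/incoming")
```

With `"delete"`, completed files are removed instead of archived; `"off"` preserves the old behavior of leaving them in place.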
[spark] branch master updated (441d4c9 -> d51d228)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 441d4c9 [SPARK-29723][SQL] Get date and time parts of an interval as java classes add d51d228 [SPARK-29397][CORE] Extend plugin interface to include the driver No new revisions were added by this update. Summary of changes: .../org/apache/spark/api/plugin/DriverPlugin.java | 111 ++ .../apache/spark/api/plugin/ExecutorPlugin.java| 57 + .../org/apache/spark/api/plugin/PluginContext.java | 84 .../org/apache/spark/api/plugin/SparkPlugin.java | 53 + .../main/scala/org/apache/spark/SparkContext.scala | 9 + .../scala/org/apache/spark/executor/Executor.scala | 7 + .../org/apache/spark/internal/config/package.scala | 11 + .../spark/internal/plugin/PluginContainer.scala| 152 + .../spark/internal/plugin/PluginContextImpl.scala | 84 .../spark/internal/plugin/PluginEndpoint.scala | 64 ++ .../internal/plugin/PluginContainerSuite.scala | 240 + docs/monitoring.md | 31 ++- 12 files changed, 899 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java create mode 100644 core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java create mode 100644 core/src/main/java/org/apache/spark/api/plugin/PluginContext.java create mode 100644 core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java create mode 100644 core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala create mode 100644 core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala create mode 100644 core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala create mode 100644 core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
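A minimal sketch of the extended plugin interface, based on the class names in the summary (`SparkPlugin`, `DriverPlugin`, `ExecutorPlugin`, `PluginContext`); the class name and the returned setting are hypothetical:

```scala
import java.util.{Collections, Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// One plugin class now covers both sides: the driver half is initialized
// with the SparkContext, the executor half runs on each executor.
class MyMetricsPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new DriverPlugin {
    override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
      // Values returned here are handed to the executor plugins' init().
      Collections.singletonMap("my.plugin.setting", "on")
    }
  }

  // The interfaces ship default no-op methods, so an empty body is valid.
  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {}
}
```

Per this change, plugins are activated through configuration (the `spark.plugins` setting added in `config/package.scala`), e.g. `--conf spark.plugins=com.example.MyMetricsPlugin`.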
[spark] branch branch-2.4 updated (fc7d918 -> 4583d14)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git. from fc7d918 [SPARK-28938][K8S][2.4][FOLLOWUP] Use `/usr/bin/tini` instead of `/sbin/tini` add 4583d14 [SPARK-29637][CORE] Add description to Job SHS web API No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala| 3 +++ core/src/main/scala/org/apache/spark/status/LiveEntity.scala | 3 ++- .../test/scala/org/apache/spark/status/AppStatusListenerSuite.scala| 3 ++- 3 files changed, 7 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (44c1c03 -> 9c817a8)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 44c1c03 [SPARK-29607][SQL] Move static methods from CalendarInterval to IntervalUtils add 9c817a8 [SPARK-29637][CORE] Add description to Job SHS web API No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala| 3 +++ core/src/main/scala/org/apache/spark/status/LiveEntity.scala | 3 ++- .../test/scala/org/apache/spark/status/AppStatusListenerSuite.scala| 3 ++- 3 files changed, 7 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (2be1fe6 -> 762db39)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 2be1fe6 [SPARK-29521][SQL] LOAD DATA INTO TABLE should look up catalog/table like v2 commands add 762db39 [SPARK-29509][SQL][SS] Deduplicate codes from Kafka data source No new revisions were added by this update. Summary of changes: .../apache/spark/sql/kafka010/KafkaWriteTask.scala | 69 +--- .../apache/spark/sql/kafka010/KafkaWriter.scala| 104 ++-- .../sql/kafka010/KafkaContinuousSinkSuite.scala| 166 --- .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 179 +++-- 4 files changed, 188 insertions(+), 330 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (78bdcfa -> 00347a3)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 78bdcfa [SPARK-27812][K8S] Bump K8S client version to 4.6.1 add 00347a3 [SPARK-28762][CORE] Read JAR main class if JAR is not located in local file system No new revisions were added by this update. Summary of changes: .../main/scala/org/apache/spark/TestUtils.scala| 20 +- .../org/apache/spark/deploy/SparkSubmit.scala | 29 +++- .../apache/spark/deploy/SparkSubmitArguments.scala | 28 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 77 ++ 4 files changed, 122 insertions(+), 32 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (2f0a38c -> 100fc58)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 2f0a38c [SPARK-29398][CORE] Support dedicated thread pools for RPC endpoints add 100fc58 [SPARK-28869][CORE] Roll over event log files No new revisions were added by this update. Summary of changes: .../spark/deploy/history/EventLogFileReaders.scala | 264 + .../spark/deploy/history/EventLogFileWriters.scala | 415 + .../spark/deploy/history/FsHistoryProvider.scala | 195 +- .../org/apache/spark/internal/config/package.scala | 15 + .../spark/scheduler/EventLoggingListener.scala | 222 ++- .../apache/spark/scheduler/ReplayListenerBus.scala | 11 +- .../org/apache/spark/deploy/SparkSubmitSuite.scala | 4 +- .../deploy/history/EventLogFileReadersSuite.scala | 345 + .../deploy/history/EventLogFileWritersSuite.scala | 378 +++ .../spark/deploy/history/EventLogTestHelper.scala | 59 +++ .../deploy/history/FsHistoryProviderSuite.scala| 81 +++- .../scheduler/EventLoggingListenerSuite.scala | 108 +- .../spark/scheduler/ReplayListenerSuite.scala | 17 +- docs/configuration.md | 15 + 14 files changed, 1719 insertions(+), 410 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
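Judging by the new `EventLogFileWriters` and the `docs/configuration.md` addition, rolling is opt-in via configuration. A `spark-defaults.conf` sketch; the `spark.eventLog.rolling.*` keys follow this change, and the directory and size are placeholders:

```
spark.eventLog.enabled               true
spark.eventLog.dir                   hdfs:///spark-logs
spark.eventLog.rolling.enabled       true
# Roll to a new event log file once the current one exceeds this size,
# so the history server need not replay one ever-growing file.
spark.eventLog.rolling.maxFileSize   128m
```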
[spark] branch master updated: [SPARK-28947][K8S] Status logging not happens at an interval for liveness
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 02c5b4f [SPARK-28947][K8S] Status logging not happens at an interval for liveness 02c5b4f is described below commit 02c5b4f76337cc3901b8741887292bb4478931f3 Author: Kent Yao AuthorDate: Tue Oct 15 12:34:39 2019 -0700 [SPARK-28947][K8S] Status logging not happens at an interval for liveness ### What changes were proposed in this pull request? This pr invoke the start method of `LoggingPodStatusWatcherImpl` for status logging at intervals. ### Why are the changes needed? This pr invoke the start method of `LoggingPodStatusWatcherImpl` is declared but never called ### Does this PR introduce any user-facing change? no ### How was this patch tested? manually test Closes #25648 from yaooqinn/SPARK-28947. Authored-by: Kent Yao Signed-off-by: Marcelo Vanzin --- .../k8s/submit/KubernetesClientApplication.scala | 25 ++--- .../k8s/submit/LoggingPodStatusWatcher.scala | 61 ++ .../spark/deploy/k8s/submit/ClientSuite.scala | 5 +- 3 files changed, 33 insertions(+), 58 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 11bbad9..8e5532d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -86,15 +86,12 @@ private[spark] object ClientArguments { * @param builder Responsible for building the base driver pod based on a composition of *implemented features. 
* @param kubernetesClient the client to talk to the Kubernetes API server - * @param waitForAppCompletion a flag indicating whether the client should wait for the application - * to complete * @param watcher a watcher that monitors and logs the application status */ private[spark] class Client( conf: KubernetesDriverConf, builder: KubernetesDriverBuilder, kubernetesClient: KubernetesClient, -waitForAppCompletion: Boolean, watcher: LoggingPodStatusWatcher) extends Logging { def run(): Unit = { @@ -124,10 +121,11 @@ private[spark] class Client( .endVolume() .endSpec() .build() +val driverPodName = resolvedDriverPod.getMetadata.getName Utils.tryWithResource( kubernetesClient .pods() -.withName(resolvedDriverPod.getMetadata.getName) +.withName(driverPodName) .watch(watcher)) { _ => val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) try { @@ -141,16 +139,8 @@ private[spark] class Client( throw e } - val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" + -s"${resolvedDriverPod.getMetadata.getName}" - if (waitForAppCompletion) { -logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...") -watcher.awaitCompletion() -logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.") - } else { -logInfo(s"Deployed Spark application ${conf.appName} with " + - s"submission ID ${sId} into Kubernetes.") - } + val sId = Seq(conf.namespace, driverPodName).mkString(":") + watcher.watchOrStop(sId) } } @@ -199,13 +189,11 @@ private[spark] class KubernetesClientApplication extends SparkApplication { } private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = { -val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") // For constructing the app ID, we can't use the Spark application name, as the app ID is going // to be added as a label to group resources belonging to the same application. Label values are // considerably restrictive, e.g. 
must be no longer than 63 characters in length. So we generate // a unique app ID (captured by spark.app.id) in the format below. val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}" -val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION) val kubernetesConf = KubernetesConf.createDriverConf( sparkConf, kubernetesAppId, @@ -215,9 +203,7 @@ private[spark] class KubernetesClientApplication extends SparkAp
[spark] branch branch-2.4 updated: [SPARK-28917][CORE] Synchronize access to RDD mutable state
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 4f46e8f [SPARK-28917][CORE] Synchronize access to RDD mutable state 4f46e8f is described below commit 4f46e8f804cba6d845116cb7daf9b4c682e6a0f1 Author: Imran Rashid AuthorDate: Tue Oct 8 11:35:54 2019 -0700 [SPARK-28917][CORE] Synchronize access to RDD mutable state RDD dependencies and partitions can be simultaneously accessed and mutated by user threads and spark's scheduler threads, so access must be thread-safe. In particular, as partitions and dependencies are lazily-initialized, before this change they could get initialized multiple times, which would lead to the scheduler having an inconsistent view of the pendings stages and get stuck. Tested with existing unit tests. Closes #25951 from squito/SPARK-28917. Authored-by: Imran Rashid Signed-off-by: Marcelo Vanzin (cherry picked from commit 0da667d31436c43e06cb6bb5ac65a17f65edd08b) Signed-off-by: Marcelo Vanzin --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 41 +- .../org/apache/spark/scheduler/DAGScheduler.scala | 6 ++-- .../scala/org/apache/spark/DistributedSuite.scala | 15 3 files changed, 51 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0be2543..31c6bc1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -222,10 +222,24 @@ abstract class RDD[T: ClassTag]( /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ def getStorageLevel: StorageLevel = storageLevel + /** + * Lock for all mutable state of this RDD (persistence, partitions, dependencies, etc.). 
We do + * not use `this` because RDDs are user-visible, so users might have added their own locking on + * RDDs; sharing that could lead to a deadlock. + * + * One thread might hold the lock on many of these, for a chain of RDD dependencies; but + * because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no + * chance of deadlock. + * + * The use of Integer is simply so this is serializable -- executors may reference the shared + * fields (though they should never mutate them, that only happens on the driver). + */ + private val stateLock = new Integer(0) + // Our dependencies and partitions will be gotten by calling subclass's methods below, and will // be overwritten when we're checkpointed - private var dependencies_ : Seq[Dependency[_]] = _ - @transient private var partitions_ : Array[Partition] = _ + @volatile private var dependencies_ : Seq[Dependency[_]] = _ + @volatile @transient private var partitions_ : Array[Partition] = _ /** An Option holding our checkpoint RDD, if we are checkpointed */ private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD) @@ -237,7 +251,11 @@ abstract class RDD[T: ClassTag]( final def dependencies: Seq[Dependency[_]] = { checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { -dependencies_ = getDependencies +stateLock.synchronized { + if (dependencies_ == null) { +dependencies_ = getDependencies + } +} } dependencies_ } @@ -250,10 +268,14 @@ abstract class RDD[T: ClassTag]( final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { if (partitions_ == null) { -partitions_ = getPartitions -partitions_.zipWithIndex.foreach { case (partition, index) => - require(partition.index == index, -s"partitions($index).partition == ${partition.index}, but it should equal $index") +stateLock.synchronized { + if (partitions_ == null) { +partitions_ = getPartitions +partitions_.zipWithIndex.foreach { case 
(partition, index) => + require(partition.index == index, +s"partitions($index).partition == ${partition.index}, but it should equal $index") +} + } } } partitions_ @@ -1798,7 +1820,7 @@ abstract class RDD[T: ClassTag]( * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`) * created from the checkpoint file, and forget its old dependencies and partitions. */ - private[spark] def markCheckpointed(): Unit = { + private[spark] def markCheckpointed(): Unit = stateLock.synchronized { clearDependencies() partitions_ = null deps = null// For
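The pattern in the diff above is classic double-checked locking guarded by a dedicated lock object. Distilled into a self-contained sketch (`Lazy` is illustrative, not a Spark class):

```scala
class Lazy[T <: AnyRef](compute: () => T) extends Serializable {
  // A private lock object rather than `this`, mirroring RDD.stateLock:
  // users may synchronize on the outer instance themselves, and sharing
  // their lock could deadlock.
  private val stateLock = new Object

  // @volatile makes the unlocked first read safe: a non-null value is
  // guaranteed to be fully published before other threads can see it.
  @volatile private var value: T = _

  def get: T = {
    if (value == null) {            // fast path: no lock once initialized
      stateLock.synchronized {
        if (value == null) {        // re-check: another thread may have won
          value = compute()
        }
      }
    }
    value
  }
}
```

The RDD change uses a boxed `new Integer(0)` as the lock only because the lock field must be serializable; the effect is the same, and the re-check inside `synchronized` guarantees `getDependencies`/`getPartitions` run at most once.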
[spark] branch master updated (de360e9 -> 0da667d)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from de360e9 [SPARK-29336][SQL] Fix the implementation of QuantileSummaries.merge (guarantee that the relativeError will be respected) add 0da667d [SPARK-28917][CORE] Synchronize access to RDD mutable state No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/rdd/RDD.scala | 41 +- .../org/apache/spark/scheduler/DAGScheduler.scala | 6 ++-- .../scala/org/apache/spark/DistributedSuite.scala | 15 3 files changed, 51 insertions(+), 11 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (4e0e4e5 -> 6b5e0e2)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 4e0e4e5 [MINOR][TESTS] Rename JSONBenchmark to JsonBenchmark add 6b5e0e2 [SPARK-29054][SS] Invalidate Kafka consumer when new delegation token available No new revisions were added by this update. Summary of changes: external/kafka-0-10-sql/pom.xml| 7 ++ .../spark/sql/kafka010/KafkaDataConsumer.scala | 40 ++-- .../sql/kafka010/KafkaDataConsumerSuite.scala | 106 +++-- .../apache/spark/kafka010/KafkaConfigUpdater.scala | 8 +- .../org/apache/spark/kafka010/KafkaTokenUtil.scala | 18 +++- .../spark/kafka010/KafkaConfigUpdaterSuite.scala | 19 +++- .../spark/kafka010/KafkaDelegationTokenTest.scala | 19 +++- .../spark/kafka010/KafkaRedactionUtilSuite.scala | 12 +-- .../spark/kafka010/KafkaTokenUtilSuite.scala | 83 9 files changed, 234 insertions(+), 78 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (f2ead4d -> 85dafab)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f2ead4d [SPARK-28970][SQL] Implement USE CATALOG/NAMESPACE for Data Source V2 add 85dafab [SPARK-29273][CORE] Save peakExecutionMemory value when writing task end to event log No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 1 + core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 5 + core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 6 +- 3 files changed, 11 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-29055][CORE] Update driver/executors' storage memory when block is removed from BlockManager
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 3173439 [SPARK-29055][CORE] Update driver/executors' storage memory when block is removed from BlockManager 3173439 is described below commit 31734399d57f3c128e66b0f97ef83eb4c9165978 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Tue Oct 1 09:41:51 2019 -0700 [SPARK-29055][CORE] Update driver/executors' storage memory when block is removed from BlockManager This patch fixes the issue that storage memory does not decrease even when a block is removed from BlockManager. The issue was originally found because removing a broadcast was not reflected in the storage memory on the driver/executors. AppStatusListener expects the memory value in block-update events to be a "delta", so that it can adjust the driver/executors' storage memory accordingly; but when removing a block, BlockManager reports the delta as 0, so the storage memory is not decreased. `BlockManager.dropFromMemory` handles this correctly, so some of the paths that free memory were already updated correctly. The storage-memory metric in AppStatusListener is thus out of sync, which easily leads end users to believe a memory leak is happening. No. Modified UTs. Also manually tested by running a simple query repeatedly and observing the executor page of the Spark UI to confirm that the storage memory value decreases. Please refer to the description of [SPARK-29055](https://issues.apache.org/jira/browse/SPARK-29055) for a simple reproducer. Closes #25973 from HeartSaVioR/SPARK-29055. 
Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Marcelo Vanzin (cherry picked from commit a4601cb44e3709699f616f83d486351a1f459f8c) Signed-off-by: Marcelo Vanzin --- .../org/apache/spark/storage/BlockManager.scala| 10 +++- .../apache/spark/storage/BlockManagerSuite.scala | 57 -- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e35dd72..9e1693a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1583,15 +1583,23 @@ private[spark] class BlockManager( * lock on the block. */ private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = { +val blockStatus = if (tellMaster) { + val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId) + Some(getCurrentBlockStatus(blockId, blockInfo)) +} else None + // Removals are idempotent in disk store and memory store. At worst, we get a warning. val removedFromMemory = memoryStore.remove(blockId) val removedFromDisk = diskStore.remove(blockId) if (!removedFromMemory && !removedFromDisk) { logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory") } + blockInfoManager.removeBlock(blockId) if (tellMaster) { - reportBlockStatus(blockId, BlockStatus.empty) + // Only update storage level from the captured block status before deleting, so that + // memory size and disk size are being kept for calculating delta. 
+ reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE)) } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index abde4df..b44cef5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -27,8 +27,8 @@ import scala.language.{implicitConversions, postfixOps} import scala.reflect.ClassTag import org.apache.commons.lang3.RandomUtils -import org.mockito.{Matchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.{ArgumentCaptor, Matchers => mc} +import org.mockito.Mockito.{mock, never, spy, times, verify, when} import org.scalatest._ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ @@ -128,9 +128,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // need to create a SparkContext is to initialize LiveListenerBus. sc = mock(classOf[SparkContext]) when(sc.conf).thenReturn(conf) -master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", +master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, -new LiveL
[spark] branch master updated (0cf2f48 -> a4601cb)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 0cf2f48 [SPARK-29022][SQL] Fix SparkSQLCLI can not add jars by AddJarCommand add a4601cb [SPARK-29055][CORE] Update driver/executors' storage memory when block is removed from BlockManager No new revisions were added by this update. Summary of changes: .../org/apache/spark/storage/BlockManager.scala| 10 +++- .../apache/spark/storage/BlockManagerSuite.scala | 60 -- 2 files changed, 64 insertions(+), 6 deletions(-)
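The two SPARK-29055 commits above fix a delta-accounting bug: AppStatusListener interprets the memory size in a block-update event as a delta to apply, so a removal reported with empty sizes (delta 0) leaves the tracked storage memory stale, while reporting the captured pre-removal size lets the listener subtract it. A minimal, self-contained Scala model of that bookkeeping (toy class and names, not Spark's actual AppStatusListener/BlockManager):

```scala
// Toy model of delta-based storage accounting (illustrative, not Spark's real classes).
class StorageMemoryTracker {
  private var usedBytes: Long = 0L
  // The listener applies the reported memory size as a delta.
  def onBlockUpdate(memDelta: Long): Unit = usedBytes += memDelta
  def used: Long = usedBytes
}

// Cache a 100-byte block, then remove it two ways.
val tracker = new StorageMemoryTracker
tracker.onBlockUpdate(100L)            // block cached: +100 bytes

tracker.onBlockUpdate(0L)              // buggy removal: empty status => delta 0
val afterBuggyRemoval = tracker.used   // still 100 -- the stale value users saw

tracker.onBlockUpdate(-100L)           // fixed removal: captured size reported as a negative delta
val afterFixedRemoval = tracker.used   // back to 0
```

The real patch achieves the negative delta by capturing the block status before deletion and reporting it with `StorageLevel.NONE`, so memory and disk sizes are still available for the delta calculation.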
[spark] branch branch-2.4 updated: [SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite`
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 9ae7393 [SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite` 9ae7393 is described below commit 9ae73932bb749bde6b71cbe6cf595ec2d23b60ea Author: Xingbo Jiang AuthorDate: Fri Sep 27 16:31:23 2019 -0700 [SPARK-29263][CORE][TEST][FOLLOWUP][2.4] Fix build failure of `TaskSchedulerImplSuite` ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/25946 fixed a bug and modified `TaskSchedulerImplSuite`; when backported to 2.4, it broke the build. This PR fixes the broken test build. ### How was this patch tested? Passed locally. Closes #25952 from jiangxb1987/SPARK-29263. Authored-by: Xingbo Jiang Signed-off-by: Marcelo Vanzin --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala| 10 +++--- .../scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 5c0601eb03..ecbb6ab 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -77,7 +77,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = { -val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite") +setupSchedulerWithMaster("local", confs: _*) + } + + def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = { +val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite") 
confs.foreach { case (k, v) => conf.set(k, v) } sc = new SparkContext(conf) taskScheduler = new TaskSchedulerImpl(sc) @@ -1129,7 +1133,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // you'd need the previous stage to also get restarted, and then succeed, in between each // attempt, but that happens outside what we're mocking here.) val zombieAttempts = (0 until 2).map { stageAttempt => - val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + val attempt = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = stageAttempt) taskScheduler.submitTasks(attempt) val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } @@ -1148,7 +1152,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for // the stage, but this time with insufficient resources so not all tasks are active. 
-val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +val finalAttempt = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 2) taskScheduler.submitTasks(finalAttempt) val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d264ada..93a4b1f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1398,7 +1398,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskSetManager1.isZombie) assert(taskSetManager1.runningTasks === 9) -val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1) +val taskSet2 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1) sched.submitTasks(taskSet2) sched.resourceOffers( (11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
[spark] branch master updated (420abb4 -> 233c214)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 420abb4 [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet add 233c214 [SPARK-29070][CORE] Make SparkLauncher log full spark-submit command line No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/spark/launcher/SparkLauncher.java| 8 1 file changed, 8 insertions(+)
[spark] branch master updated (8beb736 -> d3679a9)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 8beb736 [SPARK-29256][DOCS] Fix typo in building document add d3679a9 [SPARK-27748][SS][FOLLOWUP] Correct the order of logging token as debug log No new revisions were added by this update. Summary of changes: .../src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch master updated (db9e0fd -> f32f16f)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from db9e0fd [SPARK-22796][PYTHON][ML] Add multiple columns support to PySpark QuantileDiscretizer add f32f16f [SPARK-29082][CORE] Skip delegation token generation if no credentials are available No new revisions were added by this update. Summary of changes: .../security/HadoopDelegationTokenManager.scala| 22 .../cluster/CoarseGrainedSchedulerBackend.scala| 10 +++--- .../HadoopDelegationTokenManagerSuite.scala| 39 +- 3 files changed, 59 insertions(+), 12 deletions(-)
[spark] branch master updated (dfb0a8b -> cd48177)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from dfb0a8b [SPARK-28208][BUILD][SQL] Upgrade to ORC 1.5.6 including closing the ORC readers add cd48177 [SPARK-28091][CORE] Extend Spark metrics system with user-defined metrics using executor plugins No new revisions were added by this update. Summary of changes: .../main/java/org/apache/spark/ExecutorPlugin.java | 7 +++- .../org/apache/spark/ExecutorPluginContext.java| 40 --- .../scala/org/apache/spark/executor/Executor.scala | 10 - .../ExecutorPluginSource.scala}| 13 -- .../java/org/apache/spark/ExecutorPluginSuite.java | 46 -- docs/monitoring.md | 5 +++ project/MimaExcludes.scala | 3 ++ 7 files changed, 98 insertions(+), 26 deletions(-) copy sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java => core/src/main/java/org/apache/spark/ExecutorPluginContext.java (54%) copy core/src/main/scala/org/apache/spark/{metrics/source/Source.scala => executor/ExecutorPluginSource.scala} (77%)
[spark] branch master updated: [SPARK-29105][CORE] Keep driver log file size up to date in HDFS
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 276 [SPARK-29105][CORE] Keep driver log file size up to date in HDFS 276 is described below commit 276e8d404975f8701089e9f4dfecd16e0d9f Author: Marcelo Vanzin AuthorDate: Wed Sep 18 09:11:55 2019 -0700 [SPARK-29105][CORE] Keep driver log file size up to date in HDFS HDFS doesn't update the file size reported by the NM if you just keep writing to the file; this makes the SHS believe the file is inactive, and so it may delete it after the configured max age for log files. This change uses hsync to keep the log file as up to date as possible when using HDFS. It also disables erasure coding by default for these logs, since hsync (& friends) does not work with EC. Tested with a SHS configured to aggressively clean up logs; verified a spark-shell session kept updating the log, which was not deleted by the SHS. Closes #25819 from vanzin/SPARK-29105. 
Authored-by: Marcelo Vanzin Signed-off-by: Marcelo Vanzin --- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 54 -- .../org/apache/spark/internal/config/package.scala | 5 ++ .../spark/scheduler/EventLoggingListener.scala | 8 +--- .../apache/spark/util/logging/DriverLogger.scala | 16 ++- docs/configuration.md | 10 .../apache/spark/streaming/util/HdfsUtils.scala| 2 +- 6 files changed, 61 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 11420bb..8ba1ebf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -454,35 +454,39 @@ private[spark] object SparkHadoopUtil { // scalastyle:off line.size.limit /** - * Create a path that uses replication instead of erasure coding (ec), regardless of the default - * configuration in hdfs for the given path. This can be helpful as hdfs ec doesn't support - * hflush(), hsync(), or append() + * Create a file on the given file system, optionally making sure erasure coding is disabled. + * + * Disabling EC can be helpful as HDFS EC doesn't support hflush(), hsync(), or append(). * https://hadoop.apache.org/docs/r3.0.0/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html#Limitations */ // scalastyle:on line.size.limit - def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = { -try { - // Use reflection as this uses APIs only available in Hadoop 3 - val builderMethod = fs.getClass().getMethod("createFile", classOf[Path]) - // the builder api does not resolve relative paths, nor does it create parent dirs, while - // the old api does. 
- if (!fs.mkdirs(path.getParent())) { -throw new IOException(s"Failed to create parents of $path") + def createFile(fs: FileSystem, path: Path, allowEC: Boolean): FSDataOutputStream = { +if (allowEC) { + fs.create(path) +} else { + try { +// Use reflection as this uses APIs only available in Hadoop 3 +val builderMethod = fs.getClass().getMethod("createFile", classOf[Path]) +// the builder api does not resolve relative paths, nor does it create parent dirs, while +// the old api does. +if (!fs.mkdirs(path.getParent())) { + throw new IOException(s"Failed to create parents of $path") +} +val qualifiedPath = fs.makeQualified(path) +val builder = builderMethod.invoke(fs, qualifiedPath) +val builderCls = builder.getClass() +// this may throw a NoSuchMethodException if the path is not on hdfs +val replicateMethod = builderCls.getMethod("replicate") +val buildMethod = builderCls.getMethod("build") +val b2 = replicateMethod.invoke(builder) +buildMethod.invoke(b2).asInstanceOf[FSDataOutputStream] + } catch { +case _: NoSuchMethodException => + // No createFile() method, we're using an older hdfs client, which doesn't give us control + // over EC vs. replication. Older hdfs doesn't have EC anyway, so just create a file with + // old apis. + fs.create(path) } - val qualifiedPath = fs.makeQualified(path) - val builder = builderMethod.invoke(fs, qualifiedPath) - val builderCls = builder.getClass() - // this may throw a NoSuchMethodException if the path is not on hdfs - val replicateMethod = builderCls.getMethod("replicate") - val buildMethod =
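The `createFile` change in the diff above compiles against Hadoop 2 but opportunistically uses Hadoop 3's `createFile` builder API through reflection, falling back to the classic `create()` when the method does not exist (older clients have no erasure coding anyway). That reflect-then-fall-back pattern can be sketched in self-contained Scala — the `OldClient`/`NewClient` classes below are illustrative stand-ins, not Hadoop's `FileSystem` types:

```scala
// Reflection-with-fallback: prefer a newer API when present, degrade gracefully otherwise.
// OldClient/NewClient stand in for older/newer file system clients.
class OldClient {
  def create(path: String): String = s"replicated:$path" // always-available API
}
class NewClient extends OldClient {
  def createFile(path: String): String = s"builder:$path" // only on newer versions
}

def createPreferringBuilder(client: OldClient, path: String): String =
  try {
    // Look the newer method up reflectively so this code compiles against OldClient.
    val m = client.getClass.getMethod("createFile", classOf[String])
    m.invoke(client, path).asInstanceOf[String]
  } catch {
    case _: NoSuchMethodException =>
      client.create(path) // older client: no builder API, use the classic call
  }
```

The same shape lets the commit request replication (rather than erasure coding) only when the builder API is available, which matters because HDFS erasure coding does not support `hflush()`/`hsync()`.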
[spark] branch master updated (02db706 -> 71e7516)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 02db706 [SPARK-29115][SQL][TEST] Add benchmarks for make_date() and make_timestamp() add 71e7516 [SPARK-29027][TESTS] KafkaDelegationTokenSuite fix when loopback canonical host name differs from localhost No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 17 ++--- 1 file changed, 10 insertions(+), 7 deletions(-)
[spark] branch master updated (b62ef8f -> c18f849)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from b62ef8f [SPARK-29007][STREAMING][MLLIB][TESTS] Enforce not leaking SparkContext in tests which creates new StreamingContext with new SparkContext add c18f849 [SPARK-24663][STREAMING][TESTS] StreamingContextSuite: Wait until slow receiver has been initialized, but with hard timeout No new revisions were added by this update. Summary of changes: .../org/apache/spark/streaming/StreamingContextSuite.scala | 12 1 file changed, 8 insertions(+), 4 deletions(-)
[spark] branch master updated: [SPARK-29007][STREAMING][MLLIB][TESTS] Enforce not leaking SparkContext in tests which creates new StreamingContext with new SparkContext
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b62ef8f [SPARK-29007][STREAMING][MLLIB][TESTS] Enforce not leaking SparkContext in tests which creates new StreamingContext with new SparkContext b62ef8f is described below commit b62ef8f7935ae5c9a4a5e7e8a17aa5d7375c85b1 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Wed Sep 11 10:29:13 2019 -0700 [SPARK-29007][STREAMING][MLLIB][TESTS] Enforce not leaking SparkContext in tests which creates new StreamingContext with new SparkContext ### What changes were proposed in this pull request? This patch enforces tests not to leak the newly created SparkContext when it is created by initializing a StreamingContext. A leaked SparkContext would make most of the following tests fail as well, so this patch applies defensive programming, trying its best to ensure the SparkContext is cleaned up. ### Why are the changes needed? We have seen cases in CI builds where a SparkContext is leaked and other tests are affected by the leaked SparkContext. Ideally we should isolate the environment among tests if possible. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Modified UTs. Closes #25709 from HeartSaVioR/SPARK-29007. 
Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Marcelo Vanzin --- external/kafka-0-10/pom.xml| 7 ++ .../kafka010/DirectKafkaStreamSuite.scala | 24 +++ .../streaming/kinesis/KinesisStreamSuite.scala | 37 -- mllib/pom.xml | 7 ++ .../StreamingLogisticRegressionSuite.scala | 16 ++--- .../mllib/clustering/StreamingKMeansSuite.scala| 13 +--- .../StreamingLinearRegressionSuite.scala | 16 ++--- .../apache/spark/streaming/CheckpointSuite.scala | 18 ++--- .../spark/streaming/DStreamClosureSuite.scala | 16 + .../apache/spark/streaming/DStreamScopeSuite.scala | 23 +++--- .../spark/streaming/LocalStreamingContext.scala| 83 ++ .../apache/spark/streaming/MapWithStateSuite.scala | 30 +++- .../streaming/ReceiverInputDStreamSuite.scala | 16 ++--- .../spark/streaming/StreamingContextSuite.scala| 52 ++ .../spark/streaming/StreamingListenerSuite.scala | 11 +-- .../org/apache/spark/streaming/TestSuiteBase.scala | 30 .../spark/streaming/WindowOperationsSuite.scala| 19 ++--- .../scheduler/ExecutorAllocationManagerSuite.scala | 19 ++--- .../scheduler/InputInfoTrackerSuite.scala | 22 ++ .../streaming/scheduler/RateControllerSuite.scala | 6 +- .../ui/StreamingJobProgressListenerSuite.scala | 16 ++--- 21 files changed, 240 insertions(+), 241 deletions(-) diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 397de87..d11569d 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -47,6 +47,13 @@ org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark spark-core_${scala.binary.version} ${project.version} test-jar diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 4d3e476..26b41e6 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ 
b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.streaming.kafka010 import java.io.File -import java.lang.{ Long => JLong } -import java.util.{ Arrays, HashMap => JHashMap, Map => JMap, UUID } +import java.lang.{Long => JLong} +import java.util.{Arrays, HashMap => JHashMap, Map => JMap, UUID} import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong @@ -31,13 +31,12 @@ import scala.util.Random import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import org.scalatest.concurrent.Eventually import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} +import org.apache.spark.streaming.{LocalStreamingContext, Milliseconds, StreamingCont
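The SPARK-29007 patch centralizes this cleanup in a `LocalStreamingContext` trait so that a failing test body can no longer leak its context. The core defensive idea — stop the context in a `finally`, on success and on failure alike — can be sketched without ScalaTest or Spark; the `FakeContext` below is an illustrative stand-in for a StreamingContext/SparkContext:

```scala
// Loan pattern with guaranteed teardown; FakeContext stands in for a real context.
class FakeContext {
  var stopped = false
  def stop(): Unit = stopped = true
}

def withContext[T](body: FakeContext => T): T = {
  val ctx = new FakeContext
  try body(ctx)
  finally ctx.stop() // runs whether the body returns or throws, so nothing leaks
}
```

The trait-based version in the patch attaches the same guarantee to the test lifecycle (teardown after each test) instead of a loan function, but the effect is identical: the context is stopped even when the test body throws.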
[spark] branch master updated (f263909 -> 2736efa)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f263909 [SPARK-23243][CORE][FOLLOWUP] Remove todo added by SPARK-23207 add 2736efa [SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking recorded values No new revisions were added by this update. Summary of changes: .../apache/spark/scheduler/DAGSchedulerSuite.scala | 138 +++-- 1 file changed, 71 insertions(+), 67 deletions(-)
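The SPARK-26989 change reflects a general rule for asynchronous listener buses: events are delivered on a separate thread, so a test must wait for the bus to drain before asserting on listener-recorded values. A self-contained Scala sketch of that wait-before-assert idea (a toy bus, not Spark's LiveListenerBus):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Toy async event bus: posted events run on a background thread.
class TinyListenerBus {
  private val queue = new LinkedBlockingQueue[Runnable]()
  @volatile private var processed = 0

  private val worker = new Thread(() => while (true) { queue.take().run(); processed += 1 })
  worker.setDaemon(true)
  worker.start()

  def post(event: Runnable): Unit = queue.put(event)

  // Tests call this before asserting, instead of racing the worker thread.
  def waitUntilProcessed(expected: Int, timeoutMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (processed < expected && System.currentTimeMillis() < deadline) Thread.sleep(1)
    processed >= expected
  }
}
```

Without the wait, an assertion on listener state may run while events are still queued and fail intermittently — the flakiness class this commit targets.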
[spark] branch master updated (fa75db2 -> 7f36cd2)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from fa75db2 [SPARK-29026][SQL] Improve error message in `schemaFor` in trait without companion object constructor add 7f36cd2 [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API No new revisions were added by this update. Summary of changes: .../shuffle/api/ShuffleExecutorComponents.java | 31 +- .../api/SingleSpillShuffleMapOutputWriter.java | 12 +- .../spark/shuffle/sort/UnsafeShuffleWriter.java| 352 +++-- .../io/LocalDiskShuffleExecutorComponents.java | 15 + .../sort/io/LocalDiskShuffleMapOutputWriter.java | 24 +- .../io/LocalDiskSingleSpillMapOutputWriter.java| 55 .../org/apache/spark/internal/config/package.scala | 2 +- .../spark/shuffle/sort/SortShuffleManager.scala| 4 +- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 54 ++-- 9 files changed, 347 insertions(+), 202 deletions(-) copy common/kvstore/src/main/java/org/apache/spark/util/kvstore/UnsupportedStoreVersionException.java => core/src/main/java/org/apache/spark/shuffle/api/SingleSpillShuffleMapOutputWriter.java (67%) create mode 100644 core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java
[spark] branch master updated (8018ded -> e516f7e)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 8018ded [SPARK-28214][STREAMING][TESTS] CheckpointSuite: wait for batch to be fully processed before accessing DStreamCheckpointData add e516f7e [SPARK-28928][SS] Use Kafka delegation token protocol on sources/sinks No new revisions were added by this update. Summary of changes: docs/structured-streaming-kafka-integration.md | 4 +++- .../sql/kafka010/KafkaDelegationTokenSuite.scala | 2 -- .../apache/spark/kafka010/KafkaConfigUpdater.scala | 1 + .../spark/kafka010/KafkaTokenSparkConf.scala | 4 +++- .../spark/kafka010/KafkaConfigUpdaterSuite.scala | 26 +- 5 files changed, 32 insertions(+), 5 deletions(-)
[spark] branch master updated (125af78d -> 8018ded)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 125af78d [SPARK-28831][DOC][SQL] Document CLEAR CACHE statement in SQL Reference add 8018ded [SPARK-28214][STREAMING][TESTS] CheckpointSuite: wait for batch to be fully processed before accessing DStreamCheckpointData No new revisions were added by this update. Summary of changes: .../spark/streaming/scheduler/JobGenerator.scala | 2 +- .../spark/streaming/scheduler/JobScheduler.scala | 3 +- .../apache/spark/streaming/CheckpointSuite.scala | 32 -- 3 files changed, 26 insertions(+), 11 deletions(-)
[spark] branch master updated (712874f -> 594c9c5)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 712874f [SPARK-28931][CORE][TESTS] Fix couple of bugs in FsHistoryProviderSuite add 594c9c5 [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer No new revisions were added by this update. Summary of changes: docs/structured-streaming-kafka-integration.md | 73 ++- external/kafka-0-10-sql/pom.xml| 10 + .../spark/sql/kafka010/FetchedDataPool.scala | 183 ++ .../sql/kafka010/InternalKafkaConsumerPool.scala | 221 +++ .../org/apache/spark/sql/kafka010/KafkaBatch.scala | 2 +- .../sql/kafka010/KafkaBatchPartitionReader.scala | 11 +- .../spark/sql/kafka010/KafkaContinuousStream.scala | 2 +- .../spark/sql/kafka010/KafkaDataConsumer.scala | 689 ++--- .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 7 +- .../apache/spark/sql/kafka010/KafkaRelation.scala | 2 +- .../apache/spark/sql/kafka010/KafkaSource.scala| 3 +- .../apache/spark/sql/kafka010/KafkaSourceRDD.scala | 5 +- .../org/apache/spark/sql/kafka010/package.scala| 36 ++ .../spark/sql/kafka010/FetchedDataPoolSuite.scala | 304 + .../kafka010/InternalKafkaConsumerPoolSuite.scala | 285 + .../sql/kafka010/KafkaDataConsumerSuite.scala | 205 +- .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 1 - pom.xml| 2 + 18 files changed, 1642 insertions(+), 399 deletions(-) create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedDataPoolSuite.scala create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPoolSuite.scala
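SPARK-25151 above replaces ad-hoc caching of Kafka consumers with Apache Commons Pool. The essential behavior — borrow an instance keyed by topic-partition, reuse it if one is idle, create one otherwise, and return it afterwards — can be modeled in a few lines of plain Scala (a toy pool for illustration; the real change uses Commons Pool's keyed pool, which adds eviction, invalidation, and size limits):

```scala
import scala.collection.mutable

// Toy keyed object pool: reuse expensive per-key objects (e.g. Kafka consumers).
class KeyedPool[K, V](factory: K => V) {
  private val idle = mutable.Map.empty[K, List[V]].withDefaultValue(Nil)
  var created = 0 // how many objects the factory actually built

  def borrow(key: K): V = synchronized {
    idle(key) match {
      case head :: rest => idle(key) = rest; head     // reuse an idle instance
      case Nil          => created += 1; factory(key) // none idle: build a new one
    }
  }

  def release(key: K, value: V): Unit = synchronized {
    idle(key) = value :: idle(key) // hand the instance back for the next borrower
  }
}
```

With this shape, repeatedly reading the same topic-partition reuses one consumer instead of recreating it per task, which is the cost the pooling change avoids.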
[spark] branch master updated (9f478a6 -> 712874f)
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 9f478a6 [SPARK-28901][SQL] SparkThriftServer's Cancel SQL Operation show it in JDBC Tab UI add 712874f [SPARK-28931][CORE][TESTS] Fix couple of bugs in FsHistoryProviderSuite No new revisions were added by this update. Summary of changes: .../deploy/history/FsHistoryProviderSuite.scala| 102 ++--- 1 file changed, 48 insertions(+), 54 deletions(-)
[spark] branch master updated: [SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ea90ea6 [SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter ea90ea6 is described below commit ea90ea6ce7e1e07064279b2f78a0301fd2048f11 Author: mcheah AuthorDate: Fri Aug 30 09:43:07 2019 -0700 [SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter ## What changes were proposed in this pull request? Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer. ## How was this patch tested? Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions. Closes #25342 from mccheah/shuffle-writer-refactor-sort-shuffle-writer. Lead-authored-by: mcheah Co-authored-by: mccheah Signed-off-by: Marcelo Vanzin --- .../shuffle/ShufflePartitionPairsWriter.scala | 126 + .../spark/shuffle/sort/SortShuffleManager.scala| 3 +- .../spark/shuffle/sort/SortShuffleWriter.scala | 23 ++-- .../spark/storage/DiskBlockObjectWriter.scala | 6 +- .../spark/util/collection/ExternalSorter.scala | 88 -- .../apache/spark/util/collection/PairsWriter.scala | 28 + .../WritablePartitionedPairCollection.scala| 4 +- .../shuffle/sort/SortShuffleWriterSuite.scala | 18 ++- 8 files changed, 265 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala new file mode 100644 index 000..a988c5e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io.{Closeable, IOException, OutputStream} + +import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager} +import org.apache.spark.shuffle.api.ShufflePartitionWriter +import org.apache.spark.storage.BlockId +import org.apache.spark.util.Utils +import org.apache.spark.util.collection.PairsWriter + +/** + * A key-value writer inspired by {@link DiskBlockObjectWriter} that pushes the bytes to an + * arbitrary partition writer instead of writing to local disk through the block manager. 
+ */ +private[spark] class ShufflePartitionPairsWriter( +partitionWriter: ShufflePartitionWriter, +serializerManager: SerializerManager, +serializerInstance: SerializerInstance, +blockId: BlockId, +writeMetrics: ShuffleWriteMetricsReporter) + extends PairsWriter with Closeable { + + private var isClosed = false + private var partitionStream: OutputStream = _ + private var wrappedStream: OutputStream = _ + private var objOut: SerializationStream = _ + private var numRecordsWritten = 0 + private var curNumBytesWritten = 0L + + override def write(key: Any, value: Any): Unit = { +if (isClosed) { + throw new IOException("Partition pairs writer is already closed.") +} +if (objOut == null) { + open() +} +objOut.writeKey(key) +objOut.writeValue(value) +recordWritten() + } + + private def open(): Unit = { +try { + partitionStream = partitionWriter.openStream + wrappedStream = serializerManager.wrapStream(blockId, partitionStream) + objOut = serializerInstance.serializeStream(wrappedStream) +} catch { + case e: Exception => +Utils.tryLogNonFatalError { + close() +} +throw e +} + } + + override def close(): Unit = { +if (!isClosed) { + Utils.tryWithSafeFinally { +Utils.tryWithSafeFinally { + objOut = closeIfNonNull(objOut) + // Setting these to null will prevent the underlying streams from being closed twice + // just in case any stream's clo
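The committed `ShufflePartitionPairsWriter` opens its streams lazily on the first `write`, fails fast once closed, and nulls stream references during `close` so nothing is closed twice. A minimal sketch of that lazy-open / close-once discipline, with the stream factory and key/value encoding as simplified stand-ins for Spark's real serializer and partition-writer APIs:

```scala
import java.io.{ByteArrayOutputStream, Closeable, IOException, OutputStream}

// Sketch of the lazy-open / close-once pattern used by ShufflePartitionPairsWriter.
// The openStream factory and the "key=value" encoding are illustrative stand-ins.
class LazyPairsWriter(openStream: () => OutputStream) extends Closeable {
  private var isClosed = false
  private var out: OutputStream = _
  private var numRecordsWritten = 0

  def write(key: Any, value: Any): Unit = {
    if (isClosed) throw new IOException("Partition pairs writer is already closed.")
    if (out == null) out = openStream()  // open lazily on the first write
    out.write(s"$key=$value\n".getBytes("UTF-8"))
    numRecordsWritten += 1
  }

  def recordsWritten: Int = numRecordsWritten

  override def close(): Unit = {
    if (!isClosed) {
      try {
        if (out != null) out.close()
      } finally {
        out = null      // drop the reference so the stream cannot be closed twice
        isClosed = true
      }
    }
  }
}
```

Closing twice is a no-op and writing after close throws, mirroring the guards visible in the committed code.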
[spark] branch master updated (fb1053d -> 7d72c07)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from fb1053d  [SPARK-28807][DOCS][SQL] Document SHOW DATABASES in SQL Reference
 add 7d72c07  [SPARK-28760][SS][TESTS] Add Kafka delegation token end-to-end test with mini KDC

No new revisions were added by this update.

Summary of changes:
 external/kafka-0-10-sql/pom.xml                    |   4 +
 .../sql/kafka010/KafkaDelegationTokenSuite.scala   | 120 +++
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 164 +++--
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala |  19 ++-
 pom.xml                                            |  24 +++
 5 files changed, 314 insertions(+), 17 deletions(-)
 create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenSuite.scala

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (b205269 -> 64032cb)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from b205269  [SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used
 add 64032cb  [MINOR][SS] Reuse KafkaSourceInitialOffsetWriter to deduplicate

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 41 --
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 34 +---
 .../kafka010/KafkaSourceInitialOffsetWriter.scala  | 63 ++
 3 files changed, 64 insertions(+), 74 deletions(-)
 create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceInitialOffsetWriter.scala
[spark] branch master updated: [SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
 new b205269  [SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used
b205269 is described below

commit b205269ae09dc384de98ab027da4c17abf3a9dd9
Author: Gabor Somogyi
AuthorDate: Mon Aug 26 13:12:14 2019 -0700

    [SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used

    ### What changes were proposed in this pull request?
    When a task retry happens with the Kafka source, it is not known whether the consumer is the issue, so the old consumer is removed from the cache and a new one is created. The feature works fine but was not covered by tests. In this PR I've added such tests for both DStreams and Structured Streaming.

    ### Why are the changes needed?
    No such tests exist.

    ### Does this PR introduce any user-facing change?
    No.

    ### How was this patch tested?
    Existing + new unit tests.

    Closes #25582 from gaborgsomogyi/SPARK-28875.
Authored-by: Gabor Somogyi Signed-off-by: Marcelo Vanzin --- .../spark/sql/kafka010/KafkaDataConsumer.scala | 6 +-- .../sql/kafka010/KafkaDataConsumerSuite.scala | 62 +++--- .../kafka010/KafkaDataConsumerSuite.scala | 24 + 3 files changed, 71 insertions(+), 21 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index cbb99fd..af240dc 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -78,7 +78,7 @@ private[kafka010] sealed trait KafkaDataConsumer { def release(): Unit /** Reference to the internal implementation that this wrapper delegates to */ - protected def internalConsumer: InternalKafkaConsumer + def internalConsumer: InternalKafkaConsumer } @@ -512,7 +512,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { override def release(): Unit = { internalConsumer.close() } } - private case class CacheKey(groupId: String, topicPartition: TopicPartition) { + private[kafka010] case class CacheKey(groupId: String, topicPartition: TopicPartition) { def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) = this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition) } @@ -521,7 +521,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { // - We make a best-effort attempt to maintain the max size of the cache as configured capacity. // The capacity is not guaranteed to be maintained, especially when there are more active // tasks simultaneously using consumers than the capacity. 
- private lazy val cache = { + private[kafka010] lazy val cache = { val conf = SparkEnv.get.conf val capacity = conf.get(CONSUMER_CACHE_CAPACITY) new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala index 2aa869c..8aa7e06 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala @@ -20,22 +20,23 @@ package org.apache.spark.sql.kafka010 import java.util.concurrent.{Executors, TimeUnit} import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.Duration import scala.util.Random -import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerConfig._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.scalatest.PrivateMethodTester import org.apache.spark.{TaskContext, TaskContextImpl} +import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.ThreadUtils class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester { protected var testUtils: KafkaTestUtils = _ + private val topic = "topic" + Random.nextInt() + private val topicPartition = new TopicPartition(topic, 0) + private val groupId = "groupId" override def beforeAll(): Unit = { super.beforeAll() @@ -51,6 +52,15 @@ class
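The consumer pool that this commit opens up to tests is an access-ordered `java.util.LinkedHashMap` built with `(capacity, 0.75f, true)`. A minimal sketch of that caching pattern follows; the always-evict-over-capacity rule is a simplification (Spark's real cache also checks whether the eldest consumer is currently in use before dropping it):

```scala
import java.{util => ju}

// Sketch of the access-ordered LinkedHashMap cache behind the consumer pool:
// initial capacity, load factor 0.75f, accessOrder = true, plus an eviction
// hook via removeEldestEntry. Evicting whenever size exceeds capacity is a
// simplified stand-in for the real policy.
def makeLruCache[K, V](capacity: Int): ju.LinkedHashMap[K, V] =
  new ju.LinkedHashMap[K, V](capacity, 0.75f, true) {
    override def removeEldestEntry(eldest: ju.Map.Entry[K, V]): Boolean =
      size() > capacity
  }
```

Because `accessOrder` is `true`, a `get` moves an entry to the most-recently-used position, so the entry evicted on overflow is the least recently *accessed*, not the least recently inserted.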
[spark] branch master updated: [SPARK-28679][YARN] changes to setResourceInformation to handle empty resources and reflection error handling
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new dd0725d [SPARK-28679][YARN] changes to setResourceInformation to handle empty resources and reflection error handling dd0725d is described below commit dd0725d7eaf2f0dc9fb3b13326b07c32812b0ad9 Author: Alessandro Bellina AuthorDate: Mon Aug 26 12:00:33 2019 -0700 [SPARK-28679][YARN] changes to setResourceInformation to handle empty resources and reflection error handling ## What changes were proposed in this pull request? This fixes issues that can arise when the jars for different hadoop versions mix, and short-circuits the case where we are running with a spark that was not built for yarn 3 (resource support). ## How was this patch tested? I tested it manually. Closes #25403 from abellina/SPARK-28679. Authored-by: Alessandro Bellina Signed-off-by: Marcelo Vanzin --- .../spark/deploy/yarn/ResourceRequestHelper.scala | 21 - 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala index 522c16b..f524962 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala @@ -143,17 +143,28 @@ private object ResourceRequestHelper extends Logging { require(resource != null, "Resource parameter should not be null!") logDebug(s"Custom resources requested: $resources") +if (resources.isEmpty) { + // no point in going forward, as we don't have anything to set + return +} + if (!isYarnResourceTypesAvailable()) { - if (resources.nonEmpty) { -logWarning("Ignoring custom resource requests because " + -"the 
version of YARN does not support it!") - } + logWarning("Ignoring custom resource requests because " + + "the version of YARN does not support it!") return } val resInfoClass = Utils.classForName(RESOURCE_INFO_CLASS) val setResourceInformationMethod = - resource.getClass.getMethod("setResourceInformation", classOf[String], resInfoClass) + try { +resource.getClass.getMethod("setResourceInformation", classOf[String], resInfoClass) + } catch { +case e: NoSuchMethodException => + throw new SparkException( +s"Cannot find setResourceInformation in ${resource.getClass}. " + + "This is likely due to a JAR conflict between different YARN versions.", e) + } + resources.foreach { case (name, rawAmount) => try { val AMOUNT_AND_UNIT_REGEX(amountPart, unitPart) = rawAmount
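The pattern this diff introduces — resolve a method reflectively once and convert `NoSuchMethodException` into an error message naming the likely cause — can be sketched generically. The `findMethod` helper and the `RuntimeException` wrapper below are illustrative stand-ins; the committed code resolves `setResourceInformation` on YARN's `Resource` class and throws `SparkException`:

```scala
import java.lang.reflect.Method

// Generic sketch of reflective method lookup with descriptive error handling.
// findMethod and its error text are stand-ins, not Spark's actual API.
def findMethod(target: AnyRef, name: String, paramTypes: Class[_]*): Method =
  try {
    target.getClass.getMethod(name, paramTypes: _*)
  } catch {
    case e: NoSuchMethodException =>
      throw new RuntimeException(
        s"Cannot find $name in ${target.getClass}. " +
          "This is likely due to a JAR conflict between different library versions.", e)
  }
```

Failing eagerly at lookup time, rather than inside the per-resource loop, surfaces mixed-JAR problems with one clear message instead of a bare `NoSuchMethodException`.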
[spark] branch master updated (13fd32c -> 2efa6f5)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 13fd32c  [SPARK-28701][TEST-HADOOP3.2][TEST-JAVA11][K8S] adding java11 support for pull request builds
 add 2efa6f5  [SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice

No new revisions were added by this update.

Summary of changes:
 .../spark/shuffle/api/ShuffleMapOutputWriter.java  |  8 ++-
 .../shuffle/sort/BypassMergeSortShuffleWriter.java | 58 ++
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   |  3 +-
 .../io/LocalDiskShuffleMapOutputWriterSuite.scala  |  6 +--
 4 files changed, 35 insertions(+), 40 deletions(-)