[spark] branch master updated: [SPARK-26935][SQL] Skip DataFrameReader's CSV first line scan when not used
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new faa6198  [SPARK-26935][SQL] Skip DataFrameReader's CSV first line scan when not used
faa6198 is described below

commit faa61980c475f48b83694501d3c86e1709a595da
Author: Douglas R Colkitt
AuthorDate: Sat Feb 23 14:00:57 2019 -0600

    [SPARK-26935][SQL] Skip DataFrameReader's CSV first line scan when not used

    Prior to this patch, all DataFrameReader.csv() calls would collect the first
    line from the CSV input iterator. This is done to allow schema inference from
    the header row. However, when a schema is already specified this is a wasteful
    operation: it results in an unnecessary compute step on the first partition,
    which can be expensive if the CSV itself is expensive to generate (e.g. it's
    the product of a long-running external pipe()).

    This patch short-circuits the first-line collection in DataFrameReader.csv()
    when a schema is specified, thereby improving CSV read performance in certain
    cases.

    ## What changes were proposed in this pull request?

    Short-circuit DataFrameReader.csv()'s first-line read when the schema is
    user-specified.

    ## How was this patch tested?

    Compiled and tested against several CSV datasets.

    Closes #23830 from Mister-Meeseeks/master.
    Authored-by: Douglas R Colkitt
    Signed-off-by: Sean Owen
---
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 14 +-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e757785..ff295b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -508,7 +508,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       sparkSession.sessionState.conf.sessionLocalTimeZone)
     val filteredLines: Dataset[String] =
       CSVUtils.filterCommentAndEmpty(csvDataset, parsedOptions)
-    val maybeFirstLine: Option[String] = filteredLines.take(1).headOption
+
+    // For performance, short-circuit the collection of the first line when it won't be used:
+    //  - TextInputCSVDataSource - Only uses firstLine to infer an unspecified schema
+    //  - CSVHeaderChecker - Only uses firstLine to check header, when headerFlag is true
+    //  - CSVUtils - Only uses firstLine to filter headers, when headerFlag is true
+    // (If the downstream logic grows more complicated, consider refactoring to an approach that
+    //  delegates this decision to the constituent consumers themselves.)
+    val maybeFirstLine: Option[String] =
+      if (userSpecifiedSchema.isEmpty || parsedOptions.headerFlag) {
+        filteredLines.take(1).headOption
+      } else {
+        None
+      }
     val schema = userSpecifiedSchema.getOrElse {
       TextInputCSVDataSource.inferFromDataset(

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
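Outside of Spark, the short-circuit in the diff above can be sketched with a plain `Iterator[String]` standing in for the `Dataset[String]`; the function and parameter names here are illustrative, not Spark API:

```scala
// Only pull the first line when a downstream consumer will actually use it:
// schema inference (no user-specified schema) or header checking/filtering
// (headerFlag). Otherwise skip the pull entirely, avoiding any work on the
// underlying source.
def maybeFirstLine(
    lines: Iterator[String],
    schemaIsUserSpecified: Boolean,
    headerFlag: Boolean): Option[String] =
  if (!schemaIsUserSpecified || headerFlag) {
    lines.take(1).toList.headOption
  } else {
    None // short-circuit: the first line would never be consumed
  }
```

In the real patch the pull is `filteredLines.take(1).headOption` on a `Dataset[String]`, which triggers a Spark job on the first partition — that job is precisely the cost being avoided.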
[spark] branch master updated: [SPARK-26963][MLLIB] SizeEstimator can't make some JDK fields accessible in Java 9+
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ab4e83a  [SPARK-26963][MLLIB] SizeEstimator can't make some JDK fields accessible in Java 9+
ab4e83a is described below

commit ab4e83aca7cecc6cce0f0c51f3674481b381ba34
Author: Sean Owen
AuthorDate: Sat Feb 23 11:01:47 2019 -0600

    [SPARK-26963][MLLIB] SizeEstimator can't make some JDK fields accessible in Java 9+

    ## What changes were proposed in this pull request?

    Don't use inaccessible fields in SizeEstimator, which comes up in Java 9+.

    ## How was this patch tested?

    Manually ran tests with Java 11; it causes these tests that failed before to
    pass. This ought to pass on Java 8 as there's effectively no change for Java 8.

    Closes #23866 from srowen/SPARK-26963.

    Authored-by: Sean Owen
    Signed-off-by: Sean Owen
---
 .../main/scala/org/apache/spark/util/SizeEstimator.scala | 16 ++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index e12b6b7..4837b01 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -334,9 +334,21 @@ object SizeEstimator extends Logging {
         if (fieldClass.isPrimitive) {
           sizeCount(primitiveSize(fieldClass)) += 1
         } else {
-          field.setAccessible(true) // Enable future get()'s on this field
+          // Note: in Java 9+ this would be better with trySetAccessible and canAccess
+          try {
+            field.setAccessible(true) // Enable future get()'s on this field
+            pointerFields = field :: pointerFields
+          } catch {
+            // If the field isn't accessible, we can still record the pointer size
+            // but can't know more about the field, so ignore it
+            case _: SecurityException =>
+              // do nothing
+            // Java 9+ can throw InaccessibleObjectException but the class is Java 9+-only
+            case re: RuntimeException
+                if re.getClass.getSimpleName == "InaccessibleObjectException" =>
+              // do nothing
+          }
           sizeCount(pointerSize) += 1
-          pointerFields = field :: pointerFields
         }
       }
     }
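The Java-8-compatible catch pattern in the diff — matching `InaccessibleObjectException` by simple name, because referring to the class directly would not compile on Java 8 where it doesn't exist — can be sketched in isolation. `tryMakeAccessible` and `Probe` are illustrative names, not Spark API:

```scala
import java.lang.reflect.Field

// Attempt to make a reflective field accessible; report success/failure
// instead of throwing, so callers can still count the pointer size.
def tryMakeAccessible(field: Field): Boolean =
  try {
    field.setAccessible(true) // Enables future get()'s on this field
    true
  } catch {
    case _: SecurityException => false
    // Java 9+'s InaccessibleObjectException is matched by simple name so this
    // code still compiles and runs on Java 8.
    case re: RuntimeException
        if re.getClass.getSimpleName == "InaccessibleObjectException" => false
  }

// A class of our own: its fields are always accessible to us.
class Probe { val hidden: Int = 42 }
```

On Java 9+ the same call against an unopened JDK-internal field returns `false` instead of propagating the exception, which is the degraded-but-safe behavior the patch is after.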
[spark] branch master updated: [MINOR][DOCS] Remove references to Shark
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a07b07f  [MINOR][DOCS] Remove references to Shark
a07b07f is described below

commit a07b07fd85345a195623af205e346b494ae92fe2
Author: seancxmao
AuthorDate: Sat Feb 23 11:03:05 2019 -0600

    [MINOR][DOCS] Remove references to Shark

    ## What changes were proposed in this pull request?

    This PR aims to remove references to "Shark", a precursor to Spark SQL. I
    searched the whole project for the text "Shark" (ignoring case) and found just
    a single match. Occurrences such as nicknames or test data are irrelevant.

    ## How was this patch tested?

    N/A. Change comments only.

    Closes #23876 from seancxmao/remove-Shark.

    Authored-by: seancxmao
    Signed-off-by: Sean Owen
---
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 93d5cd7..799e7e4 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -51,9 +51,6 @@ import org.apache.spark.util.{RpcUtils, Utils}
  * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
  * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
  * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
- *
- * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
- * in a future release.
  */
@DeveloperApi
class SparkEnv (
[spark] branch master updated: [SPARK-26908][SQL] Fix DateTimeUtils.toMillis and millisToDays
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 75c48ac  [SPARK-26908][SQL] Fix DateTimeUtils.toMillis and millisToDays
75c48ac is described below

commit 75c48ac36d3c75f405b7ad8fd989ff4ee00eccbb
Author: Maxim Gekk
AuthorDate: Sat Feb 23 11:35:11 2019 -0600

    [SPARK-26908][SQL] Fix DateTimeUtils.toMillis and millisToDays

    ## What changes were proposed in this pull request?

    `DateTimeUtils.toMillis` can produce an inaccurate result for some negative
    values (timestamps before the epoch); the error can be around 1 ms. In the PR,
    I propose to use `Math.floorDiv` when casting microseconds to milliseconds,
    and milliseconds to days since the epoch.

    ## How was this patch tested?

    Added a new test to `DateTimeUtilsSuite`, and tested by `CastSuite` as well.

    Closes #23815 from MaxGekk/micros-to-millis.

    Lead-authored-by: Maxim Gekk
    Co-authored-by: Maxim Gekk
    Signed-off-by: Sean Owen
---
 .../scala/org/apache/spark/sql/catalyst/expressions/Cast.scala |  6 --
 .../org/apache/spark/sql/catalyst/util/DateTimeUtils.scala     | 10 +-
 .../apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala    |  6 ++
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index a6926d8..b20249f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -396,7 +396,9 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
   // converting seconds to us
   private[this] def longToTimestamp(t: Long): Long = t * 1000000L
   // converting us to seconds
-  private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 1000000L).toLong
+  private[this] def timestampToLong(ts: Long): Long = {
+    Math.floorDiv(ts, DateTimeUtils.MICROS_PER_SECOND)
+  }
   // converting us to seconds in double
   private[this] def timestampToDouble(ts: Long): Double = {
     ts / 1000000.0
@@ -1072,7 +1074,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
   }
   private[this] def longToTimeStampCode(l: ExprValue): Block = code"$l * 1000000L"
   private[this] def timestampToIntegerCode(ts: ExprValue): Block =
-    code"java.lang.Math.floor((double) $ts / 1000000L)"
+    code"java.lang.Math.floorDiv($ts, 1000000L)"
   private[this] def timestampToDoubleCode(ts: ExprValue): Block =
     code"$ts / 1000000.0"

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5a432ba..d714d29 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -77,10 +77,10 @@ object DateTimeUtils {
   }

   def millisToDays(millisUtc: Long, timeZone: TimeZone): SQLDate = {
-    // SPARK-6785: use Math.floor so negative number of days (dates before 1970)
+    // SPARK-6785: use Math.floorDiv so negative number of days (dates before 1970)
     // will correctly work as input for function toJavaDate(Int)
     val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
-    Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
+    Math.floorDiv(millisLocal, MILLIS_PER_DAY).toInt
   }

   // reverse of millisToDays
@@ -179,14 +179,14 @@ object DateTimeUtils {
     // When the timestamp is negative i.e before 1970, we need to adjust the millseconds portion.
     // Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
     // In millis precision the above needs to be represented as (-157700927877).
-    Math.floor(us.toDouble / MILLIS_PER_SECOND).toLong
+    Math.floorDiv(us, MICROS_PER_MILLIS)
   }

   /*
-   * Converts millseconds since epoch to SQLTimestamp.
+   * Converts milliseconds since epoch to SQLTimestamp.
    */
   def fromMillis(millis: Long): SQLTimestamp = {
-    millis * 1000L
+    millis * MICROS_PER_MILLIS
   }

   /**

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index b71790e..e270b91 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -515,6 +515,7 @@ class DateTimeUtilsSuite extends SparkFunSuite {
     val input =
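The difference between `Math.floorDiv` and both truncating `Long` division and a floor taken through `Double` shows up exactly for pre-epoch values. A minimal sketch, with `MicrosPerSecond` mirroring `DateTimeUtils.MICROS_PER_SECOND` and `microsToSeconds` an illustrative name:

```scala
val MicrosPerSecond = 1000000L

// Math.floorDiv rounds toward negative infinity, exactly, on Longs.
// -1.5 s in microseconds belongs to second -2, not -1: plain Long division
// truncates toward zero and gives -1 instead. Routing through Double
// (math.floor(us.toDouble / d)) is also unsafe, since Longs with magnitude
// above 2^53 are not exactly representable as Double — that inexactness is
// the ~1 ms error the commit message describes.
def microsToSeconds(us: Long): Long = Math.floorDiv(us, MicrosPerSecond)
```

For example, `microsToSeconds(-1500000L)` yields `-2`, whereas `-1500000L / 1000000L` yields `-1`.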
[spark] branch master updated: [SPARK-26903][SQL] Remove the TimeZone cache
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d0f2fd0  [SPARK-26903][SQL] Remove the TimeZone cache
d0f2fd0 is described below

commit d0f2fd05e1079d580ffd7d9d263f662e92849175
Author: Maxim Gekk
AuthorDate: Sat Feb 23 09:44:22 2019 -0600

    [SPARK-26903][SQL] Remove the TimeZone cache

    ## What changes were proposed in this pull request?

    In the PR, I propose to convert a time zone string to `TimeZone` by converting
    it to `ZoneId`, which uses `ZoneOffset` internally. The `ZoneOffset` class of
    JDK 8 has a cache already:
    http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/java/time/ZoneOffset.java#l205
    In this way, there is no need to support a cache of time zones in Spark.

    The PR removes `computedTimeZones` from `DateTimeUtils`, and uses `ZoneId.of`
    to convert a time zone id string to `ZoneId` and to `TimeZone` at the end.

    ## How was this patch tested?

    The changes were tested by

    Closes #23812 from MaxGekk/timezone-cache.

    Lead-authored-by: Maxim Gekk
    Co-authored-by: Maxim Gekk
    Signed-off-by: Sean Owen
---
 docs/sql-migration-guide-upgrade.md                |   2 +
 .../spark/sql/catalyst/util/DateTimeUtils.scala    |  11 +-
 .../expressions/DateExpressionsSuite.scala         |  25 +-
 sql/core/benchmarks/DateTimeBenchmark-results.txt  | 312 ++---
 4 files changed, 181 insertions(+), 169 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 1ae26e6..c201056 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -97,6 +97,8 @@ displayTitle: Spark SQL Upgrading Guide
   - the JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on Proleptic Gregorian calendar, and time zone defined by the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion is based on the hybrid calendar (Julian + Gregorian) and on default system time zone.

+  - In Spark version 2.4 and earlier, invalid time zone ids are silently ignored and replaced by GMT time zone, for example, in the from_utc_timestamp function. Since Spark 3.0, such time zone ids are rejected, and Spark throws `java.time.DateTimeException`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4

   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 28fb6a9..5a432ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -22,8 +22,7 @@ import java.time._
 import java.time.Year.isLeap
 import java.time.temporal.IsoFields
 import java.util.{Locale, TimeZone}
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import java.util.function.{Function => JFunction}
+import java.util.concurrent.TimeUnit

 import scala.util.control.NonFatal

@@ -67,13 +66,9 @@ object DateTimeUtils {

   def defaultTimeZone(): TimeZone = TimeZone.getDefault()

-  private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
-  private val computeTimeZone = new JFunction[String, TimeZone] {
-    override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
-  }
-
   def getTimeZone(timeZoneId: String): TimeZone = {
-    computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
+    val zoneId = ZoneId.of(timeZoneId, ZoneId.SHORT_IDS)
+    TimeZone.getTimeZone(zoneId)
   }

   // we should use the exact day as Int, for example, (year, month, day) -> day

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index c3c29e3..ce576ec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -819,9 +819,17 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
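The behavior change called out in the migration-guide hunk falls directly out of the new conversion path: `TimeZone.getTimeZone(String)` silently falls back to GMT for an unknown id, while `ZoneId.of` throws a `java.time.DateTimeException` subclass. A small sketch (`getTimeZoneStrict` is an illustrative name for the patched logic):

```scala
import java.time.ZoneId
import java.util.TimeZone

// Resolve a time zone id strictly: unknown ids throw instead of becoming GMT.
// ZoneId.SHORT_IDS maps legacy three-letter ids like "PST" to region ids,
// preserving support for them.
def getTimeZoneStrict(timeZoneId: String): TimeZone = {
  val zoneId = ZoneId.of(timeZoneId, ZoneId.SHORT_IDS)
  TimeZone.getTimeZone(zoneId) // TimeZone.getTimeZone(ZoneId) never falls back
}
```

No explicit cache is needed here because `ZoneId`/`ZoneOffset` cache internally, which is the rationale for removing `computedTimeZones`.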
[spark] branch master updated: [SPARK-26939][CORE][DOC] Fix some outdated comments about task schedulers
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ce3a157 [SPARK-26939][CORE][DOC] Fix some outdated comments about task schedulers ce3a157 is described below commit ce3a157f00e052ca4dc0c6d4ef282cb21588837e Author: seancxmao AuthorDate: Sat Feb 23 10:52:53 2019 -0600 [SPARK-26939][CORE][DOC] Fix some outdated comments about task schedulers ## What changes were proposed in this pull request? This PR aims to fix some outdated comments about task schedulers. 1. Change "ClusterScheduler" to "YarnScheduler" in comments of `YarnClusterScheduler` According to [SPARK-1140 Remove references to ClusterScheduler](https://issues.apache.org/jira/browse/SPARK-1140), ClusterScheduler is not used anymore. I also searched "ClusterScheduler" within the whole project, no other occurrences are found in comments or test cases. Note classes like `YarnClusterSchedulerBackend` or `MesosClusterScheduler` are not relevant. 2. Update comments about `statusUpdate` from `TaskSetManager` `statusUpdate` has been moved to `TaskSchedulerImpl`. StatusUpdate event handling is delegated to `handleSuccessfulTask`/`handleFailedTask`. ## How was this patch tested? N/A. Fix comments only. Closes #23844 from seancxmao/taskscheduler-comments. 
Authored-by: seancxmao Signed-off-by: Sean Owen --- core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala| 3 ++- .../org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b7bf069..453939a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -38,7 +38,8 @@ import org.apache.spark.util.collection.MedianHeap * each task, retries tasks if they fail (up to a limited number of times), and * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, - * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished). + * and handleSuccessfulTask/handleFailedTask, which tells it that one of its tasks changed state + * (e.g. finished/failed). * * THREADING: This class is designed to only be called from code with a lock on the * TaskScheduler (e.g. its event handlers). It should not be called from other threads. 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 96c9151..1f622a0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -21,7 +21,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.yarn.ApplicationMaster

 /**
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
+ * This is a simple extension to YarnScheduler - to ensure that appropriate initialization of
  * ApplicationMaster, etc is done
  */
 private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-26903][SQL] Remove the TimeZone cache
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d0f2fd0  [SPARK-26903][SQL] Remove the TimeZone cache
d0f2fd0 is described below

commit d0f2fd05e1079d580ffd7d9d263f662e92849175
Author: Maxim Gekk
AuthorDate: Sat Feb 23 09:44:22 2019 -0600

    [SPARK-26903][SQL] Remove the TimeZone cache

    ## What changes were proposed in this pull request?

    In the PR, I propose to convert a time zone string to `TimeZone` by converting it to `ZoneId`, which uses `ZoneOffset` internally. The `ZoneOffset` class of JDK 8 already has a cache: http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/java/time/ZoneOffset.java#l205 . In this way, there is no need to maintain a cache of time zones in Spark. The PR removes `computedTimeZones` from `DateTimeUtils`, and uses `ZoneId.of` to convert a time zone id string to `ZoneId` and then to `TimeZone`.

    ## How was this patch tested?

    The changes were tested by

    Closes #23812 from MaxGekk/timezone-cache.

Lead-authored-by: Maxim Gekk
Co-authored-by: Maxim Gekk
Signed-off-by: Sean Owen
---
 docs/sql-migration-guide-upgrade.md                |   2 +
 .../spark/sql/catalyst/util/DateTimeUtils.scala    |  11 +-
 .../expressions/DateExpressionsSuite.scala         |  25 +-
 sql/core/benchmarks/DateTimeBenchmark-results.txt  | 312 ++---
 4 files changed, 181 insertions(+), 169 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 1ae26e6..c201056 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -97,6 +97,8 @@ displayTitle: Spark SQL Upgrading Guide

   - the JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on Proleptic Gregorian calendar, and time zone defined by the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion is based on the hybrid calendar (Julian + Gregorian) and on default system time zone.

+  - In Spark version 2.4 and earlier, invalid time zone ids are silently ignored and replaced by GMT time zone, for example, in the from_utc_timestamp function. Since Spark 3.0, such time zone ids are rejected, and Spark throws `java.time.DateTimeException`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4

 - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 28fb6a9..5a432ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -22,8 +22,7 @@ import java.time._
 import java.time.Year.isLeap
 import java.time.temporal.IsoFields
 import java.util.{Locale, TimeZone}
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-import java.util.function.{Function => JFunction}
+import java.util.concurrent.TimeUnit

 import scala.util.control.NonFatal
@@ -67,13 +66,9 @@ object DateTimeUtils {

   def defaultTimeZone(): TimeZone = TimeZone.getDefault()

-  private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
-  private val computeTimeZone = new JFunction[String, TimeZone] {
-    override def apply(timeZoneId: String): TimeZone =
-      TimeZone.getTimeZone(timeZoneId)
-  }
-
   def getTimeZone(timeZoneId: String): TimeZone = {
-    computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
+    val zoneId = ZoneId.of(timeZoneId, ZoneId.SHORT_IDS)
+    TimeZone.getTimeZone(zoneId)
   }

   // we should use the exact day as Int, for example, (year, month, day) -> day

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index c3c29e3..ce576ec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -819,9 +819,17 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
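The reworked `getTimeZone` in the SPARK-26903 diff above leans entirely on the JDK's `java.time` API: `ZoneId.of` parses the id string (consulting the `ZoneId.SHORT_IDS` map so legacy three-letter ids like "PST" still resolve) and throws a `java.time.DateTimeException` for unknown ids, which matches the migration-guide note about invalid time zone ids being rejected rather than silently mapped to GMT. A minimal standalone sketch of that conversion path in plain Java (the class name `ZoneIdSketch` is illustrative and not part of Spark):

```java
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.TimeZone;

public class ZoneIdSketch {
    // Same shape as the new DateTimeUtils.getTimeZone: id string -> ZoneId -> TimeZone.
    // ZoneId.SHORT_IDS maps legacy three-letter ids, e.g. "PST" -> "America/Los_Angeles".
    static TimeZone getTimeZone(String timeZoneId) {
        ZoneId zoneId = ZoneId.of(timeZoneId, ZoneId.SHORT_IDS);
        return TimeZone.getTimeZone(zoneId);
    }

    public static void main(String[] args) {
        // Legacy short id resolves through SHORT_IDS to a region id.
        System.out.println(getTimeZone("PST").getID());  // America/Los_Angeles

        // An unknown id now raises DateTimeException instead of falling back to GMT,
        // which is the behavior change called out in the migration guide.
        try {
            getTimeZone("NotARealZone");
        } catch (DateTimeException e) {
            System.out.println("rejected invalid zone id");
        }
    }
}
```

No explicit caching is needed on the caller's side because `ZoneId` parsing reuses the JDK's interned `ZoneOffset`/zone-rules instances, which is the rationale for dropping `computedTimeZones`.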