[GitHub] [spark] LuciferYang commented on a diff in pull request #38737: [SPARK-41174][CORE][SQL] Propagate an error class to users for invalid `format` of `to_binary()`
LuciferYang commented on code in PR #38737: URL: https://github.com/apache/spark/pull/38737#discussion_r1030121284

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:

@@ -2620,46 +2620,81 @@ case class ToBinary(
     nullOnInvalidFormat: Boolean = false)
   extends RuntimeReplaceable with ImplicitCastInputTypes {

-  override lazy val replacement: Expression = format.map { f =>
-    assert(f.foldable && (f.dataType == StringType || f.dataType == NullType))
+  @transient lazy val fmt: String = format.map { f =>
     val value = f.eval()
     if (value == null) {
-      Literal(null, BinaryType)
+      null
     } else {
-      value.asInstanceOf[UTF8String].toString.toLowerCase(Locale.ROOT) match {
-        case "hex" => Unhex(expr, failOnError = true)
-        case "utf-8" | "utf8" => Encode(expr, Literal("UTF-8"))
-        case "base64" => UnBase64(expr, failOnError = true)
-        case _ if nullOnInvalidFormat => Literal(null, BinaryType)
-        case other => throw QueryCompilationErrors.invalidStringLiteralParameter(
-          "to_binary",
-          "format",
-          other,
-          Some(
-            "The value has to be a case-insensitive string literal of " +
-            "'hex', 'utf-8', 'utf8', or 'base64'."))
-      }
+      value.asInstanceOf[UTF8String].toString.toLowerCase(Locale.ROOT)
+    }
+  }.getOrElse("hex")
+
+  override lazy val replacement: Expression = if (fmt == null) {
+    Literal(null, BinaryType)
+  } else {
+    fmt match {
+      case "hex" => Unhex(expr, failOnError = true)
+      case "utf-8" | "utf8" => Encode(expr, Literal("UTF-8"))
+      case "base64" => UnBase64(expr, failOnError = true)
+      case _ => Literal(null, BinaryType)
     }
-  }.getOrElse(Unhex(expr, failOnError = true))
+  }

   def this(expr: Expression) = this(expr, None, false)

   def this(expr: Expression, format: Expression) =
-    this(expr, Some({
-      // We perform this check in the constructor to make it eager and not go through type coercion.
-      if (format.foldable && (format.dataType == StringType || format.dataType == NullType)) {
-        format
-      } else {
-        throw QueryCompilationErrors.requireLiteralParameter("to_binary", "format", "string")
-      }
-    }), false)
+    this(expr, Some(format), false)

   override def prettyName: String = "to_binary"

   override def children: Seq[Expression] = expr +: format.toSeq

   override def inputTypes: Seq[AbstractDataType] = children.map(_ => StringType)

+  override def checkInputDataTypes(): TypeCheckResult = {
+    def isValidFormat: Boolean = {
+      fmt == null || Set("hex", "utf-8", "utf8", "base64").contains(fmt)
+    }
+    format match {
+      case Some(f) =>
+        if (f.foldable && (f.dataType == StringType || f.dataType == NullType)) {
+          if (isValidFormat || nullOnInvalidFormat) {
+            super.checkInputDataTypes()
+          } else {
+            DataTypeMismatch(
+              errorSubClass = "INVALID_ARG_VALUE",
+              messageParameters = Map(
+                "inputName" -> "fmt",
+                "requireType" -> s"case-insensitive ${toSQLType(StringType)}",
+                "validValues" -> "'hex', 'utf-8', 'utf8', or 'base64'",
+                "inputValue" -> toSQLValue(fmt, StringType)
+              )
+            )
+          }
+        } else if (!f.foldable) {
+          DataTypeMismatch(
+            errorSubClass = "NON_FOLDABLE_INPUT",

Review Comment: [36e1bda](https://github.com/apache/spark/pull/38737/commits/36e1bda1427ae754e739c70d363e4733615bd7bd) added two new cases.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
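The `fmt` normalization and validation logic in the `to_binary` diff above can be sketched standalone. This is a hypothetical, Spark-free illustration of the same rules (a NULL format is treated as valid and yields NULL; anything else is lower-cased with the ROOT locale and must be one of the four supported names); the function names mirror the PR but are otherwise assumptions.

```scala
import java.util.Locale

// The four formats accepted by to_binary, per the diff above.
val validFormats = Set("hex", "utf-8", "utf8", "base64")

// Normalize a user-supplied format the way the patch does:
// null stays null, everything else is lower-cased with Locale.ROOT.
def normalizeFmt(raw: String): String =
  if (raw == null) null else raw.toLowerCase(Locale.ROOT)

// A null format is considered valid (the expression then returns NULL);
// otherwise the normalized value must be a supported format.
def isValidFormat(fmt: String): Boolean =
  fmt == null || validFormats.contains(fmt)
```

With this shape, the type check can reject an invalid literal eagerly while still letting `nullOnInvalidFormat` (used by `try_to_binary`) fall back to a NULL result.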
[GitHub] [spark] dengziming commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation
dengziming commented on PR #38659: URL: https://github.com/apache/spark/pull/38659#issuecomment-1324659424 Thank you @grundprinzip for your review. I have addressed the comments; let's wait for @hvanhovell and @cloud-fan.
[GitHub] [spark] LuciferYang commented on pull request #38764: [SPARK-41206][SQL][FOLLOWUP] Make result of `checkColumnNameDuplication` stable to fix `COLUMN_ALREADY_EXISTS` check failed with Scala 2.13
LuciferYang commented on PR #38764: URL: https://github.com/apache/spark/pull/38764#issuecomment-1324658198 Thanks @HyukjinKwon @MaxGekk
[GitHub] [spark] MaxGekk commented on a diff in pull request #38737: [SPARK-41174][CORE][SQL] Propagate an error class to users for invalid `format` of `to_binary()`
MaxGekk commented on code in PR #38737: URL: https://github.com/apache/spark/pull/38737#discussion_r1030091988

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:

(same `ToBinary` diff hunk `@@ -2620,46 +2620,81 @@` as in the first comment on this PR above; this thread is anchored on the `errorSubClass = "NON_FOLDABLE_INPUT"` line)

Review Comment: Yep. Just to be sure we handle the non-foldable case correctly.
[GitHub] [spark] itholic commented on pull request #38766: [MINOR][SQL] Fix error message for `UNEXPECTED_INPUT_TYPE`
itholic commented on PR #38766: URL: https://github.com/apache/spark/pull/38766#issuecomment-1324635151 Although it is a very small change, I made it a separate PR from my other tasks to avoid this error-message change affecting the tests in the original PR.
[GitHub] [spark] itholic opened a new pull request, #38769: [SPARK-41228][SQL] Rename & Improve error message for `COLUMN_NOT_IN_GROUP_BY_CLAUSE`.
itholic opened a new pull request, #38769: URL: https://github.com/apache/spark/pull/38769

### What changes were proposed in this pull request?
This PR proposes to rename `COLUMN_NOT_IN_GROUP_BY_CLAUSE` to `MISSING_AGGREGATION` and to improve its error message.

### Why are the changes needed?
The current error class name and its error message do not illustrate the error cause and resolution correctly.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
```
./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"
```
[GitHub] [spark] amaliujia opened a new pull request, #38768: [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate expression type
amaliujia opened a new pull request, #38768: URL: https://github.com/apache/spark/pull/38768

### What changes were proposed in this pull request?
This PR proposes that Relations (e.g. Aggregate in this PR) should only deal with `Expression` rather than `str`. A `str` could be mapped to different expressions (e.g. a SQL expression, an unresolved_attribute, etc.). Relations are not supposed to understand the differences among `str` inputs; DataFrame should understand them. This PR specifically changes `Aggregate`.

### Why are the changes needed?
Codebase refactoring.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT
[GitHub] [spark] MaxGekk commented on a diff in pull request #38707: [SPARK-41176][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1042
MaxGekk commented on code in PR #38707: URL: https://github.com/apache/spark/pull/38707#discussion_r1030084249

## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:

@@ -637,13 +637,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }

   def invalidFunctionArgumentsError(
-      name: String, expectedInfo: String, actualNumber: Int): Throwable = {
+      name: String, expectedNum: String, actualNum: Int): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1042",
+      errorClass = "FUNCTION_WRONG_NUM_ARGS",
       messageParameters = Map(
-        "name" -> name,
-        "expectedInfo" -> expectedInfo,
-        "actualNumber" -> actualNumber.toString))
+        "functionName" -> name,

Review Comment: Please wrap `functionName` with `toSQLId()`.
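For context on the review request above: wrapping with `toSQLId()` renders an identifier with backtick quoting in error messages, so users can tell identifiers apart from surrounding prose. The following is a simplified stand-in for illustration only, not Spark's actual `QueryErrorsBase` implementation.

```scala
// Simplified sketch of identifier quoting for error messages:
// each dot-separated part of a (possibly qualified) name gets backticks.
def toSQLId(name: String): String =
  name.split("\\.").map(part => s"`$part`").mkString(".")
```

A message parameter built as `"functionName" -> toSQLId(name)` would then show, e.g., `` `db`.`fn` `` instead of the bare `db.fn`.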
[GitHub] [spark] cloud-fan commented on pull request #38767: [SPARK-41183][SQL][FOLLOWUP] Fix a typo
cloud-fan commented on PR #38767: URL: https://github.com/apache/spark/pull/38767#issuecomment-1324625972 cc @viirya
[GitHub] [spark] cloud-fan opened a new pull request, #38767: [SPARK-41183][SQL][FOLLOWUP] Fix a typo
cloud-fan opened a new pull request, #38767: URL: https://github.com/apache/spark/pull/38767

### What changes were proposed in this pull request?
Followup of https://github.com/apache/spark/pull/38692. To follow other APIs in `SparkSessionExtensions`, the names should be `inject...Rule` and `build...Rules`.

### Why are the changes needed?
Typo fix.

### Does this PR introduce _any_ user-facing change?
No, it is not a released API.

### How was this patch tested?
N/A
[GitHub] [spark] itholic opened a new pull request, #38766: [MINOR][SQL] Fix error message for `UNEXPECTED_INPUT_TYPE`
itholic opened a new pull request, #38766: URL: https://github.com/apache/spark/pull/38766

### What changes were proposed in this pull request?
This PR proposes to correct the minor syntax of the error message for `UNEXPECTED_INPUT_TYPE`.

### Why are the changes needed?
Error messages should start with an upper-case character and be clear to read.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
```
./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"
```
[GitHub] [spark] LuciferYang commented on a diff in pull request #38737: [SPARK-41174][CORE][SQL] Propagate an error class to users for invalid `format` of `to_binary()`
LuciferYang commented on code in PR #38737: URL: https://github.com/apache/spark/pull/38737#discussion_r1030082083

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:

(same `ToBinary` diff hunk `@@ -2620,46 +2620,81 @@` as in the first comment on this PR above; this thread is anchored on the `errorSubClass = "NON_FOLDABLE_INPUT"` line)

Review Comment: For the `!f.foldable` branch?
[GitHub] [spark] wankunde opened a new pull request, #38765: [SPARK-35531][SQL][FOLLOWUP] Support alter table command with CASE_SENSITIVE is true
wankunde opened a new pull request, #38765: URL: https://github.com/apache/spark/pull/38765

### What changes were proposed in this pull request?
Restore dbName and tableName in the `HiveShim.getTable()` method. When we create a Hive table, Hive converts the dbName and tableName to lower case: https://github.com/apache/hive/blob/release-2.3.9-rc0/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1446-L1482 and then throws an exception in `Hive.alterTable()`: https://github.com/apache/hive/blob/release-2.3.9-rc0/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L623

### Why are the changes needed?
Bug fix for the ALTER TABLE command.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a UT.
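The idea described in the PR above, restoring the caller's casing after the metastore has lower-cased the names, can be sketched as follows. This is an illustrative stand-in under stated assumptions, not the actual `HiveShim` change: it simply prefers the requested spelling when it matches the metastore's lower-cased name case-insensitively.

```scala
// If the name returned by the metastore differs only in case from what the
// caller asked for, keep the caller's original casing; otherwise trust the
// metastore's value (the names refer to different objects).
def restoreCase(requested: String, fromMetastore: String): String =
  if (requested.equalsIgnoreCase(fromMetastore)) requested else fromMetastore
```

This keeps a later case-sensitive comparison in `Hive.alterTable()` from rejecting a table the user legitimately created with mixed-case identifiers.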
[GitHub] [spark] MaxGekk commented on pull request #38710: [SPARK-41179][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1092
MaxGekk commented on PR #38710: URL: https://github.com/apache/spark/pull/38710#issuecomment-1324619657 @panbingkun Please resolve conflicts.
[GitHub] [spark] cloud-fan commented on pull request #38760: [SPARK-41219][SQL] Decimal changePrecision should work with decimal(0, 0)
cloud-fan commented on PR #38760: URL: https://github.com/apache/spark/pull/38760#issuecomment-1324619462 cc @srielau @viirya
[GitHub] [spark] MaxGekk commented on a diff in pull request #38725: [SPARK-41182][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1102
MaxGekk commented on code in PR #38725: URL: https://github.com/apache/spark/pull/38725#discussion_r1030079292

## core/src/main/resources/error/error-classes.json:

@@ -656,6 +656,11 @@
     ],
     "sqlState" : "42000"
   },
+  "INVALID_EXTRACT_FIELD" : {
+    "message" : [
+      "Literals of the type are currently not supported for the type."

Review Comment: Could you make the error message clearer and more precise? Let's talk specifically about the invalid extract field.
[GitHub] [spark] MaxGekk commented on pull request #38730: [SPARK-41181][SQL] Migrate the map options errors onto error classes
MaxGekk commented on PR #38730: URL: https://github.com/apache/spark/pull/38730#issuecomment-1324618149 @panbingkun Could you resolve conflicts, please.
[GitHub] [spark] cloud-fan commented on pull request #38760: [SPARK-41219][SQL] Decimal changePrecision should work with decimal(0, 0)
cloud-fan commented on PR #38760: URL: https://github.com/apache/spark/pull/38760#issuecomment-1324618007 It seems reasonable to say that 0 is the only valid value for `decimal(0, 0)`. Forbidding `decimal(0, 0)` seems also reasonable but is more risky.
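The `decimal(0, 0)` corner case discussed above can be illustrated with a small standalone check. This is a hedged sketch of the semantics, not Spark's `Decimal.changePrecision`: with precision 0 there are no digits at all, so 0 is the only representable value, and a naive `precision <= p` check fails for it because `BigDecimal(0).precision` is 1.

```scala
// Does `value`, rounded to `scale` fractional digits, fit in decimal(precision, scale)?
def fitsIn(value: BigDecimal, precision: Int, scale: Int): Boolean = {
  val rounded = value.setScale(scale, BigDecimal.RoundingMode.HALF_UP)
  if (precision == 0) {
    // decimal(0, 0) has zero digits: only the value 0 fits.
    // (BigDecimal reports precision 1 for zero, so the generic check below
    // would wrongly reject it.)
    rounded.signum == 0
  } else {
    rounded.precision <= precision
  }
}
```

This mirrors the choice in the comment above: treat 0 as the single valid value of `decimal(0, 0)` rather than forbid the type outright.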
[GitHub] [spark] AngersZhuuuu commented on pull request #35799: [SPARK-38498][STREAM] Support customized StreamingListener by configuration
AngersZhuuuu commented on PR #35799: URL: https://github.com/apache/spark/pull/35799#issuecomment-1324617674 gentle ping @dongjoon-hyun
[GitHub] [spark] MaxGekk commented on a diff in pull request #38737: [SPARK-41174][CORE][SQL] Propagate an error class to users for invalid `format` of `to_binary()`
MaxGekk commented on code in PR #38737: URL: https://github.com/apache/spark/pull/38737#discussion_r1030077573

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:

(same `ToBinary` diff hunk `@@ -2620,46 +2620,81 @@` as in the first comment on this PR above; this thread is anchored on the `errorSubClass = "NON_FOLDABLE_INPUT"` line)

Review Comment: Could you write a test for the case, please. I just wonder how it could happen:
```scala
if (f.foldable && ...) {
  ...
} else if (!f.foldable) {
```
[GitHub] [spark] MaxGekk closed pull request #38764: [SPARK-41206][SQL][FOLLOWUP] Make result of `checkColumnNameDuplication` stable to fix `COLUMN_ALREADY_EXISTS` check failed with Scala 2.13
MaxGekk closed pull request #38764: [SPARK-41206][SQL][FOLLOWUP] Make result of `checkColumnNameDuplication` stable to fix `COLUMN_ALREADY_EXISTS` check failed with Scala 2.13 URL: https://github.com/apache/spark/pull/38764
[GitHub] [spark] MaxGekk commented on pull request #38764: [SPARK-41206][SQL][FOLLOWUP] Make result of `checkColumnNameDuplication` stable to fix `COLUMN_ALREADY_EXISTS` check failed with Scala 2.13
MaxGekk commented on PR #38764: URL: https://github.com/apache/spark/pull/38764#issuecomment-1324599102 +1, LGTM. Merging to master. All GAs passed. Thank you @LuciferYang and @HyukjinKwon for the review.
[GitHub] [spark] AmplabJenkins commented on pull request #38750: [SPARK-41226][SQL] Refactor Spark types by introducing physical types
AmplabJenkins commented on PR #38750: URL: https://github.com/apache/spark/pull/38750#issuecomment-1324584432 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #38751: [SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
AmplabJenkins commented on PR #38751: URL: https://github.com/apache/spark/pull/38751#issuecomment-1324584414 Can one of the admins verify this patch?
[GitHub] [spark] MaxGekk commented on a diff in pull request #38576: [SPARK-41062][SQL] Rename `UNSUPPORTED_CORRELATED_REFERENCE` to `CORRELATED_REFERENCE`
MaxGekk commented on code in PR #38576: URL: https://github.com/apache/spark/pull/38576#discussion_r1030039032 ## sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala: ## @@ -964,17 +964,14 @@ class SubquerySuite extends QueryTest | WHERE t1.c1 = t2.c1) """.stripMargin) } - checkErrorMatchPVals( + checkError( exception1, -errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_CORRELATED_REFERENCE", -parameters = Map("treeNode" -> "(?s).*"), -sqlState = None, +errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE", +parameters = Map("sqlExprs" -> "outer(arr_c2#427264)"), Review Comment: the same question here. Can we guarantee the same number across multiple runs? ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala: ## @@ -51,11 +51,13 @@ class ResolveSubquerySuite extends AnalysisTest { test("SPARK-17251 Improve `OuterReference` to be `NamedExpression`") { val expr = Filter( InSubquery(Seq(a), ListQuery(Project(Seq(UnresolvedAttribute("a")), t2))), t1) -val m = intercept[AnalysisException] { - SimpleAnalyzer.checkAnalysis(SimpleAnalyzer.ResolveSubquery(expr)) -}.getMessage -assert(m.contains( - "Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses")) +checkError( + exception = intercept[AnalysisException] { +SimpleAnalyzer.checkAnalysis(SimpleAnalyzer.ResolveSubquery(expr)) + }, + errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE", + parameters = Map("sqlExprs" -> "outer(a#4151672)") Review Comment: Is the number `4151672` stable?
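The concern is that expression IDs like `a#4151672` come from an auto-incrementing counter and are generally not stable across test runs, which is why `checkErrorMatchPVals` with a regex parameter was used before. A minimal Python sketch of an ID-tolerant comparison (the helper name is hypothetical; Spark's own Scala test utilities solve this with regex-valued parameters):

```python
import re

def normalize_expr_id(message: str) -> str:
    """Replace '#<digits>' expression IDs with a stable placeholder."""
    return re.sub(r"#\d+", "#ID", message)

# Two runs producing different IDs still compare equal after normalization:
assert normalize_expr_id("outer(a#4151672)") == normalize_expr_id("outer(a#99)")
```
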
[GitHub] [spark] MaxGekk closed pull request #38575: [SPARK-40948][SQL][FOLLOWUP] Restore PATH_NOT_FOUND
MaxGekk closed pull request #38575: [SPARK-40948][SQL][FOLLOWUP] Restore PATH_NOT_FOUND URL: https://github.com/apache/spark/pull/38575
[GitHub] [spark] MaxGekk commented on pull request #38575: [SPARK-40948][SQL][FOLLOWUP] Restore PATH_NOT_FOUND
MaxGekk commented on PR #38575: URL: https://github.com/apache/spark/pull/38575#issuecomment-1324575522 +1, LGTM. Merging to master. Thank you, @itholic, @HyukjinKwon, @cloud-fan and @LuciferYang for review.
[GitHub] [spark] MaxGekk commented on a diff in pull request #25004: [SPARK-28205][SQL] useV1SourceList configuration should be for all data sources
MaxGekk commented on code in PR #25004: URL: https://github.com/apache/spark/pull/25004#discussion_r1030034252 ## sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala: ## @@ -170,4 +174,46 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSQLContext { } } } + + test("Fallback Parquet V2 to V1") { Review Comment: This test is flaky, it seems. Failed in my PR w/ unrelated changes: ``` sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: ArrayBuffer(("collect", Relation [id#725095L] parquet ), ("command", InsertIntoHadoopFsRelationCommand file:/home/runner/work/spark/spark/target/tmp/spark-0d0623b6-2cdd-4ed0-bd86-0116ec152a01, false, Parquet, [path=/home/runner/work/spark/spark/target/tmp/spark-0d0623b6-2cdd-4ed0-bd86-0116ec152a01], ErrorIfExists, [id] +- Range (0, 10, step=1, splits=Some(2)) )) had length 2 instead of expected length 1 at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) at org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.$anonfun$new$22(FileDataSourceV2FallBackSuite.scala:179) at org.apache.spark.sql.connector.FileDataSourceV2FallBackSuite.$anonfun$new$22$adapted(FileDataSourceV2FallBackSuite.scala:175) ``` see https://github.com/MaxGekk/spark/runs/9650795751
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38757: [SPARK-41222][CONNECT][PYTHON] Unify the typing definitions
zhengruifeng commented on code in PR #38757: URL: https://github.com/apache/spark/pull/38757#discussion_r1030022747 ## python/pyspark/sql/connect/column.py: ## @@ -15,14 +15,15 @@ # limitations under the License. # import uuid -from typing import cast, get_args, TYPE_CHECKING, Callable, Any +from typing import cast, TYPE_CHECKING, Callable, Any import json import decimal import datetime import pyspark.sql.connect.proto as proto -from pyspark.sql.connect._typing import PrimitiveType + +primitive_types = (bool, float, int, str) Review Comment: thanks, let me update
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38757: [SPARK-41222][CONNECT][PYTHON] Unify the typing definitions
HyukjinKwon commented on code in PR #38757: URL: https://github.com/apache/spark/pull/38757#discussion_r1030022065 ## python/pyspark/sql/connect/column.py: ## @@ -15,14 +15,15 @@ # limitations under the License. # import uuid -from typing import cast, get_args, TYPE_CHECKING, Callable, Any +from typing import cast, TYPE_CHECKING, Callable, Any import json import decimal import datetime import pyspark.sql.connect.proto as proto -from pyspark.sql.connect._typing import PrimitiveType + +primitive_types = (bool, float, int, str) Review Comment: You can import this in the function in that case. e.g.) ```diff def _bin_op( name: str, doc: str = "binary function", reverse: bool = False ) -> Callable[["Column", Any], "Expression"]: + from pyspark.sql.connect._typing import PrimitiveType def _(self: "Column", other: Any) -> "Expression": ```
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38757: [SPARK-41222][CONNECT][PYTHON] Unify the typing definitions
zhengruifeng commented on code in PR #38757: URL: https://github.com/apache/spark/pull/38757#discussion_r1030020312 ## python/pyspark/sql/connect/column.py: ## @@ -15,14 +15,15 @@ # limitations under the License. # import uuid -from typing import cast, get_args, TYPE_CHECKING, Callable, Any +from typing import cast, TYPE_CHECKING, Callable, Any import json import decimal import datetime import pyspark.sql.connect.proto as proto -from pyspark.sql.connect._typing import PrimitiveType + +primitive_types = (bool, float, int, str) Review Comment: list the types here to avoid a `circular import`, since `_typing` also imports column
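The trade-off under discussion can be sketched in plain Python (toy alias; the real definitions live in `pyspark.sql.connect._typing` and `column.py`): keeping a runtime tuple of the primitive classes means the module never has to import the typing alias at import time, which breaks the `_typing -> column -> _typing` cycle while staying equivalent to `get_args(PrimitiveType)`:

```python
from typing import Union, get_args

# Toy stand-in for pyspark.sql.connect._typing.PrimitiveType (assumed shape):
PrimitiveType = Union[bool, float, int, str]

# The workaround from the diff: list the runtime classes as a plain tuple,
# so no module-level import of _typing is needed in column.py.
primitive_types = (bool, float, int, str)

# The tuple stays in sync with the alias and works with isinstance():
assert get_args(PrimitiveType) == primitive_types
assert isinstance(3.14, primitive_types)
assert not isinstance([1, 2], primitive_types)
```

The alternative suggested above — importing `PrimitiveType` inside the function body — also works, because by the time the function runs both modules are fully initialized.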
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38575: [SPARK-40948][SQL][FOLLOWUP] Restore PATH_NOT_FOUND
HyukjinKwon commented on code in PR #38575: URL: https://github.com/apache/spark/pull/38575#discussion_r1030018762 ## R/pkg/tests/fulltests/test_sparkSQL.R: ## @@ -3990,12 +3990,16 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume expect_error(read.df(source = "json"), paste("Error in load : analysis error - Unable to infer schema for JSON.", "It must be specified manually")) - expect_error(read.df("arbitrary_path"), "Error in load : analysis error - Path does not exist") - expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist") - expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist") - expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist") + expect_error(read.df("arbitrary_path"), + "Error in load : analysis error - \\[PATH_NOT_FOUND\\].*") Review Comment: thanks man
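The new assertions depend on regex escaping: `[` and `]` are regex metacharacters, so the pattern must escape them, and in an R string literal the backslash itself is doubled, giving `\\[PATH_NOT_FOUND\\]`. The same idea in Python (the message text here is illustrative):

```python
import re

msg = "Error in load : analysis error - [PATH_NOT_FOUND] Path does not exist"

# Escaped brackets match the literal error-class tag:
assert re.search(r"\[PATH_NOT_FOUND\].*", msg)

# Unescaped brackets would instead be parsed as a character class that
# matches any single one of those letters:
assert re.fullmatch(r"[PATH_NOT_FOUND]", "P")
```
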
[GitHub] [spark] ulysses-you commented on a diff in pull request #38760: [SPARK-41219][SQL] Decimal changePrecision should work with decimal(0, 0)
ulysses-you commented on code in PR #38760: URL: https://github.com/apache/spark/pull/38760#discussion_r1030015079 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala: ## @@ -3537,6 +3537,12 @@ class DataFrameSuite extends QueryTest }.isEmpty) } } + + test("SPARK-41219: Decimal changePrecision should work with decimal(0, 0)") { +val df = Seq("0.5944910").toDF("a") +checkAnswer(df.selectExpr("cast(a as decimal(7,7)) div 100"), Row(0)) Review Comment: @cloud-fan see this test, the decimal type of IntegralDivide can be `decimal(0, 0)` (other BinaryArithmetic does not have the issue). https://github.com/apache/spark/blob/d275a83c582a23d7730b5a35a335f9cd10c2741c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala#L864-L868 Or, we may change it to `max(p1 - s1 + s2, 1)`
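The precision arithmetic behind the bug can be sketched as follows (hypothetical helper names; the real rule is in `arithmetic.scala`). For `cast(a as decimal(7,7)) div 100`, the dividend has precision 7 and scale 7 while the integer divisor has scale 0, so the result precision `p1 - s1 + s2` collapses to 0:

```python
# Result-precision rule for IntegralDivide (sketch): p1 - s1 + s2.
def integral_divide_precision(p1: int, s1: int, s2: int) -> int:
    return p1 - s1 + s2

# decimal(7,7) div <scale-0 divisor>  ->  decimal(0, 0)
assert integral_divide_precision(7, 7, 0) == 0

# The proposed guard from the comment keeps at least one digit:
def guarded_precision(p1: int, s1: int, s2: int) -> int:
    return max(p1 - s1 + s2, 1)

assert guarded_precision(7, 7, 0) == 1  # -> decimal(1, 0)
```
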
[GitHub] [spark] amaliujia commented on a diff in pull request #38762: [SPARK-41225] [CONNECT] [PYTHON] Disable unsupported functions.
amaliujia commented on code in PR #38762: URL: https://github.com/apache/spark/pull/38762#discussion_r1030011493 ## python/pyspark/sql/connect/dataframe.py: ## @@ -951,6 +951,39 @@ def createOrReplaceGlobalTempView(self, name: str) -> None: ).command(session=self._session) self._session.execute_command(command) +def rdd(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("RDD Support for Spark Connect is not implemented.") Review Comment: Maybe be consistent and either include `for Spark Connect` in every error message or leave it out everywhere? Either is fine. Only this one mentions `Spark Connect` now.
[GitHub] [spark] amaliujia commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation
amaliujia commented on code in PR #38659: URL: https://github.com/apache/spark/pull/38659#discussion_r1030009940 ## connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -271,8 +273,12 @@ class SparkConnectPlanner(session: SparkSession) { } private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = { -val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq -new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes) +val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator( + Seq(rel.getData.toByteArray).iterator, Review Comment: if the information is already in the Arrow IPC stream, +1 to remove `row count`. I also don't think `row count` was used properly in the initial implementation (e.g. it probably was not used in the CSV version).
[GitHub] [spark] LuciferYang commented on a diff in pull request #38685: [SPARK-41206][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1233` to `COLUMN_ALREADY_EXISTS`
LuciferYang commented on code in PR #38685: URL: https://github.com/apache/spark/pull/38685#discussion_r1030009517 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala: ## @@ -1759,24 +1763,25 @@ class DataFrameSuite extends QueryTest test("SPARK-8072: Better Exception for Duplicate Columns") { // only one duplicate column present -val e = intercept[org.apache.spark.sql.AnalysisException] { +val e = intercept[AnalysisException] { Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1") .write.format("parquet").save("temp") } -assert(e.getMessage.contains("Found duplicate column(s) when inserting into")) -assert(e.getMessage.contains("column1")) -assert(!e.getMessage.contains("column2")) +checkError( + exception = e, + errorClass = "COLUMN_ALREADY_EXISTS", + parameters = Map("columnName" -> "`column1`")) Review Comment: https://github.com/apache/spark/pull/38764
[GitHub] [spark] LuciferYang commented on pull request #38764: [SPARK-41206][SQL][FOLLOWUP] Make result of `checkColumnNameDuplication` stable to fix `COLUMN_ALREADY_EXISTS` check failed with Scala 2.13
LuciferYang commented on PR #38764: URL: https://github.com/apache/spark/pull/38764#issuecomment-1324532345 cc @HyukjinKwon try to fix https://github.com/apache/spark/pull/38685#discussion_r1029966254 https://pipelines.actions.githubusercontent.com/serviceHosts/03398d36-4378-4d47-a936-fba0a5e8ccb9/_apis/pipelines/1/runs/197563/signedlogcontent/15?urlExpires=2022-11-23T02%3A27%3A31.4973482Z=HMACV1=9ZuCXVZm%2FAAfuPXmBpFS1qunTFAC%2B2kN7XpW%2BZ2MnXg%3D ``` 2022-11-22T20:55:45.2729890Z [error] Failed tests: 2022-11-22T20:55:45.2730467Z [error] org.apache.spark.sql.DataFrameSuite 2022-11-22T20:55:45.2731402Z [error] org.apache.spark.sql.execution.datasources.json.JsonV1Suite 2022-11-22T20:55:45.2732142Z [error] org.apache.spark.sql.execution.datasources.json.JsonV2Suite 2022-11-22T20:55:45.2732893Z [error] org.apache.spark.sql.execution.datasources.json.JsonLegacyTimeParserSuite ``` Let us wait for the CI.
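The instability the follow-up PR addresses: Scala 2.13 changed iteration order for some unordered collections, so a duplicate-column check that reports whichever duplicate a hash-based grouping yields first can name a different column (or order) per run, breaking exact-match `checkError` assertions. A Python sketch of a deterministic, first-seen-order check (toy helper, not Spark's implementation):

```python
def duplicate_columns(names):
    """Report duplicate column names (case-insensitive) in first-seen order,
    so the resulting error message is stable across runs."""
    seen, dups = set(), []
    for n in names:
        key = n.lower()
        if key in seen and n not in dups:
            dups.append(n)
        seen.add(key)
    return dups

# Matches the DataFrameSuite example: ("column1", "column2", "column1")
assert duplicate_columns(["column1", "column2", "column1"]) == ["column1"]
```
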
[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
ahshahid commented on code in PR #38714: URL: https://github.com/apache/spark/pull/38714#discussion_r1030005359 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala: ## @@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper { */ def getOuterReferences(expr: Expression): Seq[Expression] = { val outerExpressions = ArrayBuffer.empty[Expression] -def collectOutRefs(input: Expression): Unit = input match { + +def collectOutRefs(input: Expression): Boolean = input match { case a: AggregateExpression if containsOuter(a) => if (a.references.nonEmpty) { throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin) } else { // Collect and update the sub-tree so that outer references inside this aggregate // expression will not be collected. For example: min(outer(a)) -> min(a). - val newExpr = stripOuterReference(a) - outerExpressions += newExpr + // delay collecting outer expression as we want to go as much up as possible Review Comment: But will add some more tests to validate that whole arithmetic expression is replaced by outer ref & not just a subexpr.
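The "go as much up as possible" strategy can be sketched as a post-order tree walk (toy node classes, not Spark's `Expression`): a subtree is only collected once its parent turns out to mix outer and local references, so the largest all-outer subtree — e.g. a whole arithmetic expression — is collected rather than its leaves:

```python
class Node:
    """Toy expression node; is_outer marks an outer-reference leaf."""
    def __init__(self, name, children=(), is_outer=False):
        self.name, self.children, self.is_outer = name, tuple(children), is_outer

def collect_outer(node, acc):
    """Return True if this subtree consists only of outer references.
    (A caller would collect the root itself when the top call returns True.)"""
    if not node.children:
        return node.is_outer
    child_flags = [collect_outer(c, acc) for c in node.children]
    if all(child_flags):
        return True  # defer: let the parent try to claim a larger subtree
    # Mixed subtree: collect each maximal outer-only child here.
    for c, outer_only in zip(node.children, child_flags):
        if outer_only:
            acc.append(c.name)
    return False

# (outer(a) + outer(b)) = local_c  ->  the whole sum is collected, not a and b
plus = Node("outer(a)+outer(b)", [Node("a", is_outer=True), Node("b", is_outer=True)])
root = Node("eq", [plus, Node("c")])
acc = []
collect_outer(root, acc)
assert acc == ["outer(a)+outer(b)"]
```
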
[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
ahshahid commented on code in PR #38714: URL: https://github.com/apache/spark/pull/38714#discussion_r1030005062 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala: ## @@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper { */ def getOuterReferences(expr: Expression): Seq[Expression] = { val outerExpressions = ArrayBuffer.empty[Expression] -def collectOutRefs(input: Expression): Unit = input match { + +def collectOutRefs(input: Expression): Boolean = input match { case a: AggregateExpression if containsOuter(a) => if (a.references.nonEmpty) { throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin) } else { // Collect and update the sub-tree so that outer references inside this aggregate // expression will not be collected. For example: min(outer(a)) -> min(a). - val newExpr = stripOuterReference(a) - outerExpressions += newExpr + // delay collecting outer expression as we want to go as much up as possible Review Comment: implemented.
[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
ahshahid commented on code in PR #38714: URL: https://github.com/apache/spark/pull/38714#discussion_r1030004911 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala: ## @@ -17,13 +17,20 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + +import org.junit.Assert + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{CreateArray, Expression, GetStructField, InSubquery, LateralSubquery, ListQuery, OuterReference, ScalarSubquery} -import org.apache.spark.sql.catalyst.expressions.aggregate.Count +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Cos, CreateArray, Exists, Expression, GetStructField, InSubquery, LateralSubquery, ListQuery, OuterReference, ScalarSubquery, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Count} +import org.apache.spark.sql.catalyst.optimizer.{PropagateEmptyRelation, SimpleTestOptimizer} import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf + Review Comment: done ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala: ## @@ -17,13 +17,20 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + +import org.junit.Assert Review Comment: done
[GitHub] [spark] cloud-fan commented on a diff in pull request #38760: [SPARK-41219][SQL] Decimal changePrecision should work with decimal(0, 0)
cloud-fan commented on code in PR #38760: URL: https://github.com/apache/spark/pull/38760#discussion_r1030003237 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala: ## @@ -3537,6 +3537,12 @@ class DataFrameSuite extends QueryTest }.isEmpty) } } + + test("SPARK-41219: Decimal changePrecision should work with decimal(0, 0)") { +val df = Seq("0.5944910").toDF("a") +checkAnswer(df.selectExpr("cast(a as decimal(7,7)) div 100"), Row(0)) +checkAnswer(df.select(lit(BigDecimal(0)) as "c").selectExpr("cast(c as decimal(0,0))"), Row(0)) Review Comment: hmmm, `decimal(0, 0)` is a valid decimal type? how do you use it in production?
[GitHub] [spark] wankunde commented on a diff in pull request #38560: [WIP][SPARK-38005][core] Support cleaning up merged shuffle files and state from external shuffle service
wankunde commented on code in PR #38560: URL: https://github.com/apache/spark/pull/38560#discussion_r1029992768 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -452,22 +489,69 @@ void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) { } /** - * Clean up all the AppShufflePartitionInfo and the finalized shuffle partitions in DB for - * a specific shuffleMergeId. This is done since there is a higher shuffleMergeId request made - * for a shuffleId, therefore clean up older shuffleMergeId partitions. The cleanup will be - * executed the mergedShuffleCleaner thread. + * Clean up the outdated finalized or unfinalized shuffle partitions. The cleanup will be executed in the + * mergedShuffleCleaner thread. Two cases need to clean up the shuffle: 1. there is a higher shuffleMergeId request + * made for a shuffleId, therefore clean up older shuffleMergeId partitions. 2. Application requires to clean up the + * expired or unused specific shuffleId partitions */ + @VisibleForTesting + void submitRemoveShuffleMergeTask( + AppShuffleInfo shuffleInfo, Integer shuffleId, + Optional higherShuffleMergeId) { +AppShuffleMergePartitionsInfo mergePartitionsInfo = shuffleInfo.shuffles.get(shuffleId); +AppAttemptShuffleMergeId shuffleMergeId = new AppAttemptShuffleMergeId( +shuffleInfo.appId, shuffleInfo.attemptId, shuffleId, mergePartitionsInfo.shuffleMergeId); +if (!mergePartitionsInfo.isFinalized()) { + Map partitionsToClean = mergePartitionsInfo.shuffleMergePartitions; + submitCleanupTask(() -> + closeAndDeleteOutdatedPartitions(shuffleMergeId, partitionsToClean, higherShuffleMergeId)); +} else { + int[] partitionsToClean = mergePartitionsInfo.finalizedPartitions; + submitCleanupTask(() -> + deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, partitionsToClean, higherShuffleMergeId)); +} + } + @VisibleForTesting void closeAndDeleteOutdatedPartitions( - AppAttemptShuffleMergeId appAttemptShuffleMergeId, - 
Map partitions) { -removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); + AppAttemptShuffleMergeId shuffleMergeId, + Map partitions, + Optional higherShuffleMergeId) { +removeAppShufflePartitionInfoFromDB(shuffleMergeId); +higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB); partitions - .forEach((partitionId, partitionInfo) -> { -synchronized (partitionInfo) { - partitionInfo.closeAllFilesAndDeleteIfNeeded(true); -} - }); +.forEach((partitionId, partitionInfo) -> { + synchronized (partitionInfo) { +partitionInfo.closeAllFilesAndDeleteIfNeeded(true); + } +}); + } + + @VisibleForTesting + void deleteOutdatedFinalizedPartitions( + AppShuffleInfo shuffleInfo, + AppAttemptShuffleMergeId shuffleMergeId, + int[] outdatedFinalizedPartitions, + Optional higherShuffleMergeId) { +int shuffleId = shuffleMergeId.shuffleId; +int mergeId = shuffleMergeId.shuffleMergeId; +removeAppShufflePartitionInfoFromDB(shuffleMergeId); +higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB); +Arrays.stream(outdatedFinalizedPartitions).forEach(partition -> { + try { +File metaFile = +shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId, partition); +File indexFile = new File( +shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId, partition)); +File dataFile = +shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId, partition); +metaFile.delete(); +indexFile.delete(); +dataFile.delete(); + } catch (Exception e) { +logger.error("Error delete shuffle files for {}", shuffleMergeId, e); + } Review Comment: Just like `closeAllFilesAndDeleteIfNeeded` method, can we continue delete the other files if one `delete()` failed? 
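The reviewer's suggestion — keep deleting the remaining merged-shuffle files even when one `delete()` fails — can be sketched as follows (toy Python helper; the real code is Java in `RemoteBlockPushResolver`, deleting the meta, index, and data files of each partition):

```python
import os

def delete_all(paths):
    """Try to delete every path; collect failures instead of stopping at the
    first error, mirroring closeAllFilesAndDeleteIfNeeded's behavior."""
    failed = []
    for p in paths:
        try:
            os.remove(p)
        except OSError:
            failed.append(p)  # would be logged in the real service
    return failed
```

A missing or undeletable file is recorded and the loop continues, so one bad partition file cannot leak the others.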
## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -712,6 +794,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); + appShuffleInfo.shuffles.get(msg.shuffleId).setFinalizedPartitions(Ints.toArray(reduceIds)); Review Comment: The `FinalizedPartitions` will be empty after the Shuffle service restart which will cause the merged shuffle files leak. ## core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala: ## @@ -327,43 +327,52 @@ class BlockManagerMasterEndpoint( } }.toSeq -// Find all shuffle blocks on executors that are no
[GitHub] [spark] LuciferYang opened a new pull request, #38764: [SPARK-41206][SQL][FOLLOWUP] Make result of `checkColumnNameDuplication` stable to fix `COLUMN_ALREADY_EXISTS` check failed with Scala 2.13
LuciferYang opened a new pull request, #38764: URL: https://github.com/apache/spark/pull/38764 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] itholic commented on a diff in pull request #38646: [SPARK-41131][SQL] Improve error message for `UNRESOLVED_MAP_KEY.WITHOUT_SUGGESTION`
itholic commented on code in PR #38646: URL: https://github.com/apache/spark/pull/38646#discussion_r102592 ## core/src/main/resources/error/error-classes.json: ## @@ -1044,7 +1044,7 @@ }, "UNRESOLVED_MAP_KEY" : { "message" : [ - "Cannot resolve column as a map key. If the key is a string literal, please add single quotes around it." + "Cannot resolve column as a map key. If the key is a string literal, add single quotes around it: ''." Review Comment: Sounds good. Just updated!
[GitHub] [spark] cloud-fan commented on a diff in pull request #38575: [SPARK-40948][SQL][FOLLOWUP] Restore PATH_NOT_FOUND
cloud-fan commented on code in PR #38575: URL: https://github.com/apache/spark/pull/38575#discussion_r1029997623 ## R/pkg/tests/fulltests/test_sparkSQL.R: ## @@ -3990,12 +3990,16 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume expect_error(read.df(source = "json"), paste("Error in load : analysis error - Unable to infer schema for JSON.", "It must be specified manually")) - expect_error(read.df("arbitrary_path"), "Error in load : analysis error - Path does not exist") - expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist") - expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist") - expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist") + expect_error(read.df("arbitrary_path"), + "Error in load : analysis error - \\[PATH_NOT_FOUND\\].*") Review Comment: LGTM
[GitHub] [spark] zhengruifeng commented on pull request #38763: [SPARK-41201][CONNECT][PYTHON][TEST][FOLLOWUP] Reenable test_fill_na
zhengruifeng commented on PR #38763: URL: https://github.com/apache/spark/pull/38763#issuecomment-1324508234 merged into master
[GitHub] [spark] zhengruifeng closed pull request #38763: [SPARK-41201][CONNECT][PYTHON][TEST][FOLLOWUP] Reenable test_fill_na
zhengruifeng closed pull request #38763: [SPARK-41201][CONNECT][PYTHON][TEST][FOLLOWUP] Reenable test_fill_na URL: https://github.com/apache/spark/pull/38763
[GitHub] [spark] itholic commented on a diff in pull request #38575: [SPARK-40948][SQL][FOLLOWUP] Restore PATH_NOT_FOUND
itholic commented on code in PR #38575: URL: https://github.com/apache/spark/pull/38575#discussion_r1029988770 ## R/pkg/tests/fulltests/test_sparkSQL.R: ## @@ -3990,12 +3990,16 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume expect_error(read.df(source = "json"), paste("Error in load : analysis error - Unable to infer schema for JSON.", "It must be specified manually")) - expect_error(read.df("arbitrary_path"), "Error in load : analysis error - Path does not exist") - expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist") - expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist") - expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist") + expect_error(read.df("arbitrary_path"), + "Error in load : analysis error - \\[PATH_NOT_FOUND\\].*") Review Comment: FYI: the `arbitrary_path` here is different per test environment, so I use a regexp for the path string.
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation
zhengruifeng commented on code in PR #38659: URL: https://github.com/apache/spark/pull/38659#discussion_r1029986139 ## connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -271,8 +273,12 @@ class SparkConnectPlanner(session: SparkSession) { } private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = { -val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq -new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes) +val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator( + Seq(rel.getData.toByteArray).iterator, Review Comment: I'm fine to remove the row count, it was supported in `collect` just because it was in the initial proto message
[GitHub] [spark] xinrong-meng commented on pull request #38731: [SPARK-41209][PYTHON] Improve PySpark type inference in _merge_type method
xinrong-meng commented on PR #38731: URL: https://github.com/apache/spark/pull/38731#issuecomment-1324490281 Thanks @sadikovi !
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38762: [SPARK-41225] [CONNECT] [PYTHON] Disable unsupported functions.
zhengruifeng commented on code in PR #38762: URL: https://github.com/apache/spark/pull/38762#discussion_r1029983662 ## python/pyspark/sql/connect/dataframe.py: ## @@ -951,6 +951,39 @@ def createOrReplaceGlobalTempView(self, name: str) -> None: ).command(session=self._session) self._session.execute_command(command) +def rdd(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("RDD Support for Spark Connect is not implemented.") + +def unpersist(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("unpersist() is not implemented.") + +def cache(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("cache() is not implemented.") + +def persist(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("persist() is not implemented.") + +def withWatermark(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("withWatermark() is not implemented.") + +def observe(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("observe() is not implemented.") + +def foreach(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("foreach() is not implemented.") + +def foreachPartition(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("foreachPartition() is not implemented.") + +def toLocalIterator(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("toLocalIterator() is not implemented.") + +def checkpoint(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("checkpoint() is not implemented.") + +def localCheckpoint(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("localCheckpoint() is not implemented.") + Review Comment: + `toJSON`
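The stubs in the diff above make unsupported APIs fail fast with a clear message instead of silently misbehaving. A toy sketch of the same pattern (the class below is a hypothetical stand-in, not the actual Connect `DataFrame`):

```python
class ConnectDataFrameStub:
    """Hypothetical stand-in illustrating the fail-fast stub pattern."""

    def rdd(self, *args, **kwargs):
        # Fail fast: RDD APIs have no equivalent in Spark Connect.
        raise NotImplementedError("RDD Support for Spark Connect is not implemented.")

    def cache(self, *args, **kwargs):
        raise NotImplementedError("cache() is not implemented.")
```

Callers get an immediate `NotImplementedError` naming the missing API, which is easier to act on than an `AttributeError` from a method that simply does not exist.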
[GitHub] [spark] xiuzhu9527 commented on pull request #38674: [SPARK-41160][YARN] Fix error when submitting a task to the yarn that enabled the timeline service
xiuzhu9527 commented on PR #38674: URL: https://github.com/apache/spark/pull/38674#issuecomment-1324484471 @tgravescs 1. Yes, Jersey 1 and Jersey 2 are two different packages, one is com.sun.jersey and one is org.glassfish.jersey 2. I will try to use maven-shade-plugin to change the package path of javax.ws.rs in Jersey 1; this should resolve the Jersey 1 problem mentioned in https://issues.apache.org/jira/browse/SPARK-11081
[GitHub] [spark] yabola commented on a diff in pull request #38560: [WIP][SPARK-38005][core] Support cleaning up merged shuffle files and state from external shuffle service
yabola commented on code in PR #38560: URL: https://github.com/apache/spark/pull/38560#discussion_r1023816561 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -654,8 +731,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + deleteCurrentShufflePartitions(appShuffleInfo, shuffleId, appAttemptShuffleMergeId)); Review Comment: ~~I checked the `appAttemptShuffleMergeId` in the code before. I think if we want to delete partitions merged data, then we should delete the corresponding ShuffleMergeId in DB (Otherwise, inconsistency will occur when restoring shuffle info from db)~~
[GitHub] [spark] LuciferYang commented on a diff in pull request #38685: [SPARK-41206][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1233` to `COLUMN_ALREADY_EXISTS`
LuciferYang commented on code in PR #38685: URL: https://github.com/apache/spark/pull/38685#discussion_r1029979036 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala: ## @@ -1759,24 +1763,25 @@ class DataFrameSuite extends QueryTest test("SPARK-8072: Better Exception for Duplicate Columns") { // only one duplicate column present -val e = intercept[org.apache.spark.sql.AnalysisException] { +val e = intercept[AnalysisException] { Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1") .write.format("parquet").save("temp") } -assert(e.getMessage.contains("Found duplicate column(s) when inserting into")) -assert(e.getMessage.contains("column1")) -assert(!e.getMessage.contains("column2")) +checkError( + exception = e, + errorClass = "COLUMN_ALREADY_EXISTS", + parameters = Map("columnName" -> "`column1`")) Review Comment: Let me check this
[GitHub] [spark] pan3793 closed pull request #38205: [SPARK-40747][CORE] Support setting driver log url using env vars on other resource managers
pan3793 closed pull request #38205: [SPARK-40747][CORE] Support setting driver log url using env vars on other resource managers URL: https://github.com/apache/spark/pull/38205
[GitHub] [spark] pan3793 commented on pull request #38205: [SPARK-40747][CORE] Support setting driver log url using env vars on other resource managers
pan3793 commented on PR #38205: URL: https://github.com/apache/spark/pull/38205#issuecomment-1324476298 Closing in favor of https://github.com/apache/spark/pull/38357
[GitHub] [spark] amaliujia commented on pull request #38763: [SPARK-41201][CONNECT][PYTHON][TEST][FOLLOWUP] Reenable test_fill_na
amaliujia commented on PR #38763: URL: https://github.com/apache/spark/pull/38763#issuecomment-1324471937 @zhengruifeng thanks for the clarification!
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/
zhengruifeng commented on code in PR #38742: URL: https://github.com/apache/spark/pull/38742#discussion_r1029975064 ## connector/connect/src/main/protobuf/spark/connect/base.proto: ## @@ -100,18 +70,138 @@ message AnalyzePlanRequest { // logging purposes and will not be interpreted by the server. optional string client_type = 4; - // (Optional) Get the explain string of the plan. - Explain explain = 5; + repeated AnalysisTask tasks = 5; + + message AnalysisTask { +oneof task { + // Get the schema + Schema schema = 1; + + // Is local + IsLocal is_local = 2; + + // Is Streaming + IsStreaming is_streaming = 3; + + // Get the explain string of the plan. + Explain explain = 4; + + // Get the tree string of the schema. + TreeString tree_string = 5; Review Comment: we also ask the server to provide the string for `df.show` and `df.explain`, maybe simpler to also do this for `printSchema`
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38762: [SPARK-41225] [CONNECT] [PYTHON] Disable unsupported functions.
zhengruifeng commented on code in PR #38762: URL: https://github.com/apache/spark/pull/38762#discussion_r1029974156 ## python/pyspark/sql/connect/dataframe.py: ## @@ -951,6 +951,39 @@ def createOrReplaceGlobalTempView(self, name: str) -> None: ).command(session=self._session) self._session.execute_command(command) +def rdd(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("RDD Support for Spark Connect is not implemented.") + +def unpersist(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("unpersist() is not implemented.") + +def cache(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("cache() is not implemented.") + +def persist(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("persist() is not implemented.") + +def withWatermark(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("withWatermark() is not implemented.") + +def observe(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("observe() is not implemented.") + +def foreach(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("foreach() is not implemented.") + +def foreachPartition(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("foreachPartition() is not implemented.") + +def toLocalIterator(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("toLocalIterator() is not implemented.") + +def checkpoint(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("checkpoint() is not implemented.") + +def localCheckpoint(self, *args: Any, **kwargs: Any) -> None: +raise NotImplementedError("localCheckpoint() is not implemented.") + Review Comment: I think we also do not support `to_pandas_on_spark`, `pandas_api`, `registerTempTable`, `storageLevel`, `mapInPandas`, `mapInArrow`, `writeStream`
[GitHub] [spark] beliefer commented on pull request #38745: [SPARK-37099][SQL] Optimize the filter based on rank-like window function by reduce not required rows
beliefer commented on PR #38745: URL: https://github.com/apache/spark/pull/38745#issuecomment-1324464202 ping @zhengruifeng cc @cloud-fan
[GitHub] [spark] zhengruifeng commented on pull request #38763: [SPARK-41201][CONNECT][PYTHON][TEST][FOLLOWUP] Reenable test_fill_na
zhengruifeng commented on PR #38763: URL: https://github.com/apache/spark/pull/38763#issuecomment-1324459180 @amaliujia that is on purpose: `sdf.x` will just throw an exception since `sdf` does not contain an `x` column, but on the Connect DataFrame `cdf`, `cdf.x` will not throw an exception since it does not check the schema.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38685: [SPARK-41206][SQL] Rename the error class `_LEGACY_ERROR_TEMP_1233` to `COLUMN_ALREADY_EXISTS`
HyukjinKwon commented on code in PR #38685: URL: https://github.com/apache/spark/pull/38685#discussion_r1029966254 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala: ## @@ -1759,24 +1763,25 @@ class DataFrameSuite extends QueryTest test("SPARK-8072: Better Exception for Duplicate Columns") { // only one duplicate column present -val e = intercept[org.apache.spark.sql.AnalysisException] { +val e = intercept[AnalysisException] { Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1") .write.format("parquet").save("temp") } -assert(e.getMessage.contains("Found duplicate column(s) when inserting into")) -assert(e.getMessage.contains("column1")) -assert(!e.getMessage.contains("column2")) +checkError( + exception = e, + errorClass = "COLUMN_ALREADY_EXISTS", + parameters = Map("columnName" -> "`column1`")) Review Comment: Several tests fixed here seem flaky because of the order in the map, e.g.: ``` - SPARK-8072: Better Exception for Duplicate Columns *** FAILED *** (42 milliseconds) Map("columnName" -> "`column3`") did not equal Map("columnName" -> "`column1`") (SparkFunSuite.scala:317) Analysis: JavaCollectionWrappers$JMapWrapper(columnName: `column3` -> `column1`) org.scalatest.exceptions.TestFailedException: at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) at org.apache.spark.SparkFunSuite.checkError(SparkFunSuite.scala:317) at org.apache.spark.sql.DataFrameSuite.$anonfun$new$368(DataFrameSuite.scala:1781) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at 
org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:207) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) ``` https://github.com/apache/spark/actions/runs/3525051044/jobs/5911287739 and https://github.com/apache/spark/actions/runs/3526328003 which happens in a different JDK or Scala version.
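The flakiness described above is consistent with iterating an unordered hash map of column-name counts: which duplicate (`column1` vs `column3`) is reported first depends on map iteration order, which differs between Scala 2.12/2.13 and JDK versions. A hypothetical sketch (not Spark's actual `checkColumnNameDuplication` code) of the deterministic alternative, scanning the input sequence in order and comparing case-insensitively as in Spark's default analysis mode:

```python
def first_duplicate(columns):
    """Return the first name that repeats, scanning in input order.

    Scanning the input sequence, rather than iterating an unordered map of
    counts, makes the reported duplicate deterministic across platforms.
    """
    seen = set()
    for c in columns:
        key = c.lower()  # case-insensitive comparison, like Spark's default
        if key in seen:
            return c
        seen.add(key)
    return None
```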
[GitHub] [spark] bersprockets commented on pull request #38727: [SPARK-41205][SQL] Check that format is foldable in `TryToBinary`
bersprockets commented on PR #38727: URL: https://github.com/apache/spark/pull/38727#issuecomment-1324453979 Tested PR https://github.com/apache/spark/pull/38737. That PR incidentally seems to fix this issue: ``` SELECT try_to_binary(col1, col2) from values ('abc', 'utf-8') as data(col1, col2); [DATATYPE_MISMATCH.NON_FOLDABLE_INPUT] Cannot resolve "to_binary(col1, col2)" due to data type mismatch: the input fmt should be a foldable "STRING" expression; however, got "col2".; line 1 pos 7; 'Project [unresolvedalias(try_to_binary(col1#0, col2#1), None)] +- SubqueryAlias data +- LocalRelation [col1#0, col2#1] spark-sql> ``` Therefore, closing this PR.
[GitHub] [spark] bersprockets closed pull request #38727: [SPARK-41205][SQL] Check that format is foldable in `TryToBinary`
bersprockets closed pull request #38727: [SPARK-41205][SQL] Check that format is foldable in `TryToBinary` URL: https://github.com/apache/spark/pull/38727
[GitHub] [spark] ulysses-you commented on pull request #38760: [SPARK-41219][SQL] Decimal changePrecision should work with decimal(0, 0)
ulysses-you commented on PR #38760: URL: https://github.com/apache/spark/pull/38760#issuecomment-1324450892 cc @cloud-fan @revans2 @gengliangwang
[GitHub] [spark] ulysses-you commented on a diff in pull request #38760: [SPARK-41219][SQL] Decimal changePrecision should work with decimal(0, 0)
ulysses-you commented on code in PR #38760: URL: https://github.com/apache/spark/pull/38760#discussion_r1029962313 ## sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala: ## @@ -384,4 +384,11 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester with SQLHelper } } } + + test("SPARK-41219: Decimal changePrecision should work with decimal(0, 0)") { +assert(Decimal(0).changePrecision(0, 0)) +assert(Decimal(0L).changePrecision(0, 0)) +assert(Decimal(java.math.BigInteger.valueOf(0)).changePrecision(0, 0)) +assert(Decimal(BigDecimal(0)).changePrecision(0, 0)) Review Comment: this is the key test; before this fix, these assertions returned false
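The semantics under test, `Decimal.changePrecision` reporting whether a value fits `decimal(precision, scale)`, can be mimicked with Python's `decimal` module. This is a sketch of the intended behavior, not Spark's implementation; the zero special case mirrors the fix, since zero should fit even `decimal(0, 0)`:

```python
from decimal import Decimal, ROUND_HALF_UP

def change_precision(value: Decimal, precision: int, scale: int) -> bool:
    """Return True if value fits decimal(precision, scale) after rounding."""
    # Round to the target scale first, as changePrecision does.
    rounded = value.quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)
    if rounded == 0:
        return True  # zero always fits, including decimal(0, 0)
    # The rounded coefficient must not need more digits than the precision.
    return len(rounded.as_tuple().digits) <= precision
```

Before the fix, the zero case fell through to the digit-count check, where a coefficient of `0` still counts as one digit and so failed against precision 0.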
[GitHub] [spark] ulysses-you commented on a diff in pull request #38739: [SPARK-41207][SQL] Fix BinaryArithmetic with negative scale
ulysses-you commented on code in PR #38739: URL: https://github.com/apache/spark/pull/38739#discussion_r1029960652 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala: ## @@ -276,9 +276,9 @@ class DecimalPrecisionSuite extends AnalysisTest with BeforeAndAfter { val a = AttributeReference("a", DecimalType(3, -10))() val b = AttributeReference("b", DecimalType(1, -1))() val c = AttributeReference("c", DecimalType(35, 1))() - checkType(Multiply(a, b), DecimalType(5, -11)) - checkType(Multiply(a, c), DecimalType(38, -9)) - checkType(Multiply(b, c), DecimalType(37, 0)) + checkType(Multiply(a, b), DecimalType(16, 0)) Review Comment: In this PR, I changed all BinaryArithmetic with negative scale. The test changes you see are actually legacy issues dating from when we started supporting negative scale; they are not related to https://github.com/apache/spark/pull/36698. I really cannot understand why `decimal(38, -9)` can appear in Spark; it implies overflow. The regression of `IntegralDivide` is caused by https://github.com/apache/spark/pull/36698. We can move the code into `IntegralDivide` if we want to narrow the change.
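For context on the old expected types in the diff above: Spark's result type for a decimal multiply follows the SQL-standard rule, precision = p1 + p2 + 1 and scale = s1 + s2, capped at the 38-digit maximum. A minimal sketch (ignoring the scale-reduction branch Spark applies when the integral part would not otherwise fit) reproduces the three old `checkType` expectations:

```python
MAX_PRECISION = 38

def multiply_result_type(p1, s1, p2, s2):
    """Result (precision, scale) for decimal(p1, s1) * decimal(p2, s2)."""
    precision = p1 + p2 + 1  # one extra digit for the carry
    scale = s1 + s2
    return (min(precision, MAX_PRECISION), scale)
```

Note how `(3, -10) * (35, 1)` yields precision 39, which the cap turns into the surprising `decimal(38, -9)` the comment objects to.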
[GitHub] [spark] amaliujia commented on a diff in pull request #38686: [SPARK-41169][CONNECT][PYTHON] Implement `DataFrame.drop`
amaliujia commented on code in PR #38686: URL: https://github.com/apache/spark/pull/38686#discussion_r1029946709 ## python/pyspark/sql/connect/dataframe.py: ## @@ -255,10 +255,21 @@ def distinct(self) -> "DataFrame": ) def drop(self, *cols: "ColumnOrString") -> "DataFrame": Review Comment: I think we have built consensus that we prefer reusing the proto rather than asking clients to do duplicate work.
[GitHub] [spark] amaliujia commented on pull request #38686: [SPARK-41169][CONNECT][PYTHON] Implement `DataFrame.drop`
amaliujia commented on PR #38686: URL: https://github.com/apache/spark/pull/38686#issuecomment-1324436991 LGTM
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/
zhengruifeng commented on code in PR #38742: URL: https://github.com/apache/spark/pull/38742#discussion_r1029953393 ## connector/connect/src/main/protobuf/spark/connect/base.proto: ## @@ -100,18 +70,138 @@ message AnalyzePlanRequest { // logging purposes and will not be interpreted by the server. optional string client_type = 4; - // (Optional) Get the explain string of the plan. - Explain explain = 5; + repeated AnalysisTask tasks = 5; + + message AnalysisTask { +oneof task { + // Get the schema + Schema schema = 1; + + // Is local + IsLocal is_local = 2; + + // Is Streaming + IsStreaming is_streaming = 3; + + // Get the explain string of the plan. + Explain explain = 4; + + // Get the tree string of the schema. + TreeString tree_string = 5; + + // Get the input files. + InputFiles input_files = 6; + + // Get the semantic hash + SemanticHash semantic_hash = 7; + + // Check whether plans are equal. + SameSemantics same_semantics = 8; +} + } + + // Analyze the input plan and return the schema. + message Schema { } + + // Returns true if the `collect` and `take` methods can be run locally. + message IsLocal { } + + // Returns true if this Dataset contains one or more sources that continuously + // return data as it arrives. + message IsStreaming { } + + // Explains the input plan based on a configurable mode. + message Explain { +// Plan explanation mode. +enum ExplainMode { + MODE_UNSPECIFIED = 0; + + // Generates only physical plan. + SIMPLE = 1; + + // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan. + // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans + // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects. + // The optimized logical plan transforms through a set of optimization rules, resulting in the + // physical plan. + EXTENDED = 2; + + // Generates code for the statement, if any and a physical plan. 
+ CODEGEN = 3; + + // If plan node statistics are available, generates a logical plan and also the statistics. + COST = 4; + + // Generates a physical plan outline and also node details. + FORMATTED = 5; +} + +// (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings. +ExplainMode explain_mode= 1; + } + + // Generate a string to express the schema in a nice tree format. + // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema') + // to compute the results. + message TreeString { + +// (Optional) The level to generate the string. +optional int32 level = 1; + } + + // Returns a best-effort snapshot of the files that compose this Dataset. + // It will invoke 'Dataset.inputFiles' to compute the results. + message InputFiles { } + + // Returns a `hashCode` of the logical query plan. + // It will invoke 'Dataset.semanticHash' to compute the results. + message SemanticHash { } + + // Returns `true` when the logical query plans inside both Datasets are equal. + // It will invoke 'Dataset.sameSemantics' to compute the results. + message SameSemantics { +Relation other = 1; + } } // Response to performing analysis of the query. Contains relevant metadata to be able to // reason about the performance. message AnalyzePlanResponse { string client_id = 1; - DataType schema = 2; - // The extended explain string as produced by Spark. - string explain_string = 3; + repeated AnalysisResult results = 2; + + message AnalysisResult { +oneof result { + // The analyzed schema. + DataType schema = 1; + + // Is local + bool is_local = 2; + + // Is Streaming + bool is_streaming = 3; + + // The extended explain string as produced by Spark. + string explain_string = 4; + + // Get the tree string of the schema. + string tree_string = 5; + + // Get the input files. + InputFiles input_files = 6; + + // Get the semantic hash code. 
+ int32 semantic_hash = 7; Review Comment: printSchema is frequently used, but I also added the others along the way
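The batching idea in the proto above, several analysis tasks answered in a single round trip with results returned in order, can be illustrated with a toy dispatcher. `FakeDataset`, the task names, and `analyze` are stand-ins for illustration only, not the Connect server's actual code:

```python
class FakeDataset:
    """Stand-in for a server-side Dataset; only the analysis hooks we need."""
    def __init__(self, schema):
        self._schema = schema

    def schema(self):
        return self._schema

    def is_local(self):
        return True

    def is_streaming(self):
        return False

def analyze(ds, tasks):
    """Answer analysis tasks in order, like repeated AnalysisResult in the response."""
    dispatch = {
        "schema": ds.schema,
        "is_local": ds.is_local,
        "is_streaming": ds.is_streaming,
    }
    return [dispatch[t]() for t in tasks]
```

The point of the `repeated AnalysisTask` design is exactly this: one request can carry `schema`, `is_local`, and `is_streaming` together instead of three RPCs.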
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/
zhengruifeng commented on code in PR #38742: URL: https://github.com/apache/spark/pull/38742#discussion_r1029953075 ## connector/connect/src/main/protobuf/spark/connect/base.proto: ## @@ -100,18 +70,138 @@ message AnalyzePlanRequest { // logging purposes and will not be interpreted by the server. optional string client_type = 4; - // (Optional) Get the explain string of the plan. - Explain explain = 5; + repeated AnalysisTask tasks = 5; + + message AnalysisTask { +oneof task { + // Get the schema + Schema schema = 1; + + // Is local + IsLocal is_local = 2; + + // Is Streaming + IsStreaming is_streaming = 3; + + // Get the explain string of the plan. + Explain explain = 4; + + // Get the tree string of the schema. + TreeString tree_string = 5; + + // Get the input files. + InputFiles input_files = 6; + + // Get the semantic hash + SemanticHash semantic_hash = 7; + + // Check whether plans are equal. + SameSemantics same_semantics = 8; +} + } + + // Analyze the input plan and return the schema. + message Schema { } + + // Returns true if the `collect` and `take` methods can be run locally. + message IsLocal { } + + // Returns true if this Dataset contains one or more sources that continuously + // return data as it arrives. + message IsStreaming { } + + // Explains the input plan based on a configurable mode. + message Explain { +// Plan explanation mode. +enum ExplainMode { + MODE_UNSPECIFIED = 0; + + // Generates only physical plan. + SIMPLE = 1; + + // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan. + // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans + // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects. + // The optimized logical plan transforms through a set of optimization rules, resulting in the + // physical plan. + EXTENDED = 2; + + // Generates code for the statement, if any and a physical plan. 
+ CODEGEN = 3; + + // If plan node statistics are available, generates a logical plan and also the statistics. + COST = 4; + + // Generates a physical plan outline and also node details. + FORMATTED = 5; +} + +// (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings. +ExplainMode explain_mode= 1; + } + + // Generate a string to express the schema in a nice tree format. + // It will invoke 'StructType.treeString' (same as 'Dataset.printSchema') + // to compute the results. + message TreeString { + +// (Optional) The level to generate the string. +optional int32 level = 1; + } + + // Returns a best-effort snapshot of the files that compose this Dataset. + // It will invoke 'Dataset.inputFiles' to compute the results. + message InputFiles { } + + // Returns a `hashCode` of the logical query plan. + // It will invoke 'Dataset.semanticHash' to compute the results. + message SemanticHash { } + + // Returns `true` when the logical query plans inside both Datasets are equal. + // It will invoke 'Dataset.sameSemantics' to compute the results. + message SameSemantics { +Relation other = 1; + } } // Response to performing analysis of the query. Contains relevant metadata to be able to // reason about the performance. message AnalyzePlanResponse { string client_id = 1; - DataType schema = 2; - // The extended explain string as produced by Spark. - string explain_string = 3; + repeated AnalysisResult results = 2; + + message AnalysisResult { +oneof result { + // The analyzed schema. + DataType schema = 1; + + // Is local + bool is_local = 2; + + // Is Streaming + bool is_streaming = 3; + + // The extended explain string as produced by Spark. + string explain_string = 4; + + // Get the tree string of the schema. + string tree_string = 5; + + // Get the input files. + InputFiles input_files = 6; + + // Get the semantic hash code. 
+ int32 semantic_hash = 7; Review Comment: the methods added here are all public APIs, used directly by users
[GitHub] [spark] amaliujia commented on pull request #38763: [SPARK-41201][CONNECT][PYTHON][TEST][FOLLOWUP] Reenable test_fill_na
amaliujia commented on PR #38763: URL: https://github.com/apache/spark/pull/38763#issuecomment-1324432409 LGTM. If you are interested, can you BTW follow up in this PR on https://github.com/apache/spark/blob/2c8da56d6f3136eb7b97f9c06d131c767ce459b2/python/pyspark/sql/tests/connect/test_connect_basic.py#L212 Basically I think ``` self.assert_eq( cdf.drop(cdf.a, cdf.x).toPandas(), sdf.drop("a", "x").toPandas(), ) ``` these two cases are not equivalent? They should be ``` self.assert_eq( cdf.drop(cdf.a, cdf.x).toPandas(), sdf.drop(cdf.a, cdf.x).toPandas(), ) ``` ?
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/
zhengruifeng commented on code in PR #38742: URL: https://github.com/apache/spark/pull/38742#discussion_r1029952200 ## connector/connect/src/main/protobuf/spark/connect/base.proto: ## @@ -100,18 +70,138 @@ message AnalyzePlanRequest { // logging purposes and will not be interpreted by the server. optional string client_type = 4; - // (Optional) Get the explain string of the plan. - Explain explain = 5; + repeated AnalysisTask tasks = 5; + + message AnalysisTask { +oneof task { + // Get the schema + Schema schema = 1; + + // Is local + IsLocal is_local = 2; + + // Is Streaming + IsStreaming is_streaming = 3; + + // Get the explain string of the plan. + Explain explain = 4; Review Comment: the message for `Explain` was not changed, just moved
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
zhengruifeng commented on code in PR #38723: URL: https://github.com/apache/spark/pull/38723#discussion_r1029951842 ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -302,6 +301,31 @@ def test_to_pandas(self): self.spark.sql(query).toPandas(), ) +def test_select_expr(self): +# SPARK-41201: test selectExpr API. +self.assert_eq( +self.connect.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +self.spark.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +) +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +) + +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +) + +@unittest.skip("test_fill_na is flaky") Review Comment: no worries, added it back in https://github.com/apache/spark/pull/38763
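The test above exercises both calling conventions of `selectExpr`: varargs and a single list. The argument flattening a client would do can be sketched as follows; `SQLExpression` here is only an illustrative wrapper for the SQL string that would be sent to the server, not the actual Connect class:

```python
class SQLExpression:
    """Illustrative wrapper carrying a SQL expression string."""
    def __init__(self, expr: str):
        self.expr = expr

def select_expr(*exprs):
    """Accept select_expr("a", "b") or select_expr(["a", "b"]), like PySpark."""
    if len(exprs) == 1 and isinstance(exprs[0], list):
        exprs = exprs[0]  # unwrap the single-list form
    return [SQLExpression(e) for e in exprs]
```

Both spellings normalize to the same list of expressions, which is why the two test variants must produce identical results.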
[GitHub] [spark] zhengruifeng opened a new pull request, #38763: [SPARK-41201][CONNECT][PYTHON][TEST][FOLLOWUP] Reenable test_fill_na
zhengruifeng opened a new pull request, #38763: URL: https://github.com/apache/spark/pull/38763 ### What changes were proposed in this pull request? Reenable test_fill_na ### Why are the changes needed? `test_fill_na` was disabled by mistake in https://github.com/apache/spark/pull/38723 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? reenabled test
[GitHub] [spark] amaliujia commented on a diff in pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
amaliujia commented on code in PR #38723: URL: https://github.com/apache/spark/pull/38723#discussion_r1029951479 ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -302,6 +301,31 @@ def test_to_pandas(self): self.spark.sql(query).toPandas(), ) +def test_select_expr(self): +# SPARK-41201: test selectExpr API. +self.assert_eq( +self.connect.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +self.spark.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +) +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +) + +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +) + +@unittest.skip("test_fill_na is flaky") Review Comment: I am really just guessing: if I have more than one commit locally and resolve the conflict in the first one, a following commit might silently add something back.
[GitHub] [spark] amaliujia commented on a diff in pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
amaliujia commented on code in PR #38723: URL: https://github.com/apache/spark/pull/38723#discussion_r1029951172 ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -302,6 +301,31 @@ def test_to_pandas(self): self.spark.sql(query).toPandas(), ) +def test_select_expr(self): +# SPARK-41201: test selectExpr API. +self.assert_eq( +self.connect.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +self.spark.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +) +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +) + +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +) + +@unittest.skip("test_fill_na is flaky") Review Comment: I am pretty sure I removed this after conflict resolution. Actually Martin pointed out another case: https://github.com/apache/spark/pull/38723#discussion_r1028942189 Basically it seems to have happened more than once that after conflict resolution, the code I want to keep is gone. Maybe I should always do an interactive rebase (`git rebase -i`) to squash my commits first, in case rebasing more than one commit causes unexpected results. ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -302,6 +301,31 @@ def test_to_pandas(self): self.spark.sql(query).toPandas(), ) +def test_select_expr(self): +# SPARK-41201: test selectExpr API.
+self.assert_eq( +self.connect.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +self.spark.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +) +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +) + +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +) + +@unittest.skip("test_fill_na is flaky") Review Comment: I will follow up on this soon.
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
zhengruifeng commented on code in PR #38723: URL: https://github.com/apache/spark/pull/38723#discussion_r1029950542 ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -302,6 +301,31 @@ def test_to_pandas(self): self.spark.sql(query).toPandas(), ) +def test_select_expr(self): +# SPARK-41201: test selectExpr API. +self.assert_eq( +self.connect.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +self.spark.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +) +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +) + +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +) + +@unittest.skip("test_fill_na is flaky") Review Comment: I think so, will send a follow-up for it
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
HyukjinKwon commented on code in PR #38723: URL: https://github.com/apache/spark/pull/38723#discussion_r1029950210 ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -302,6 +301,31 @@ def test_to_pandas(self): self.spark.sql(query).toPandas(), ) +def test_select_expr(self): +# SPARK-41201: test selectExpr API. +self.assert_eq( +self.connect.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +self.spark.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +) +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +) + +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +) + +@unittest.skip("test_fill_na is flaky") Review Comment: ah .. I didn't notice this. Can we enable this back?
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38742: [SPARK-41216][CONNECT][PYTHON] Make AnalyzePlan support multiple analysis tasks And implement isLocal/isStreaming/printSchema/
zhengruifeng commented on code in PR #38742: URL: https://github.com/apache/spark/pull/38742#discussion_r1029949868 ## connector/connect/src/main/protobuf/spark/connect/base.proto: ## @@ -100,18 +70,138 @@ message AnalyzePlanRequest { // logging purposes and will not be interpreted by the server. optional string client_type = 4; - // (Optional) Get the explain string of the plan. - Explain explain = 5; + repeated AnalysisTask tasks = 5; + + message AnalysisTask { +oneof task { + // Get the schema + Schema schema = 1; + + // Is local + IsLocal is_local = 2; + + // Is Streaming + IsStreaming is_streaming = 3; + + // Get the explain string of the plan. + Explain explain = 4; + + // Get the tree string of the schema. + TreeString tree_string = 5; + + // Get the input files. + InputFiles input_files = 6; + + // Get the semantic hash + SemanticHash semantic_hash = 7; + + // Check whether plans are equal. Review Comment: one e2e test was added for it
[GitHub] [spark] zhengruifeng commented on a diff in pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
zhengruifeng commented on code in PR #38723: URL: https://github.com/apache/spark/pull/38723#discussion_r1029949413 ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -302,6 +301,31 @@ def test_to_pandas(self): self.spark.sql(query).toPandas(), ) +def test_select_expr(self): +# SPARK-41201: test selectExpr API. +self.assert_eq( +self.connect.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +self.spark.read.table(self.tbl_name).selectExpr("id * 2").toPandas(), +) +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr(["id * 2", "cast(name as long) as name"]) +.toPandas(), +) + +self.assert_eq( +self.connect.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +self.spark.read.table(self.tbl_name) +.selectExpr("id * 2", "cast(name as long) as name") +.toPandas(), +) + +@unittest.skip("test_fill_na is flaky") Review Comment: @amaliujia why did you disable this test again?
[GitHub] [spark] HyukjinKwon commented on pull request #38751: [SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size
HyukjinKwon commented on PR #38751: URL: https://github.com/apache/spark/pull/38751#issuecomment-1324425695 Seems like the test failure is unrelated. I don't mind merging it as is. Feel free to retrigger https://github.com/gaoyajun02/spark/runs/9633118279 @gaoyajun02
[GitHub] [spark] HyukjinKwon closed pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
HyukjinKwon closed pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client URL: https://github.com/apache/spark/pull/38723
[GitHub] [spark] HyukjinKwon commented on pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
HyukjinKwon commented on PR #38723: URL: https://github.com/apache/spark/pull/38723#issuecomment-1324424425 Merged to master.
[GitHub] [spark] amaliujia commented on a diff in pull request #38686: [SPARK-41169][CONNECT][PYTHON] Implement `DataFrame.drop`
amaliujia commented on code in PR #38686: URL: https://github.com/apache/spark/pull/38686#discussion_r1029946709 ## python/pyspark/sql/connect/dataframe.py: ## @@ -255,10 +255,21 @@ def distinct(self) -> "DataFrame": ) def drop(self, *cols: "ColumnOrString") -> "DataFrame": Review Comment: I think we have been building a consensus that we prefer re-using the proto rather than asking clients to do duplicate work.
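Whichever side does the work, `drop` ultimately reduces a mix of `Column` objects and string names to a set of names to exclude. A rough sketch of that normalization, assuming a toy `Column` stand-in (neither class nor helper is the Connect client's actual code):

```python
class Column:
    """Toy stand-in for a column reference carrying just a name."""
    def __init__(self, name):
        self.name = name

def drop_columns(schema, *cols):
    """Return the schema fields that survive dropping `cols` (names or Columns)."""
    to_drop = {c.name if isinstance(c, Column) else c for c in cols}
    return [field for field in schema if field not in to_drop]
```

Columns absent from the schema are silently ignored, matching the lenient behavior `drop` usually has.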
[GitHub] [spark] amaliujia commented on a diff in pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
amaliujia commented on code in PR #38723: URL: https://github.com/apache/spark/pull/38723#discussion_r1029932646 ## python/pyspark/sql/connect/column.py: ## @@ -263,6 +263,22 @@ def __str__(self) -> str: return f"Column({self._unparsed_identifier})" +class SQLExpression(Expression): Review Comment: Let's see in the future... I guess we will need to name more...
[GitHub] [spark] amaliujia commented on a diff in pull request #38723: [SPARK-41201][CONNECT][PYTHON] Implement `DataFrame.SelectExpr` in Python client
amaliujia commented on code in PR #38723: URL: https://github.com/apache/spark/pull/38723#discussion_r1029932471 ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -220,6 +220,29 @@ def test_create_global_temp_view(self): with self.assertRaises(_MultiThreadedRendezvous): Review Comment: hmmm, I guess that was lost during conflict resolution, so the good fix is gone. ## python/pyspark/sql/tests/connect/test_connect_basic.py: ## @@ -220,6 +220,29 @@ def test_create_global_temp_view(self): self.connect.sql("SELECT 1 AS X LIMIT 0").createGlobalTempView("view_1") +def test_select_expr(self): +self.assert_eq( Review Comment: done.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState
HyukjinKwon commented on code in PR #38013: URL: https://github.com/apache/spark/pull/38013#discussion_r1029932042 ## examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py: ## @@ -0,0 +1,139 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +r""" + Split lines into words, group by words and use the state per key to track session of each key. + + Usage: structured_network_wordcount_windowed.py + and describe the TCP server that Structured Streaming + would connect to receive data. 
+ + To run this on your local machine, you need to first run a Netcat server +`$ nc -lk ` + and then run the example +`$ bin/spark-submit + examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py +localhost ` +""" +import sys +import math +from typing import Iterable, Any + +import pandas as pd + +from pyspark.sql import SparkSession +from pyspark.sql.functions import explode +from pyspark.sql.functions import split +from pyspark.sql.types import ( +LongType, +StringType, +StructType, +StructField, +) +from pyspark.sql.streaming.state import GroupStateTimeout, GroupState + +if __name__ == "__main__": +if len(sys.argv) != 3: +msg = "Usage: structured_network_wordcount_session_window.py " +print(msg, file=sys.stderr) +sys.exit(-1) + +host = sys.argv[1] +port = int(sys.argv[2]) + +spark = SparkSession.builder.appName( +"StructuredNetworkWordCountSessionWindow" +).getOrCreate() + +# Create DataFrame representing the stream of input lines from connection to host:port +lines = ( +spark.readStream.format("socket") +.option("host", host) +.option("port", port) +.option("includeTimestamp", "true") +.load() +) + +# Split the lines into words, retaining timestamps, each word become a sessionId +events = lines.select( +explode(split(lines.value, " ")).alias("sessionId"), +lines.timestamp.cast("long"), +) + +# Type of output records. +session_schema = StructType( +[ +StructField("sessionId", StringType()), +StructField("count", LongType()), +StructField("start", LongType()), +StructField("end", LongType()), +] +) +# Type of group state. 
+# Omit the session id in the state since it is available as group key +session_state_schema = StructType( +[ +StructField("count", LongType()), +StructField("start", LongType()), +StructField("end", LongType()), +] +) + +def func( +key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState +) -> Iterable[pd.DataFrame]: +if state.hasTimedOut: +count, start, end = state.get +state.remove() +yield pd.DataFrame( +{ +"sessionId": [key[0]], +"count": [count], +"start": [start], +"end": [end], +} +) +else: +start = math.inf +end = 0 +count = 0 +for pdf in pdf_iter: +start = min(start, min(pdf["timestamp"])) Review Comment: I suspect the return type does not match the SQL type provided. Do you mind showing a reproducer?
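The per-key state update in the example boils down to folding a batch of event timestamps into a (count, start, end) triple. That merge step can be sketched in pure Python, independently of Spark (the function name is ours):

```python
import math

def merge_session(state, timestamps):
    """Fold a batch of event timestamps into a (count, start, end) session state."""
    # A fresh session starts empty: zero events, open bounds.
    count, start, end = state if state is not None else (0, math.inf, 0)
    count += len(timestamps)
    start = min(start, min(timestamps))
    end = max(end, max(timestamps))
    return (count, start, end)
```

In the streaming example this fold runs per micro-batch, with the triple persisted in `GroupState` between batches.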
[GitHub] [spark] mridulm commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
mridulm commented on code in PR #36165: URL: https://github.com/apache/spark/pull/36165#discussion_r1029927503 ## core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala: ## @@ -282,6 +280,17 @@ final class ShuffleBlockFetcherIterator( } } +@inline def updateMergedReqsDuration(wasReqForMergedChunks: Boolean = false): Unit = { + if (remainingBlocks.isEmpty) { +val durationMs = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - requestStartTime) +if (wasReqForMergedChunks) { + shuffleMetrics.incRemoteMergedReqsDuration(durationMs) +} else { + shuffleMetrics.incRemoteReqsDuration(durationMs) +} Review Comment: `shuffleLocalMetricsUpdate` and `shuffleRemoteMetricsUpdate` treat merged fetches as a subset of fetches for remote bytes - while here we treat them separately. Any particular reason for this difference? (A throughput computation will need to take this into account due to this difference)
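The distinction matters to anyone deriving throughput from these counters: with disjoint buckets the totals must be summed before dividing, whereas subset-style counters already include the merged portion. A small illustration of the two accounting conventions (the function names are ours, not Spark's):

```python
def throughput_disjoint(merged_ms, other_ms, merged_bytes, other_bytes):
    """Merged metrics tracked separately: sum both buckets for the total."""
    return (merged_bytes + other_bytes) / (merged_ms + other_ms)

def throughput_subset(total_ms, total_bytes):
    """Merged metrics tracked as a subset: the totals already include them."""
    return total_bytes / total_ms
```

Mixing the two conventions (e.g. dividing subset bytes by disjoint durations) silently skews the computed rate, which is the pitfall the review comment points at.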
[GitHub] [spark] grundprinzip commented on pull request #38762: [SPARK-41225] [CONNECT] [PYTHON] Disable unsupported functions.
grundprinzip commented on PR #38762: URL: https://github.com/apache/spark/pull/38762#issuecomment-1324244517 @HyukjinKwon @hvanhovell
[GitHub] [spark] grundprinzip opened a new pull request, #38762: [SPARK-41225] [CONNECT] [PYTHON] Disable unsupported functions.
grundprinzip opened a new pull request, #38762: URL: https://github.com/apache/spark/pull/38762 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
ahshahid commented on code in PR #38714: URL: https://github.com/apache/spark/pull/38714#discussion_r1029822940 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala: ## @@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper { */ def getOuterReferences(expr: Expression): Seq[Expression] = { val outerExpressions = ArrayBuffer.empty[Expression] -def collectOutRefs(input: Expression): Unit = input match { + +def collectOutRefs(input: Expression): Boolean = input match { case a: AggregateExpression if containsOuter(a) => if (a.references.nonEmpty) { throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin) } else { // Collect and update the sub-tree so that outer references inside this aggregate // expression will not be collected. For example: min(outer(a)) -> min(a). - val newExpr = stripOuterReference(a) - outerExpressions += newExpr + // delay collecting outer expression as we want to go as much up as possible Review Comment: But you are right. I suppose that so long as only one of the children returns "true", the current expression can be part of the outer ref. So basically it means that as long as the situation is not like (sum(a) + sum(b) + 8) types, the expression can be considered referencable using an outer ref. Will make that change.
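The idea under discussion - collect the outer reference as high in the expression tree as possible, stopping at nodes where aggregate and non-aggregate children mix - can be sketched on a toy expression tree. This is only an illustration of the bottom-up flagging, not Catalyst's actual rule:

```python
class Expr:
    """Toy expression node; is_outer_agg marks e.g. min(outer(a))."""
    def __init__(self, *children, is_outer_agg=False):
        self.children = list(children)
        self.is_outer_agg = is_outer_agg

def collect_refs(expr, collected):
    """Return True if the whole sub-tree consists of outer-aggregate expressions."""
    if expr.is_outer_agg:
        return True
    if not expr.children:
        return False
    flags = [collect_refs(child, collected) for child in expr.children]
    if all(flags):
        return True  # defer collection: the parent may still qualify as a whole
    for child, flag in zip(expr.children, flags):
        if flag:
            collected.append(child)  # collect at the highest node reached
    return False

def get_outer_references(root):
    collected = []
    if collect_refs(root, collected):
        collected.append(root)
    return collected
```

In the mixed case (an aggregate child next to a plain column, like `min(outer(a)) + b`), only the aggregate sub-tree is collected; in the pure case the whole expression is.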
[GitHub] [spark] grundprinzip commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation
grundprinzip commented on code in PR #38659: URL: https://github.com/apache/spark/pull/38659#discussion_r1029721265 ## sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala: ## @@ -213,58 +214,115 @@ private[sql] object ArrowConverters extends Logging { }.next() } - /** - * Maps iterator from serialized ArrowRecordBatches to InternalRows. - */ - private[sql] def fromBatchIterator( + private[sql] abstract class InternalRowIterator( Review Comment: Please add some comment on how this is supposed to be used. The name sounds very innocent :) ## sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala: ## @@ -213,58 +214,115 @@ private[sql] object ArrowConverters extends Logging { }.next() } - /** - * Maps iterator from serialized ArrowRecordBatches to InternalRows. - */ - private[sql] def fromBatchIterator( + private[sql] abstract class InternalRowIterator( arrowBatchIter: Iterator[Array[Byte]], - schema: StructType, - timeZoneId: String, - context: TaskContext): Iterator[InternalRow] = { -val allocator = - ArrowUtils.rootAllocator.newChildAllocator("fromBatchIterator", 0, Long.MaxValue) - -val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) -val root = VectorSchemaRoot.create(arrowSchema, allocator) - -new Iterator[InternalRow] { - private var rowIter = if (arrowBatchIter.hasNext) nextBatch() else Iterator.empty - - if (context != null) context.addTaskCompletionListener[Unit] { _ => -root.close() -allocator.close() - } + context: TaskContext) + extends Iterator[InternalRow] { +// Keep all the resources we have opened in order, should be closed in reverse order finally. 
+val resources = new ArrayBuffer[AutoCloseable]() +protected val allocator: BufferAllocator = ArrowUtils.rootAllocator.newChildAllocator( + s"to${this.getClass.getSimpleName}", + 0, + Long.MaxValue) +resources.append(allocator) + +private var rowIterAndSchema = + if (arrowBatchIter.hasNext) nextBatch() else (Iterator.empty, null) +// We will ensure schemas parsed from every batch are the same +val schema: StructType = rowIterAndSchema._2 + +if (context != null) context.addTaskCompletionListener[Unit] { _ => + closeAll(resources.reverse: _*) +} - override def hasNext: Boolean = rowIter.hasNext || { -if (arrowBatchIter.hasNext) { - rowIter = nextBatch() - true -} else { - root.close() - allocator.close() - false +override def hasNext: Boolean = rowIterAndSchema._1.hasNext || { + if (arrowBatchIter.hasNext) { +rowIterAndSchema = nextBatch() +if (schema != rowIterAndSchema._2) { + throw new IllegalArgumentException( +s"ArrowBatch iterator contain 2 batches with" + + s" different schema: $schema and ${rowIterAndSchema._2}") } +rowIterAndSchema._1.hasNext + } else { +closeAll(resources.reverse: _*) +false } +} - override def next(): InternalRow = rowIter.next() +override def next(): InternalRow = rowIterAndSchema._1.next() - private def nextBatch(): Iterator[InternalRow] = { -val arrowRecordBatch = ArrowConverters.loadBatch(arrowBatchIter.next(), allocator) -val vectorLoader = new VectorLoader(root) -vectorLoader.load(arrowRecordBatch) -arrowRecordBatch.close() +def nextBatch(): (Iterator[InternalRow], StructType) + } -val columns = root.getFieldVectors.asScala.map { vector => - new ArrowColumnVector(vector).asInstanceOf[ColumnVector] -}.toArray + private[sql] class InternalRowIteratorWithoutSchema( + arrowBatchIter: Iterator[Array[Byte]], + schema: StructType, + timeZoneId: String, + context: TaskContext) + extends InternalRowIterator(arrowBatchIter, context) { + +override def nextBatch(): (Iterator[InternalRow], StructType) = { + val arrowSchema = 
ArrowUtils.toArrowSchema(schema, timeZoneId) + val root = VectorSchemaRoot.create(arrowSchema, allocator) + resources.append(root) + val arrowRecordBatch = ArrowConverters.loadBatch(arrowBatchIter.next(), allocator) + val vectorLoader = new VectorLoader(root) + vectorLoader.load(arrowRecordBatch) + arrowRecordBatch.close() + (vectorSchemaRootToIter(root), schema) +} + } -val batch = new ColumnarBatch(columns) -batch.setNumRows(root.getRowCount) -batch.rowIterator().asScala + private[sql] class InternalRowIteratorWithSchema( + arrowBatchIter: Iterator[Array[Byte]], + context: TaskContext) + extends InternalRowIterator(arrowBatchIter, context) { +override def nextBatch(): (Iterator[InternalRow], StructType) = { + val reader = +new
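The schema-consistency check the new iterator performs - parse each batch's schema and fail when two batches disagree - can be mimicked without Arrow. A pure-Python sketch over (schema, rows) batch pairs (the function is ours, not Spark's):

```python
def rows_from_batches(batches):
    """Yield rows from (schema, rows) batches, rejecting mixed schemas."""
    first_schema = None
    for schema, rows in batches:
        if first_schema is None:
            first_schema = schema  # first batch fixes the expected schema
        elif schema != first_schema:
            raise ValueError(
                f"batch iterator contains batches with different schemas: "
                f"{first_schema} and {schema}")
        yield from rows
```

Because the generator is lazy, the mismatch only surfaces when the offending batch is actually consumed, mirroring the iterator in the diff where the check happens inside `hasNext`.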
[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
ahshahid commented on code in PR #38714: URL: https://github.com/apache/spark/pull/38714#discussion_r1029763899 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala: ## @@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper { */ def getOuterReferences(expr: Expression): Seq[Expression] = { val outerExpressions = ArrayBuffer.empty[Expression] -def collectOutRefs(input: Expression): Unit = input match { + +def collectOutRefs(input: Expression): Boolean = input match { case a: AggregateExpression if containsOuter(a) => if (a.references.nonEmpty) { throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin) } else { // Collect and update the sub-tree so that outer references inside this aggregate // expression will not be collected. For example: min(outer(a)) -> min(a). - val newExpr = stripOuterReference(a) - outerExpressions += newExpr + // delay collecting outer expression as we want to go as much up as possible Review Comment: I thought about that, but there are situations, related to binary operators, where I am not sure they are correct or meaningful. For e.g. a projection such as (sum(a) + sum(b)). Not sure if this is a valid expression involving aggregates in the project and having clauses.
[GitHub] [spark] MaxGekk commented on a diff in pull request #38575: [SPARK-40948][SQL][FOLLOWUP] Restore PATH_NOT_FOUND
MaxGekk commented on code in PR #38575: URL: https://github.com/apache/spark/pull/38575#discussion_r1029760391 ## R/pkg/tests/fulltests/test_sparkSQL.R: ## @@ -3990,12 +3990,16 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume expect_error(read.df(source = "json"), paste("Error in load : analysis error - Unable to infer schema for JSON.", "It must be specified manually")) - expect_error(read.df("arbitrary_path"), "Error in load : analysis error - Path does not exist") - expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist") - expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist") - expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist") + expect_error(read.df("arbitrary_path"), + "Error in load : analysis error - \\[PATH_NOT_FOUND\\].*") Review Comment: @HyukjinKwon @srielau @cloud-fan Are you ok with such changes?
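The escaping in the new `expect_error` patterns is needed because Spark wraps error classes in square brackets, which are regex metacharacters. The same check in Python, as an illustration (the helper is ours):

```python
import re

def has_error_class(message, error_class):
    """True if `message` carries the bracketed error class, e.g. [PATH_NOT_FOUND]."""
    # re.escape handles the brackets were the class name itself to contain metacharacters;
    # the surrounding [ and ] must be escaped explicitly in the pattern.
    return re.search(r"\[" + re.escape(error_class) + r"\]", message) is not None
```

An unescaped `[PATH_NOT_FOUND]` would instead be parsed as a character class and match single characters, which is why the R tests double-escape the brackets.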