[GitHub] [spark] LuciferYang opened a new pull request, #40248: [SPARK-42648][BUILD] Upgrade `versions-maven-plugin` to 2.15.0
LuciferYang opened a new pull request, #40248: URL: https://github.com/apache/spark/pull/40248 ### What changes were proposed in this pull request? This PR aims to upgrade `versions-maven-plugin` to 2.15.0. ### Why are the changes needed? The new version brings some improvements: - https://github.com/mojohaus/versions/pull/915 - https://github.com/mojohaus/versions/pull/881 - https://github.com/mojohaus/versions/pull/912 - https://github.com/mojohaus/versions/pull/898 - https://github.com/mojohaus/versions/pull/889 - https://github.com/mojohaus/versions/pull/883 - https://github.com/mojohaus/versions/pull/886 - https://github.com/mojohaus/versions/pull/878 - https://github.com/mojohaus/versions/pull/893 and some bug fixes: - https://github.com/mojohaus/versions/pull/907 - https://github.com/mojohaus/versions/pull/901 - https://github.com/mojohaus/versions/pull/897 - https://github.com/mojohaus/versions/pull/891 The full release notes are as follows: - https://github.com/apache/spark/pull/39784 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - GA `Dependencies test` should work normally - Manually checked `./dev/test-dependencies.sh --replace-manifest`; it ran successfully -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun opened a new pull request, #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files
dongjoon-hyun opened a new pull request, #40249: URL: https://github.com/apache/spark/pull/40249 … ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] dongjoon-hyun commented on pull request #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files
dongjoon-hyun commented on PR #40249: URL: https://github.com/apache/spark/pull/40249#issuecomment-1451470960 cc @srowen and @HyukjinKwon
[GitHub] [spark] allanf-db opened a new pull request, #40250: [WIP][SPARK-42642][DOCS][PYTHON] Updating remaining Spark documentation code examples to show Python by default
allanf-db opened a new pull request, #40250: URL: https://github.com/apache/spark/pull/40250 ### What changes were proposed in this pull request? Making Python the first tab for code examples in the Spark documentation. ### Why are the changes needed? This completes the work started with [SPARK-42493]. Python is the most approachable and most popular language and this change moves Python code examples to the first tab (showing by default). ### Does this PR introduce _any_ user-facing change? Yes, the user facing Spark documentation is updated. ### How was this patch tested? I built the website locally and manually tested the pages.
[GitHub] [spark] ivoson commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on PR #39459: URL: https://github.com/apache/spark/pull/39459#issuecomment-1451486114 > @ivoson Did you get a chance to address the pending comments above ? ([here](https://github.com/apache/spark/pull/39459#discussion_r1117995723) and [here](https://github.com/apache/spark/pull/39459#discussion_r1117996417)) Thanks @mridulm for reminding. Missed these 2... Updated, please take a look. Thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1122750441 ## core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala: ## @@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging { try { val wrapper = new BlockInfoWrapper(newBlockInfo, lock) while (true) { -val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper) +val previous = invisibleRDDBlocks.synchronized { Review Comment: done, thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1122750718 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -1325,31 +1328,74 @@ private[spark] class BlockManager( blockInfoManager.releaseAllLocksForTask(taskAttemptId) } + /** + * Retrieve the given rdd block if it exists and is visible, otherwise call the provided + * `makeIterator` method to compute the block, persist it, and return its values. + * + * @return either a BlockResult if the block was successfully cached, or an iterator if the block + * could not be cached. + */ + def getOrElseUpdateRDDBlock[T]( + taskId: Long, + blockId: RDDBlockId, + level: StorageLevel, + classTag: ClassTag[T], + makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { +val isCacheVisible = isRDDBlockVisible(blockId) +val res = getOrElseUpdate(blockId, level, classTag, makeIterator, isCacheVisible) +if (res.isLeft && !isCacheVisible) { + // Block exists but not visible, report taskId -> blockId info to master. + master.updateRDDBlockTaskInfo(blockId, taskId) +} + +res + } + /** * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method * to compute the block, persist it, and return its values. * * @return either a BlockResult if the block was successfully cached, or an iterator if the block * could not be cached. */ - def getOrElseUpdate[T]( + private def getOrElseUpdate[T]( blockId: BlockId, level: StorageLevel, classTag: ClassTag[T], - makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = { -// Attempt to read the block from local or remote storage. If it's present, then we don't need -// to go through the local-get-or-put path. -get[T](blockId)(classTag) match { - case Some(block) => -return Left(block) - case _ => -// Need to compute the block. + makeIterator: () => Iterator[T], + isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = { Review Comment: done, thanks. 
[GitHub] [spark] wangyum commented on a diff in pull request #40190: [SPARK-42597][SQL] UnwrapCastInBinaryComparison support unwrap timestamp type
wangyum commented on code in PR #40190: URL: https://github.com/apache/spark/pull/40190#discussion_r1122758398 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala: ## @@ -293,6 +299,34 @@ object UnwrapCastInBinaryComparison extends Rule[LogicalPlan] { } } + /** + * Move the cast to the literal side, because we can only get the minimum value of timestamp, + * so some BinaryComparison needs to be changed, + * such as CAST(ts AS date) > DATE '2023-01-01' ===> ts >= TIMESTAMP '2023-01-02 00:00:00' Review Comment: Yes, it should be `'2023-01-02 00:00:00' > '2023-01-01 23:59:59.99'` if do not change `>` to `>=`.
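The rewrite quoted in this review comment can be checked outside Spark. Below is a minimal sketch, assuming plain `java.time` values stand in for Spark's timestamp and date types; the object and method names are hypothetical and are not taken from the PR. It models `CAST(ts AS date) > d`, the rewritten `ts >= start of (d + 1 day)`, and the variant that keeps `>`, which is the case the comment warns about.

```scala
import java.time.{LocalDate, LocalDateTime}

// Hypothetical names; this models the predicate rewrite with java.time,
// not with Catalyst expressions.
object UnwrapCastSketch {
  // Original predicate: CAST(ts AS date) > d
  def castThenCompare(ts: LocalDateTime, d: LocalDate): Boolean =
    ts.toLocalDate().isAfter(d)

  // Rewritten predicate: ts >= TIMESTAMP '<d + 1 day> 00:00:00'
  def unwrapped(ts: LocalDateTime, d: LocalDate): Boolean =
    !ts.isBefore(d.plusDays(1).atStartOfDay())

  // Keeping `>` against the start of the next day excludes midnight itself,
  // so it is not equivalent to the original predicate.
  def unwrappedKeepingGt(ts: LocalDateTime, d: LocalDate): Boolean =
    ts.isAfter(d.plusDays(1).atStartOfDay())
}
```

For `d = 2023-01-01`, a timestamp of exactly `2023-01-02 00:00:00` satisfies the first two predicates but not the third, which is why the operator has to change when only the minimum timestamp of a day is available.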
[GitHub] [spark] dongjoon-hyun commented on pull request #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files
dongjoon-hyun commented on PR #40249: URL: https://github.com/apache/spark/pull/40249#issuecomment-1451524437 Thank you, @HyukjinKwon and @LuciferYang . The license test passed. Merged to master/3.4/3.3/3.2. ![Screenshot 2023-03-02 at 1 02 02 AM](https://user-images.githubusercontent.com/9700541/222381571-afcd08bf-0503-4250-8085-c2c824f570a5.png)
[GitHub] [spark] dongjoon-hyun closed pull request #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files
dongjoon-hyun closed pull request #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files URL: https://github.com/apache/spark/pull/40249
[GitHub] [spark] HyukjinKwon commented on pull request #40250: [SPARK-42642][DOCS][PYTHON] Updating remaining Spark documentation code examples to show Python by default
HyukjinKwon commented on PR #40250: URL: https://github.com/apache/spark/pull/40250#issuecomment-1451560563 Merged to master.
[GitHub] [spark] HyukjinKwon closed pull request #40250: [SPARK-42642][DOCS][PYTHON] Updating remaining Spark documentation code examples to show Python by default
HyukjinKwon closed pull request #40250: [SPARK-42642][DOCS][PYTHON] Updating remaining Spark documentation code examples to show Python by default URL: https://github.com/apache/spark/pull/40250
[GitHub] [spark] MaxGekk commented on a diff in pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.
MaxGekk commented on code in PR #40237: URL: https://github.com/apache/spark/pull/40237#discussion_r1122804677 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala: ## @@ -1961,6 +1961,99 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("SPARK-42635: timestampadd near daylight saving transition") { +// In America/Los_Angeles timezone, timestamp value `skippedTime` is 2011-03-13 03:00:00. +// The next second of 2011-03-13 01:59:59 jumps to 2011-03-13 03:00:00. +val skippedTime = 13000104L +// In America/Los_Angeles timezone, both timestamp range `[repeatedTime - MICROS_PER_HOUR, +// repeatedTime)` and `[repeatedTime, repeatedTime + MICROS_PER_HOUR)` map to +// [2011-11-06 01:00:00, 2011-11-06 02:00:00). +// The next second of 2011-11-06 01:59:59 (pre-transition) jumps back to 2011-11-06 01:00:00. +val repeatedTime = 13205700L +withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId) { + // Adding one day is **not** equivalent to adding _PER_DAY time units, because not every + // day has 24 hours: 2011-03-13 has 23 hours, 2011-11-06 has 25 hours. 
+ + // timestampadd(DAY, 1, 2011-03-12 03:00:00) = 2011-03-13 03:00:00 + checkEvaluation( +TimestampAdd("DAY", Literal(1), Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)), +skippedTime) + // timestampadd(HOUR, 24, 2011-03-12 03:00:00) = 2011-03-13 04:00:00 + checkEvaluation( +TimestampAdd("HOUR", Literal(24), + Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)), +skippedTime + MICROS_PER_HOUR) + // timestampadd(HOUR, 23, 2011-03-12 03:00:00) = 2011-03-13 03:00:00 + checkEvaluation( +TimestampAdd("HOUR", Literal(23), + Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)), +skippedTime) + // timestampadd(SECOND, SECONDS_PER_DAY, 2011-03-12 03:00:00) = 2011-03-13 04:00:00 + checkEvaluation( +TimestampAdd( + "SECOND", Literal(SECONDS_PER_DAY.toInt), + Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)), +skippedTime + MICROS_PER_HOUR) + // timestampadd(SECOND, SECONDS_PER_DAY, 2011-03-12 03:00:00) = 2011-03-13 03:59:59 + checkEvaluation( +TimestampAdd( + "SECOND", Literal(SECONDS_PER_DAY.toInt - 1), + Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)), +skippedTime + MICROS_PER_HOUR - MICROS_PER_SECOND) + + // timestampadd(DAY, 1, 2011-11-05 02:00:00) = 2011-11-06 02:00:00 + checkEvaluation( +TimestampAdd("DAY", Literal(1), + Literal(repeatedTime - 24 * MICROS_PER_HOUR, TimestampType)), +repeatedTime + MICROS_PER_HOUR) + // timestampadd(DAY, 1, 2011-11-05 01:00:00) = 2011-11-06 01:00:00 (pre-transition) + checkEvaluation( +TimestampAdd("DAY", Literal(1), + Literal(repeatedTime - 25 * MICROS_PER_HOUR, TimestampType)), +repeatedTime - MICROS_PER_HOUR) + // timestampadd(DAY, -1, 2011-11-07 01:00:00) = 2011-11-06 01:00:00 (post-transition) + checkEvaluation( +TimestampAdd("DAY", Literal(-1), + Literal(repeatedTime + 24 * MICROS_PER_HOUR, TimestampType)), +repeatedTime) + // timestampadd(MONTH, 1, 2011-10-06 01:00:00) = 2011-11-06 01:00:00 (pre-transition) + checkEvaluation( +TimestampAdd( + "MONTH", Literal(1), + 
 Literal(repeatedTime - MICROS_PER_HOUR - 31 * MICROS_PER_DAY, TimestampType)), +repeatedTime - MICROS_PER_HOUR) + // timestampadd(MONTH, -1, 2011-12-06 01:00:00) = 2011-11-06 01:00:00 (post-transition) + checkEvaluation( +TimestampAdd( + "MONTH", Literal(-1), + Literal(repeatedTime + 30 * MICROS_PER_DAY, TimestampType)), +repeatedTime) + // timestampadd(HOUR, 23, 2011-11-05 02:00:00) = 2011-11-06 01:00:00 (pre-transition) + checkEvaluation( +TimestampAdd("HOUR", Literal(23), + Literal(repeatedTime - 24 * MICROS_PER_HOUR, TimestampType)), +repeatedTime - MICROS_PER_HOUR) + // timestampadd(HOUR, 24, 2011-11-05 02:00:00) = 2011-11-06 01:00:00 (post-transition) + checkEvaluation( +TimestampAdd("HOUR", Literal(24), + Literal(repeatedTime - 24 * MICROS_PER_HOUR, TimestampType)), +repeatedTime) +} + } + + test("SPARK-42635: timestampadd unit conversion overflow") { +withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + checkExceptionInExpression[SparkArithmeticException](TimestampAdd("DAY", Review Comment: Could you use `checkErrorInExpression` instead of `checkExceptionInExpression`, please? This avoids a dependency on the error message text, so tech editors will be able to modify `error-classes.json` w/o modifying Spark tests. ## sql/cataly
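The test comment in this diff notes that adding one day is not the same as adding 24 hours near a daylight saving transition. A minimal sketch of that behaviour, assuming `java.time.ZonedDateTime` as a stand-in for `TimestampAdd` (the object name `DstAddSketch` is hypothetical and this is not the PR's implementation):

```scala
import java.time.{Duration, ZoneId, ZonedDateTime}

// Hypothetical illustration using java.time only; it does not call Spark.
object DstAddSketch {
  val LA: ZoneId = ZoneId.of("America/Los_Angeles")

  // 2011-03-12 03:00:00 local time, one day before the spring-forward transition.
  val start: ZonedDateTime = ZonedDateTime.of(2011, 3, 12, 3, 0, 0, 0, LA)

  // plusDays works on the local time-line: same wall-clock time on the next day.
  val plusOneDay: ZonedDateTime = start.plusDays(1)

  // plusHours works on the instant time-line: exactly 24 elapsed hours.
  val plus24Hours: ZonedDateTime = start.plusHours(24)

  // Elapsed time between start and start + 1 day (2011-03-13 has only 23 hours).
  val elapsed: Duration = Duration.between(start, plusOneDay)
}
```

Here `plusOneDay` lands on 2011-03-13 03:00:00 local time and `plus24Hours` on 2011-03-13 04:00:00, matching the first two expectations listed in the test comments.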
[GitHub] [spark] zhengruifeng closed pull request #40243: [SPARK-42641][CONNECT][BUILD] Upgrade buf from 1.14.0 to 1.15.0
zhengruifeng closed pull request #40243: [SPARK-42641][CONNECT][BUILD] Upgrade buf from 1.14.0 to 1.15.0 URL: https://github.com/apache/spark/pull/40243
[GitHub] [spark] zhengruifeng commented on pull request #40243: [SPARK-42641][CONNECT][BUILD] Upgrade buf from 1.14.0 to 1.15.0
zhengruifeng commented on PR #40243: URL: https://github.com/apache/spark/pull/40243#issuecomment-1451586124 merged into master/3.4
[GitHub] [spark] HyukjinKwon opened a new pull request, #40251: [SPARK-41725][PYTHON][TESTS][FOLLOW-UP] Remove collect in SQL command execution in tests
HyukjinKwon opened a new pull request, #40251: URL: https://github.com/apache/spark/pull/40251 ### What changes were proposed in this pull request? This PR removes the `sql("command").collect()` workaround in PySpark test code. ### Why are the changes needed? The `collect` calls were added previously as a workaround for Spark Connect. This is fixed now, so we don't need to call `collect` anymore. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? CI in this PR should test it out.
[GitHub] [spark] HyukjinKwon commented on pull request #40251: [SPARK-41725][PYTHON][TESTS][FOLLOW-UP] Remove collect for SQL command execution in tests
HyukjinKwon commented on PR #40251: URL: https://github.com/apache/spark/pull/40251#issuecomment-1451727734 All related tests passed. Merged to master and branch-3.4.
[GitHub] [spark] HyukjinKwon closed pull request #40251: [SPARK-41725][PYTHON][TESTS][FOLLOW-UP] Remove collect for SQL command execution in tests
HyukjinKwon closed pull request #40251: [SPARK-41725][PYTHON][TESTS][FOLLOW-UP] Remove collect for SQL command execution in tests URL: https://github.com/apache/spark/pull/40251
[GitHub] [spark] panbingkun commented on a diff in pull request #40217: [SPARK-42559][CONNECT] Implement DataFrameNaFunctions
panbingkun commented on code in PR #40217: URL: https://github.com/apache/spark/pull/40217#discussion_r1122990355 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala: ## @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.{lang => jl} +import java.util.Locale + +import scala.collection.JavaConverters._ + +import org.apache.spark.connect.proto.Expression.{Literal => GLiteral} +import org.apache.spark.connect.proto.{NAReplace, Relation} +import org.apache.spark.connect.proto.NAReplace.Replacement + +/** + * Functionality for working with missing data in `DataFrame`s. + * + * @since 3.4.0 + */ +final class DataFrameNaFunctions private[sql](sparkSession: SparkSession, root: Relation) { + + /** + * Returns a new `DataFrame` that drops rows containing any null or NaN values. + * + * @since 3.4.0 + */ + def drop(): DataFrame = { +sparkSession.newDataFrame { builder => + builder.getDropNaBuilder.setInput(root) +} + } + + /** + * Returns a new `DataFrame` that drops rows containing null or NaN values. + * + * If `how` is "any", then drop rows containing any null or NaN values. 
+ * If `how` is "all", then drop rows only if every column is null or NaN for that row. + * + * @since 3.4.0 + */ + def drop(how: String): DataFrame = { +sparkSession.newDataFrame { builder => + val dropNaBuilder = builder.getDropNaBuilder.setInput(root) + how.toLowerCase(Locale.ROOT) match { Review Comment: Done ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala: ## @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.{lang => jl} +import java.util.Locale + +import scala.collection.JavaConverters._ + +import org.apache.spark.connect.proto.Expression.{Literal => GLiteral} +import org.apache.spark.connect.proto.{NAReplace, Relation} +import org.apache.spark.connect.proto.NAReplace.Replacement + +/** + * Functionality for working with missing data in `DataFrame`s. + * + * @since 3.4.0 + */ +final class DataFrameNaFunctions private[sql](sparkSession: SparkSession, root: Relation) { + + /** + * Returns a new `DataFrame` that drops rows containing any null or NaN values. 
+ * + * @since 3.4.0 + */ + def drop(): DataFrame = { +sparkSession.newDataFrame { builder => Review Comment: Done
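The scaladoc quoted in this review describes the `how` argument of `drop`: "any" drops a row if any value is null or NaN, "all" drops it only if every value is. A minimal sketch of those semantics, assuming rows are modelled as `Seq[Option[Double]]` with `None` standing in for null; the object `DropNaSketch`, the `Row` alias, and the error message are hypothetical and do not reproduce the Spark Connect client code:

```scala
import java.util.Locale

// Hypothetical in-memory model of the "any" / "all" semantics; not Spark code.
object DropNaSketch {
  type Row = Seq[Option[Double]]

  // A cell counts as missing when it is null (None here) or NaN.
  private def isMissing(cell: Option[Double]): Boolean =
    cell.forall(_.isNaN)

  def drop(how: String, rows: Seq[Row]): Seq[Row] =
    // Lower-case with a fixed locale, as the quoted diff does, so that
    // "ANY" or "All" are accepted regardless of the JVM default locale.
    how.toLowerCase(Locale.ROOT) match {
      case "any" => rows.filterNot(_.exists(isMissing))
      case "all" => rows.filterNot(_.forall(isMissing))
      case _ =>
        throw new IllegalArgumentException(s"how ($how) must be 'any' or 'all'")
    }
}
```

With this model, a row such as `Seq(Some(1.0), None)` is removed under "any" but kept under "all", since only one of its cells is missing.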
[GitHub] [spark] beliefer opened a new pull request, #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
beliefer opened a new pull request, #40252: URL: https://github.com/apache/spark/pull/40252 ### What changes were proposed in this pull request? Currently, the connect project has a new `DataFrameReader` API that corresponds to the Spark `DataFrameReader` API, but the connect `DataFrameReader` is missing the JDBC API. ### Why are the changes needed? This PR adds JDBC to `DataFrameReader`. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? N/A
[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
beliefer commented on PR #40252: URL: https://github.com/apache/spark/pull/40252#issuecomment-1451774998 The jdbc API seems hard to test, do we need a test case? @hvanhovell @HyukjinKwon @zhengruifeng @dongjoon-hyun
[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
beliefer commented on PR #40252: URL: https://github.com/apache/spark/pull/40252#issuecomment-1451778555 There is another kind of jdbc API, see: https://github.com/apache/spark/blob/79da1ab400f25dbceec45e107e5366d084138fa8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L316 I will create another PR to add the proto message and implement it.
[GitHub] [spark] LuciferYang commented on a diff in pull request #40218: [SPARK-42579][CONNECT] Part-1: `function.lit` support `Array[_]` dataType
LuciferYang commented on code in PR #40218: URL: https://github.com/apache/spark/pull/40218#discussion_r1123020720 ## connector/connect/common/src/main/protobuf/spark/connect/expressions.proto: ## @@ -189,6 +190,11 @@ message Expression { int32 days = 2; int64 microseconds = 3; } + +message Array { + DataType elementType = 1; + repeated Literal element = 2; Review Comment: Should this be `element` or `elements`?
[GitHub] [spark] yabola commented on a diff in pull request #39950: [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader
yabola commented on code in PR #39950: URL: https://github.com/apache/spark/pull/39950#discussion_r1121979988 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala: ## @@ -296,41 +309,45 @@ class ParquetFileFormat throw e } } else { -logDebug(s"Falling back to parquet-mr") -// ParquetRecordReader returns InternalRow -val readSupport = new ParquetReadSupport( - convertTz, - enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) -val reader = if (pushed.isDefined && enableRecordFilter) { - val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[InternalRow](readSupport, parquetFilter) -} else { - new ParquetRecordReader[InternalRow](readSupport) -} -val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, -requiredSchema) -val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) try { - readerWithRowIndexes.initialize(split, hadoopAttemptContext) - - val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - if (partitionSchema.length == 0) { -// There is no partition columns -iter.map(unsafeProjection) + logDebug(s"Falling back to parquet-mr") + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( +convertTz, +enableVectorizedReader = false, +datetimeRebaseSpec, +int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { +val parquetFilter = FilterCompat.get(pushed.get, null) +new ParquetRecordReader[InternalRow](readSupport, parquetFilter) } else { -val joinedRow = new JoinedRow() -iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) +new ParquetRecordReader[InternalRow](readSupport) } -} catch { - case e: Throwable => -// SPARK-23457: In case there is an exception in initialization, close the iterator to -// avoid leaking resources. 
-iter.close() -throw e + val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, +requiredSchema) + val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) + try { +readerWithRowIndexes.initialize(split, hadoopAttemptContext) + +val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes +val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + +if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) +} else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) +} + } catch { +case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } +} finally { + parquetReader.close() Review Comment: I haven't changed the code logic; I only added the call that closes `parquetReader`. Previously, the non-vectorized code path closed the reader after reading `footerFileMetaData`, so this is consistent with that behavior. Please see [the original implementation code](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java#L53)
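The close-in-`finally` pattern mentioned in the review comment above can be sketched in plain Scala. This is only an illustration under stated assumptions: `FakeReader` and `readAll` are hypothetical stand-ins and are not Spark or Parquet classes.

```scala
object CloseInFinallySketch {
  // Hypothetical stand-in for a footer/file reader; not a Spark or Parquet class.
  final class FakeReader(rows: Seq[Int]) extends AutoCloseable {
    var closed = false
    def read(): Seq[Int] = {
      require(!closed, "reader already closed")
      rows
    }
    override def close(): Unit = closed = true
  }

  // Reads all rows and always closes the reader, whether or not reading fails.
  def readAll(reader: FakeReader): Seq[Int] = {
    try {
      reader.read()
    } finally {
      reader.close()
    }
  }

  val reader = new FakeReader(Seq(1, 2, 3))
  val rows: Seq[Int] = readAll(reader)
}
```

The point of the `finally` block is that the reader is released on both the success path and the exception path, which appears to be the behavior the comment describes for `parquetReader`.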
[GitHub] [spark] zhengruifeng commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
zhengruifeng commented on PR #40252: URL: https://github.com/apache/spark/pull/40252#issuecomment-1451803222 I guess you can refer to `JDBCSuite` and `ClientE2ETestSuite`?
[GitHub] [spark] jiang13021 opened a new pull request, #40253: [SPARK-42553][SQL] Ensure at least one time unit after "interval"
jiang13021 opened a new pull request, #40253: URL: https://github.com/apache/spark/pull/40253

### What changes were proposed in this pull request?
This PR aims to ensure "at least one time unit should be given for interval literal" by modifying SqlBaseParser.

### Why are the changes needed?
INTERVAL is a Non-Reserved keyword in Spark. But when I run
```shell
scala> spark.sql("select interval from mytable")
```
I get
```
org.apache.spark.sql.catalyst.parser.ParseException:
at least one time unit should be given for interval literal(line 1, pos 7)

== SQL ==
select interval from mytable
-------^^^

  at org.apache.spark.sql.errors.QueryParsingErrors$.invalidIntervalLiteralError(QueryParsingErrors.scala:196)
  ..
```
It is a bug because "Non-Reserved keywords" have a special meaning in particular contexts and can be used as identifiers in other contexts. So by design, INTERVAL can be used as a column name.

Currently the interval's grammar is
```
interval
    : INTERVAL (errorCapturingMultiUnitsInterval | errorCapturingUnitToUnitInterval)?
    ;
```
There is no need to make the time unit optional; we can ensure "at least one time unit should be given for interval literal" if the interval's grammar is
```
interval
    : INTERVAL (errorCapturingMultiUnitsInterval | errorCapturingUnitToUnitInterval)
    ;
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test: `PlanParserSuite`, "SPARK-42553: NonReserved keyword 'interval' can be column name"

Local test
```shell
scala> val myDF = spark.sparkContext.makeRDD(1 to 5).toDF("interval")
myDF: org.apache.spark.sql.DataFrame = [interval: int]

scala> myDF.createOrReplaceTempView("mytable")

scala> spark.sql("select interval from mytable;").show()
+--------+
|interval|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
+--------+
```
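The effect of making the unit mandatory in the `interval` rule can be illustrated with a toy parser in plain Scala. This is a rough sketch, not the ANTLR grammar from `SqlBaseParser`: the `units` set, the `Parsed` types and `parse` are all hypothetical, and real interval syntax is much richer.

```scala
object IntervalSketch {
  // Toy set of unit keywords (an assumption); the real grammar accepts many more.
  private val units = Set("year", "month", "day", "hour", "minute", "second")

  sealed trait Parsed
  final case class IntervalLiteral(value: Int, unit: String) extends Parsed
  final case class Identifier(name: String) extends Parsed

  // With a mandatory unit, a bare `interval` cannot match the literal rule,
  // so it falls through to the identifier rule instead of raising an error.
  def parse(tokens: List[String]): Parsed = tokens match {
    case "interval" :: v :: u :: Nil
        if v.nonEmpty && v.forall(_.isDigit) && units(u.toLowerCase) =>
      IntervalLiteral(v.toInt, u.toLowerCase)
    case name :: Nil => Identifier(name)
    case other =>
      throw new IllegalArgumentException(s"cannot parse: ${other.mkString(" ")}")
  }
}
```

In this sketch, `parse(List("interval"))` yields an identifier, while `parse(List("interval", "3", "day"))` yields a literal, which mirrors the intent that `interval` stays usable as a column name.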
[GitHub] [spark] jiang13021 commented on pull request #40195: [SPARK-42553][SQL] Ensure at least one time unit after "interval"
jiang13021 commented on PR #40195: URL: https://github.com/apache/spark/pull/40195#issuecomment-1451819657
> @jiang13021 The changes cause some conflicts in branch-3.3. Could you open a separate PR with a backport to Spark 3.3?

Thanks for your review. Do you mean launch a new PR to branch-3.3? Here it is: https://github.com/apache/spark/pull/40253
[GitHub] [spark] MaxGekk commented on pull request #40195: [SPARK-42553][SQL] Ensure at least one time unit after "interval"
MaxGekk commented on PR #40195: URL: https://github.com/apache/spark/pull/40195#issuecomment-1451827970
> Do you mean launch a new PR to branch-3.3? Here it is: https://github.com/apache/spark/pull/40253

Yep. Thank you.
[GitHub] [spark] MaxGekk commented on pull request #40253: [SPARK-42553][SQL] Ensure at least one time unit after "interval"
MaxGekk commented on PR #40253: URL: https://github.com/apache/spark/pull/40253#issuecomment-1451830956 @jiang13021 Thank you for the backport. Could you add the following, please:
1. The tag `[3.3]` to PR's title.
2. `This is a backport of https://github.com/apache/spark/pull/40195` to PR's description.
[GitHub] [spark] srowen commented on pull request #40220: [WIP][SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types
srowen commented on PR #40220: URL: https://github.com/apache/spark/pull/40220#issuecomment-1451854672 The line changed, and now the 'ignore' is no longer relevant - yes remove it to pass the linter
[GitHub] [spark] jiang13021 commented on pull request #40253: [SPARK-42553][SQL][3.3] Ensure at least one time unit after "interval"
jiang13021 commented on PR #40253: URL: https://github.com/apache/spark/pull/40253#issuecomment-1451860426
> @jiang13021 Thank you for the backport. Could add the following, please:
>
> 1. The tag `[3.3]` to PR's title.
> 2. `This is a backport of https://github.com/apache/spark/pull/40195` to PR's description.

Done
[GitHub] [spark] EnricoMi commented on pull request #38223: [SPARK-40770][PYTHON] Improved error messages for applyInPandas for schema mismatch
EnricoMi commented on PR #38223: URL: https://github.com/apache/spark/pull/38223#issuecomment-1451889010 @HyukjinKwon why has this been reverted? a86324cb CI looked pretty green: https://github.com/apache/spark/actions/runs/4129838944/jobs/7135888364
[GitHub] [spark] yabola commented on pull request #39950: [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader
yabola commented on PR #39950: URL: https://github.com/apache/spark/pull/39950#issuecomment-1451913404 @sunchao Sorry, it might be a mistake. We need to read the schema from the footer metadata first to determine which filters can be pushed down. After that we set the pushdown info ([code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L261)) and read the filtered RowGroups. So I think the footer may have to be read twice when there are filters, but it can be read only once when no filter needs to be pushed down. I think this will be useful when scanning joined tables where the filter condition applies only to the other side of the join and the scanned table has no filter. It can avoid many footer reads when there are many joined tables.
[GitHub] [spark] srowen closed pull request #40219: [SPARK-42622][CORE] Disable substitution in values
srowen closed pull request #40219: [SPARK-42622][CORE] Disable substitution in values URL: https://github.com/apache/spark/pull/40219
[GitHub] [spark] srowen commented on pull request #40219: [SPARK-42622][CORE] Disable substitution in values
srowen commented on PR #40219: URL: https://github.com/apache/spark/pull/40219#issuecomment-1451982884 Merged to master/3.4
[GitHub] [spark] MaxGekk commented on pull request #40253: [SPARK-42553][SQL][3.3] Ensure at least one time unit after "interval"
MaxGekk commented on PR #40253: URL: https://github.com/apache/spark/pull/40253#issuecomment-1452051052 +1, LGTM. All GAs passed. Merging to 3.3. Thank you, @jiang13021.
[GitHub] [spark] MaxGekk closed pull request #40253: [SPARK-42553][SQL][3.3] Ensure at least one time unit after "interval"
MaxGekk closed pull request #40253: [SPARK-42553][SQL][3.3] Ensure at least one time unit after "interval" URL: https://github.com/apache/spark/pull/40253
[GitHub] [spark] LuciferYang commented on a diff in pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
LuciferYang commented on code in PR #40252: URL: https://github.com/apache/spark/pull/40252#discussion_r1123323759 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala: ## @@ -184,6 +186,67 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging } } + /** + * Construct a `DataFrame` representing the database table accessible via JDBC URL + * url named table and connection properties. + * + * You can find the JDBC-specific option and parameter documentation for reading tables + * via JDBC in + * <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option"> + * Data Source Option</a> in the version you use. + * + * @since 3.5.0 + */ + def jdbc(url: String, table: String, properties: Properties): DataFrame = { +// properties should override settings in extraOptions. +this.extraOptions ++= properties.asScala +// explicit url and dbtable should override all +this.extraOptions ++= Seq("url" -> url, "dbtable" -> table) +format("jdbc").load() + } + + // scalastyle:off line.size.limit + /** + * Construct a `DataFrame` representing the database table accessible via JDBC URL + * url named table. Partitions of the table will be retrieved in parallel based on the parameters + * passed to this function. + * + * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash + * your external database systems. + * + * You can find the JDBC-specific option and parameter documentation for reading tables via JDBC in + * <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option"> + * Data Source Option</a> in the version you use. + * + * @param table Name of the table in the external database. + * @param columnName Alias of `partitionColumn` option. Refer to `partitionColumn` in + * <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option"> + * Data Source Option</a> in the version you use. 
+ * @param connectionProperties JDBC database connection arguments, a list of arbitrary string + * tag/value. Normally at least a "user" and "password" property + * should be included. "fetchsize" can be used to control the + * number of rows per fetch and "queryTimeout" can be used to wait + * for a Statement object to execute to the given number of seconds. + * @since 3.5.0 Review Comment: Will this not be backported to 3.4.0?
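The option precedence stated in the code comments of the `jdbc` method in the diff above (properties override existing options, then the explicit `url` and `dbtable` override everything) can be sketched with plain immutable maps. This is a simplified illustration: `mergeOptions` is a hypothetical helper, and it assumes ordinary case-sensitive `Map` semantics, which may differ from the option map the reader actually uses.

```scala
object JdbcOptionsSketch {
  // Later operands win in `++`, mirroring the order used in the diff:
  // existing options, then connection properties, then explicit url/dbtable.
  def mergeOptions(
      extraOptions: Map[String, String],
      properties: Map[String, String],
      url: String,
      table: String): Map[String, String] =
    extraOptions ++ properties ++ Map("url" -> url, "dbtable" -> table)

  // Example values are made up for illustration.
  val merged: Map[String, String] = mergeOptions(
    extraOptions = Map("fetchsize" -> "100", "url" -> "jdbc:old"),
    properties = Map("fetchsize" -> "500", "user" -> "alice"),
    url = "jdbc:postgresql://host/db",
    table = "people")
}
```

Here `fetchsize` ends up as the value from `properties`, and `url` ends up as the explicit argument rather than the stale option.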
[GitHub] [spark] LuciferYang opened a new pull request, #40254: [SPARK-42654][BUILD] WIP
LuciferYang opened a new pull request, #40254: URL: https://github.com/apache/spark/pull/40254 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] LuciferYang opened a new pull request, #40255: [SPARK-42558][CONNECT] DataFrameStatFunctions WIP
LuciferYang opened a new pull request, #40255: URL: https://github.com/apache/spark/pull/40255 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] LuciferYang commented on a diff in pull request #40255: [SPARK-42558][CONNECT] DataFrameStatFunctions WIP
LuciferYang commented on code in PR #40255: URL: https://github.com/apache/spark/pull/40255#discussion_r1123361868 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala: ## @@ -586,6 +586,57 @@ class ClientE2ETestSuite extends RemoteSparkSession { list.asScala.map(kv => Row(kv.key, kv.value)), session.createDataFrame(list.asScala.toSeq)) } + + test("approximate quantile") { Review Comment: Will add more cases later.
[GitHub] [spark] LuciferYang commented on a diff in pull request #40255: [SPARK-42558][CONNECT] DataFrameStatFunctions WIP
LuciferYang commented on code in PR #40255: URL: https://github.com/apache/spark/pull/40255#discussion_r1123365858 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala: ## @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.{lang => jl, util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.spark.connect.proto.Relation +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, PrimitiveDoubleEncoder} +import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch} + +/** + * Statistic functions for `DataFrame`s. + * + * @since 3.4.0 + */ +final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, root: Relation) { + + /** + * Calculates the approximate quantiles of a numerical column of a DataFrame. + * + * The result of this algorithm has the following deterministic bound: If the DataFrame has N + * elements and if we request the quantile at probability `p` up to error `err`, then the + * algorithm will return a sample `x` from the DataFrame so that the *exact* rank of `x` is + * close to (p * N). 
More precisely, + * + * {{{ + * floor((p - err) * N) <= rank(x) <= ceil((p + err) * N) + * }}} + * + * This method implements a variation of the Greenwald-Khanna algorithm (with some speed + * optimizations). The algorithm was first present in https://doi.org/10.1145/375663.375670";> Space-efficient Online Computation of Quantile + * Summaries by Greenwald and Khanna. + * + * @param col + * the name of the numerical column + * @param probabilities + * a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the + * minimum, 0.5 is the median, 1 is the maximum. + * @param relativeError + * The relative target precision to achieve (greater than or equal to 0). If set to zero, the + * exact quantiles are computed, which could be very expensive. Note that values greater than + * 1 are accepted but give the same result as 1. + * @return + * the approximate quantiles at the given probabilities + * + * @note + * null and NaN values will be removed from the numerical column before calculation. If the + * dataframe is empty or the column only contains null or NaN, an empty array is returned. + * + * @since 3.4.0 + */ + def approxQuantile( + col: String, + probabilities: Array[Double], + relativeError: Double): Array[Double] = { +approxQuantile(Array(col), probabilities, relativeError).head + } + + /** + * Calculates the approximate quantiles of numerical columns of a DataFrame. + * @see + * `approxQuantile(col:Str* approxQuantile)` for detailed description. + * + * @param cols + * the names of the numerical columns + * @param probabilities + * a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the + * minimum, 0.5 is the median, 1 is the maximum. + * @param relativeError + * The relative target precision to achieve (greater than or equal to 0). If set to zero, the + * exact quantiles are computed, which could be very expensive. Note that values greater than + * 1 are accepted but give the same result as 1. 
+ * @return + * the approximate quantiles at the given probabilities of each column + * + * @note + * null and NaN values will be ignored in numerical columns before calculation. For columns + * only containing null or NaN values, an empty array is returned. + * + * @since 3.4.0 + */ + def approxQuantile( + cols: Array[String], + probabilities: Array[Double], + relativeError: Double): Array[Array[Double]] = { +require( + probabilities.forall(p => p >= 0.0 && p <= 1.0), + "percentile should be in the range [0.0, 1.0]") +require(relativeError >= 0, + s"Relative Error must be non-negative but got $relativeError") +sparkSession + .newDataset( +ArrayEncoder( + ArrayEncoder(PrimitiveDoubleEncoder, containsNull = false), + containsNull = false)) { builder => +va
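The rank bound quoted in the scaladoc above, `floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)`, can be worked through in plain Scala. This is a sketch only: `rankBounds` is a hypothetical helper, and clamping the window to `[0, N]` is an added assumption rather than something the scaladoc states.

```scala
object QuantileBoundSketch {
  // Returns the inclusive rank window (lo, hi) that a returned sample should fall in
  // for probability p, relative error err, and n elements.
  def rankBounds(p: Double, err: Double, n: Long): (Long, Long) = {
    require(p >= 0.0 && p <= 1.0, "percentile should be in the range [0.0, 1.0]")
    require(err >= 0.0, s"Relative Error must be non-negative but got $err")
    // Clamping to [0, n] is an assumption made for this sketch.
    val lo = math.max(0L, math.floor((p - err) * n).toLong)
    val hi = math.min(n, math.ceil((p + err) * n).toLong)
    (lo, hi)
  }
}
```

For example, with `p = 0.5`, `err = 0.25` and `N = 1000`, the window is ranks 250 through 750; with `err = 0` the window collapses to the exact rank, which matches the note that a zero relative error requests exact quantiles.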
[GitHub] [spark] aimtsou commented on pull request #40220: [WIP][SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types
aimtsou commented on PR #40220: URL: https://github.com/apache/spark/pull/40220#issuecomment-1452145653 Tests are completed. Shall I squash and remove the WIP tag from the pull request?
[GitHub] [spark] MaxGekk commented on pull request #40126: [SPARK-40822][SQL] Stable derived column aliases
MaxGekk commented on PR #40126: URL: https://github.com/apache/spark/pull/40126#issuecomment-1452182391 @srielau Could you take a look at the PR one more time, please?
[GitHub] [spark] MaxGekk commented on a diff in pull request #40126: [SPARK-40822][SQL] Stable derived column aliases
MaxGekk commented on code in PR #40126: URL: https://github.com/apache/spark/pull/40126#discussion_r1123410550 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala: ## @@ -88,4 +94,46 @@ class ResolveAliasesSuite extends AnalysisTest { checkAliasName(t1.select(DateSub(Literal(Date.valueOf("2021-01-18")), Literal(null))), "date_sub(DATE '2021-01-18', NULL)") } + + test("SPARK-40822: Stable derived column aliases") { +withSQLConf(SQLConf.STABLE_DERIVED_COLUMN_ALIAS_ENABLED.key -> "true") { + Seq( +// Literals +"' 1'" -> "' 1'", +abc -> abc, Review Comment: I have added such a conversion.
[GitHub] [spark] vicennial opened a new pull request, #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial opened a new pull request, #40256: URL: https://github.com/apache/spark/pull/40256 ### What changes were proposed in this pull request? This PR introduces a mechanism to transfer artifacts (currently, local `.jar` + `.class` files) from a Spark Connect JVM/Scala client over to the server side of Spark Connect. The mechanism follows the protocol as defined in https://github.com/apache/spark/pull/40147 and supports batching (for multiple "small" artifacts) and chunking (for large artifacts). ### Why are the changes needed? In the decoupled client-server architecture of Spark Connect, a remote client may use a local JAR or a new class in their UDF that may not be present on the server. To handle these cases of missing "artifacts", we implement a mechanism to transfer artifacts from the client side over to the server side as per the protocol defined in https://github.com/apache/spark/pull/40147. ### Does this PR introduce _any_ user-facing change? Yes, users would be able to use the `addArtifact` and `addArtifacts` methods (via a `SparkSession` instance) to transfer local files (`.jar` and `.class` extensions). ### How was this patch tested? Unit tests - located in `ArtifactSuite`.
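The batching and chunking idea described in this PR can be illustrated with a small, self-contained sketch. This is not the PR's `ArtifactManager`: the names `Chunk`, `ChunkingSketch`, `chunk`, and `batch` are hypothetical, the greedy packing rule is an assumption, and the 32 KiB default is taken from the `CHUNK_SIZE` constant in the diff quoted in the review comments on this PR. The per-chunk CRC32 is likewise an assumption suggested by the `java.util.zip.CRC32` import in that diff.

```scala
import java.util.zip.CRC32

// Hypothetical type for illustration: one slice of a large artifact plus its checksum.
final case class Chunk(data: Array[Byte], crc: Long)

object ChunkingSketch {
  // 32 KiB, mirroring the CHUNK_SIZE constant shown in the quoted diff.
  val ChunkSize: Int = 32 * 1024

  // CRC32 checksum of a byte array.
  def crcOf(bytes: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(bytes, 0, bytes.length)
    crc.getValue
  }

  // Chunking: split one large payload into slices of at most `chunkSize` bytes.
  def chunk(payload: Array[Byte], chunkSize: Int = ChunkSize): Vector[Chunk] = {
    require(chunkSize > 0, "chunkSize must be positive")
    payload.grouped(chunkSize).map(part => Chunk(part, crcOf(part))).toVector
  }

  // Batching (assumed greedy rule): pack consecutive small artifact sizes into
  // one batch while the running total stays within `limit`.
  def batch(sizes: Seq[Int], limit: Int = ChunkSize): Vector[Vector[Int]] =
    sizes.foldLeft(Vector.empty[Vector[Int]]) { (batches, size) =>
      batches.lastOption match {
        case Some(last) if last.sum + size <= limit => batches.init :+ (last :+ size)
        case _ => batches :+ Vector(size)
      }
    }
}
```

For example, a 70000-byte payload would be split into two full 32 KiB chunks and one remainder chunk, while artifact sizes 10, 20 and 40 with a limit of 50 would be packed as two batches.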
[GitHub] [spark] srowen commented on pull request #40220: [WIP][SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types
srowen commented on PR #40220: URL: https://github.com/apache/spark/pull/40220#issuecomment-1452232660 Yes, remove WIP just for completeness. No need to squash; the script does that.
[GitHub] [spark] steveloughran commented on pull request #39124: [DON'T MERGE] Test build and test with hadoop 3.3.5-RC2
steveloughran commented on PR #39124: URL: https://github.com/apache/spark/pull/39124#issuecomment-1452267600 thanks. HDFS team reporting a probable RC blocker, but now is the time to find any other issue.
[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
ueshin commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123536612 ## python/pyspark/sql/connect/plan.py: ## @@ -338,7 +338,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation: plan.local_relation.data = sink.getvalue().to_pybytes() if self._schema is not None: -plan.local_relation.schema = self._schema +plan.local_relation.schemaString = self._schema Review Comment: ```suggestion plan.local_relation.schema_string = self._schema ```
[GitHub] [spark] hvanhovell commented on a diff in pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
hvanhovell commented on code in PR #40252: URL: https://github.com/apache/spark/pull/40252#discussion_r1123553052 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala: ## @@ -184,6 +186,67 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging } } + /** + * Construct a `DataFrame` representing the database table accessible via JDBC URL + * url named table and connection properties. + * + * You can find the JDBC-specific option and parameter documentation for reading tables + * via JDBC in + * https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";> + * Data Source Option in the version you use. + * + * @since 3.5.0 + */ + def jdbc(url: String, table: String, properties: Properties): DataFrame = { +// properties should override settings in extraOptions. +this.extraOptions ++= properties.asScala +// explicit url and dbtable should override all +this.extraOptions ++= Seq("url" -> url, "dbtable" -> table) +format("jdbc").load() + } + + // scalastyle:off line.size.limit + /** + * Construct a `DataFrame` representing the database table accessible via JDBC URL + * url named table. Partitions of the table will be retrieved in parallel based on the parameters + * passed to this function. + * + * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash + * your external database systems. + * + * You can find the JDBC-specific option and parameter documentation for reading tables via JDBC in + * https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";> + * Data Source Option in the version you use. + * + * @param table Name of the table in the external database. + * @param columnName Alias of `partitionColumn` option. Refer to `partitionColumn` in + * https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";> + * Data Source Option in the version you use. 
+ * @param connectionProperties JDBC database connection arguments, a list of arbitrary string + * tag/value. Normally at least a "user" and "password" property + * should be included. "fetchsize" can be used to control the + * number of rows per fetch and "queryTimeout" can be used to wait + * for a Statement object to execute to the given number of seconds. + * @since 3.5.0 Review Comment: I am fine with marking this as 3.4.0. There is no harm in adding this to 3.4.0, if the RC does not pass.
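The option precedence that the quoted `jdbc(url, table, properties)` method relies on (connection properties override earlier reader options, and the explicit `url` and `dbtable` override everything) can be sketched with plain immutable maps. `JdbcOptionPrecedence` and `merge` are hypothetical names used only for illustration, and a plain case-sensitive `Map` stands in for whatever option container the real `DataFrameReader` uses.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical helper for illustration only; not part of the Spark API.
object JdbcOptionPrecedence {
  def merge(
      extraOptions: Map[String, String],
      url: String,
      table: String,
      properties: Properties): Map[String, String] = {
    // Later entries win in `++`, so properties override existing options...
    val withProperties = extraOptions ++ properties.asScala
    // ...and the explicit url and dbtable override everything else.
    withProperties ++ Seq("url" -> url, "dbtable" -> table)
  }
}
```

With this ordering, a `user` entry in the properties would replace a `user` option set earlier on the reader, while a stray `url` entry in the properties would be overwritten by the explicit `url` argument.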
[GitHub] [spark] hvanhovell commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
hvanhovell commented on PR #40252: URL: https://github.com/apache/spark/pull/40252#issuecomment-1452353820 @beliefer you can create a test in `PlanGenerationTestSuite`. That will at least validate the proto message we are generating, and it will validate that the plan you are producing yields a valid plan in `ProtoToPlanTestSuite`.
[GitHub] [spark] hvanhovell commented on pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite`
hvanhovell commented on PR #40213: URL: https://github.com/apache/spark/pull/40213#issuecomment-1452355034 Alright, merging.
[GitHub] [spark] hvanhovell closed pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite`
hvanhovell closed pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite` URL: https://github.com/apache/spark/pull/40213
[GitHub] [spark] zhenlineo commented on a diff in pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite`
zhenlineo commented on code in PR #40213: URL: https://github.com/apache/spark/pull/40213#discussion_r1123563539 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala: ## @@ -184,30 +218,36 @@ class CompatibilitySuite extends ConnectFunSuite { .filter { p => excludeRules.forall(rule => rule(p)) } - -if (problems.nonEmpty) { - fail( -s"\nComparing client jar: $clientJar\nand sql jar: $sqlJar\n" + - problems.map(p => p.description("client")).mkString("\n")) -} +problems } - test("compatibility API tests: Dataset") { -val clientClassLoader: URLClassLoader = new URLClassLoader(Seq(clientJar.toURI.toURL).toArray) -val sqlClassLoader: URLClassLoader = new URLClassLoader(Seq(sqlJar.toURI.toURL).toArray) + private def checkDatasetApiCompatibility(clientJar: File, sqlJar: File): Seq[String] = { Review Comment: This test was initially added because we did not cover the Dataset in the mima check. If we find such a test useful and want to expand it to more classes, we can run the mima check the other way around to ensure that our new code does not introduce more unwanted methods.
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123572526 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -287,6 +288,28 @@ class SparkSession private[sql] ( client.execute(plan).asScala.foreach(_ => ()) } + /** + * Add a single artifact to the client session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = client.addArtifact(path) Review Comment: Can you mark these as experimental?
[GitHub] [spark] amaliujia commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
amaliujia commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123575136 ## python/pyspark/sql/connect/plan.py: ## @@ -338,7 +338,7 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation: plan.local_relation.data = sink.getvalue().to_pybytes() if self._schema is not None: -plan.local_relation.schema = self._schema +plan.local_relation.schemaString = self._schema Review Comment: +1
[GitHub] [spark] peter-toth commented on a diff in pull request #40093: [SPARK-42500][SQL] ConstantPropagation supports more cases
peter-toth commented on code in PR #40093: URL: https://github.com/apache/spark/pull/40093#discussion_r1123582002 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala: ## @@ -200,14 +200,20 @@ object ConstantPropagation extends Rule[LogicalPlan] { private def replaceConstants(condition: Expression, equalityPredicates: EqualityPredicates) : Expression = { -val constantsMap = AttributeMap(equalityPredicates.map(_._1)) -val predicates = equalityPredicates.map(_._2).toSet -def replaceConstants0(expression: Expression) = expression transform { +val allConstantsMap = AttributeMap(equalityPredicates.map(_._1)) +val allPredicates = equalityPredicates.map(_._2).toSet +def replaceConstants0( +expression: Expression, constantsMap: AttributeMap[Literal]) = expression transform { case a: AttributeReference => constantsMap.getOrElse(a, a) } condition transform { - case e @ EqualTo(_, _) if !predicates.contains(e) => replaceConstants0(e) - case e @ EqualNullSafe(_, _) if !predicates.contains(e) => replaceConstants0(e) + case b: BinaryComparison => Review Comment: Let me open a PR tomorrow.
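For readers unfamiliar with the rule under review, the general idea of constant propagation (collect `attribute = literal` predicates from a conjunction, then substitute those literals into the other comparisons) can be shown on a toy expression tree. This is a simplified sketch, not Catalyst's `ConstantPropagation`: the `Expr` hierarchy and the `ConstantPropagationSketch` object are hypothetical, and nullability, `EqualNullSafe`, and non-conjunctive conditions are deliberately ignored.

```scala
// Toy expression tree, hypothetical and much smaller than Catalyst's.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

object ConstantPropagationSketch {
  // Flatten nested ANDs into their individual conjuncts.
  def conjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other => Seq(other)
  }

  // Map every attribute that is equated with a literal to that literal.
  def constants(e: Expr): Map[Attr, Lit] = conjuncts(e).collect {
    case EqualTo(a: Attr, l: Lit) => a -> l
    case EqualTo(l: Lit, a: Attr) => a -> l
  }.toMap

  // Replace known attributes with their literals everywhere inside `e`.
  def substitute(e: Expr, m: Map[Attr, Lit]): Expr = e match {
    case a: Attr => m.getOrElse(a, a)
    case l: Lit => l
    case Add(l, r) => Add(substitute(l, m), substitute(r, m))
    case EqualTo(l, r) => EqualTo(substitute(l, m), substitute(r, m))
    case GreaterThan(l, r) => GreaterThan(substitute(l, m), substitute(r, m))
    case And(l, r) => And(substitute(l, m), substitute(r, m))
  }

  // Propagate constants, leaving the defining `attr = literal` predicates untouched.
  def propagate(condition: Expr): Expr = {
    val m = constants(condition)
    def go(e: Expr): Expr = e match {
      case And(l, r) => And(go(l), go(r))
      case EqualTo(a: Attr, l: Lit) if m.get(a).contains(l) => e
      case EqualTo(l: Lit, a: Attr) if m.get(a).contains(l) => e
      case other => substitute(other, m)
    }
    go(condition)
  }
}
```

In this sketch, `a = 1 AND b > a + 2` becomes `a = 1 AND b > 1 + 2`: the defining equality is kept as-is, and the comparison (not only an equality) has the attribute replaced, which is the kind of case the PR title refers to.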
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123588842 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. 
+uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompl
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123590310 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. 
+uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompl
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123590597 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. 
+uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompl
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123591424 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. 
+uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompl
[GitHub] [spark] zhenlineo opened a new pull request, #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo opened a new pull request, #40257: URL: https://github.com/apache/spark/pull/40257

### What changes were proposed in this pull request?
Add a simple script that starts the Scala client in the Scala REPL, along with a script that starts the Spark Connect server for the client to connect to.

### Why are the changes needed?
Make the JVM client easier to use.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested.
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123642955 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala: ## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.InputStream +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.TimeUnit +import java.util.zip.{CheckedInputStream, CRC32} + +import com.google.protobuf.ByteString +import io.grpc.{ManagedChannel, Server} +import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsRequest +import org.apache.spark.sql.connect.client.util.ConnectFunSuite + +class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { + + private var client: SparkConnectClient = _ + private var service: DummySparkConnectService = _ + private var server: Server = _ + private var artifactManager: ArtifactManager = _ + private var channel: ManagedChannel = _ + + private def startDummyServer(): Unit = { +service = new DummySparkConnectService() +server = InProcessServerBuilder + .forName(getClass.getName) + .addService(service) + .build() +server.start() + } + + private def createArtifactManager(): Unit = { +channel = InProcessChannelBuilder.forName(getClass.getName).directExecutor().build() +artifactManager = new ArtifactManager(proto.UserContext.newBuilder().build(), channel) + } + + override def beforeEach(): Unit = { +super.beforeEach() +startDummyServer() +createArtifactManager() +client = null + } + + override def afterEach(): Unit = { +if (server != null) { + server.shutdownNow() + assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to shutdown") +} + +if (channel != null) { + channel.shutdownNow() +} + +if (client != null) { + client.shutdown() +} + } + + private val CHUNK_SIZE: Int = 32 * 1024 + protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests") + + /** + * Check if the data sent to the server (stored in `artifactChunk`) is equivalent to the local + * data at `localPath`. 
+ * @param artifactChunk + * @param localPath + */ + private def assertFileDataEquality( + artifactChunk: AddArtifactsRequest.ArtifactChunk, + localPath: Path): Unit = { +val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32) +val localData = ByteString.readFrom(in) +assert(artifactChunk.getData == localData) +assert(artifactChunk.getCrc == in.getChecksum.getValue) + } + + private def singleChunkArtifactTest(path: String): Unit = { +test(s"Single Chunk Artifact - $path") { + val artifactPath = artifactFilePath.resolve(path) + artifactManager.addArtifact(artifactPath.toString) + + val receivedRequests = service.getAndClearLatestAddArtifactRequests() + // Single `AddArtifactRequest` + assert(receivedRequests.size == 1) + + val request = receivedRequests.head + assert(request.hasBatch) + + val batch = request.getBatch + // Single artifact in batch + assert(batch.getArtifactsList.size() == 1) + + val singleChunkArtifact = batch.getArtifacts(0) + val namePrefix = artifactPath.getFileName.toString match { +case jar if jar.endsWith(".jar") => "jars" +case cf if cf.endsWith(".class") => "classes" + } + assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path)) + assertFileDataEquality(singleChunkArtifact.getData, artifactPath) +} + } + + singleChunkArtifactTest("smallClassFile.class") + + singleChunkArtifactTest("smallJar.jar") + + private def readNextChunk(in: InputStream): ByteString = { +val buf = new Array[Byte](CHUNK_SIZE) +var bytesRead = 0 +var count = 0 +while (count != -1 && bytesRead < CHUNK_SIZE) { + count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead) + if (count != -1) { +bytesRead += count + } +} +
[GitHub] [spark] shrprasa opened a new pull request, #40258: [WIP][SPARK-42655]:Incorrect ambiguous column reference error
shrprasa opened a new pull request, #40258: URL: https://github.com/apache/spark/pull/40258 Incorrect ambiguous column reference error
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123653086

## bin/spark-connect-scala-client.sc: ##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else sessionBuilder.remote(conStr).build()

Review Comment:
Do we default to a local connection if none is set?
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123653670

## bin/spark-connect: ##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package
+
+# Build the jars needed for spark connect
+build/sbt "connect/assembly"
+
+CONNECT_JAR=`ls "${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar | paste -sd ',' -`
+
+exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR"

Review Comment:
We have two scripts for starting and stopping a connect server. Should we use those instead?
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123654587

## bin/spark-connect: ##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash

Review Comment:
Since these scripts are aimed at development, maybe put them in connect/connect/bin?
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123653670

## bin/spark-connect: ##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package
+
+# Build the jars needed for spark connect
+build/sbt "connect/assembly"
+
+CONNECT_JAR=`ls "${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar | paste -sd ',' -`
+
+exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR"

Review Comment:
We have two scripts for starting and stopping a connect server. Should we use those instead? If this is for dev, then we should keep these.
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123660107

## bin/spark-connect-scala-client: ##
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the following command:
+# `bin/spark-connect`
+# The client should be able to connect to this server directly with the default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection
+# string. e.g.
+# `export SPARK_REMOTE="sc://:/;token=;="`
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"
+
+CONNECT_CLASSPATH="$(build/sbt -DcopyDependencies=false "export connect-client-jvm/fullClasspath" | grep jar | tail -n1)"
+SQL_CLASSPATH="$(build/sbt -DcopyDependencies=false "export sql/fullClasspath" | grep jar | tail -n1)"
+
+INIT_SCRIPT="${SPARK_HOME}"/bin/spark-connect-scala-client.sc
+"${SPARK_HOME}"/build/scala*/bin/scala -cp "$CONNECT_CLASSPATH:$SQL_CLASSPATH" -i $INIT_SCRIPT

Review Comment:
TIL you can do this :)... I have multiple scala versions floating around in my spark dir. We should pick the current one...
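One way to address the concern about the `build/scala*` glob matching several installs is to resolve a single Scala home for an explicitly requested version. The sketch below is illustrative only: the helper name `pick_scala_home` and the `<build_dir>/scala-<version>` directory layout are assumptions, not something the PR or the review confirms.

```shell
#!/usr/bin/env bash

# Hypothetical helper (not part of the PR): print the one Scala home that
# matches the requested version, instead of relying on a glob that may match
# several directories.
# Assumption: installs live under "<build_dir>/scala-<version>".
pick_scala_home() {
  local build_dir="$1"
  local version="$2"
  local candidate="${build_dir}/scala-${version}"

  # Accept the candidate only if it actually contains an executable launcher.
  if [ -x "${candidate}/bin/scala" ]; then
    printf '%s\n' "${candidate}"
  else
    echo "No Scala ${version} found under ${build_dir}" >&2
    return 1
  fi
}
```

A launcher could then call something like `SCALA_HOME="$(pick_scala_home "${SPARK_HOME}/build" "$SCALA_VERSION")"`, where `SCALA_VERSION` is a hypothetical variable holding the version the build is configured for, and fail fast if the directory is missing.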
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123661416

## bin/spark-connect: ##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package

Review Comment:
Remove comment?
[GitHub] [spark] hvanhovell commented on pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite
hvanhovell commented on PR #40241: URL: https://github.com/apache/spark/pull/40241#issuecomment-1452528841 Merging.
[GitHub] [spark] hvanhovell closed pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite
hvanhovell closed pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite URL: https://github.com/apache/spark/pull/40241
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123688156

## bin/spark-connect-scala-client.sc: ##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else ""

Review Comment:
Feature request, can we try to configure this using a `--connectionString` argument?
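As a rough illustration of the feature request, the launcher script could parse a `--connectionString` argument and fall back to `SPARK_REMOTE` when it is absent. This is a minimal sketch, not the PR's implementation: the function name `resolve_connection_string` and the precedence (argument first, then environment, then empty) are assumptions.

```shell
#!/usr/bin/env bash

# Hypothetical sketch (not part of the PR): prefer a --connectionString
# argument, fall back to the SPARK_REMOTE environment variable, and print an
# empty string when neither is set.
resolve_connection_string() {
  local conn="${SPARK_REMOTE:-}"

  while [ "$#" -gt 0 ]; do
    case "$1" in
      --connectionString)
        # Form: --connectionString <value>
        if [ "$#" -lt 2 ]; then
          echo "--connectionString requires a value" >&2
          return 1
        fi
        conn="$2"
        shift 2
        ;;
      --connectionString=*)
        # Form: --connectionString=<value>
        conn="${1#--connectionString=}"
        shift
        ;;
      *)
        # Ignore arguments this sketch does not know about.
        shift
        ;;
    esac
  done

  printf '%s\n' "${conn}"
}
```

For example, `resolve_connection_string --connectionString "sc://example-host:15002"` prints the given string (the host and port are placeholders), while calling it with no arguments prints whatever `SPARK_REMOTE` holds, or an empty line if that is unset.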
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123695202

## bin/spark-connect-scala-client.sc: ##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else sessionBuilder.remote(conStr).build()

Review Comment:
Oh, BTW, there does not seem to be a SparkSession.Builder.remote() function?
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123696037

## bin/spark-connect-scala-client.sc: ##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else sessionBuilder.remote(conStr).build()

Review Comment:
Can you also set the user agent to something meaningful?
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123697756

## bin/spark-connect-scala-client: ##
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash

Review Comment:
Script files need to have a license header as well.
[GitHub] [spark] hvanhovell commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
hvanhovell commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123698953

## connector/connect/common/src/main/protobuf/spark/connect/relations.proto: ##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;

   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {
+    // Either a DDL-formatted type string or a JSON string.
+    string schemaString = 2;

Review Comment:
Thanks; it has been a while since I touched proto.
[GitHub] [spark] amaliujia opened a new pull request, #40259: [SPARK-42609][CONNECT] Add tests for grouping() and grouping_id() functions
amaliujia opened a new pull request, #40259: URL: https://github.com/apache/spark/pull/40259

### What changes were proposed in this pull request?
Add tests for grouping() and grouping_id() functions.

### Why are the changes needed?
Improve testing coverage.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
UT
[GitHub] [spark] amaliujia commented on pull request #40259: [SPARK-42609][CONNECT] Add tests for grouping() and grouping_id() functions
amaliujia commented on PR #40259: URL: https://github.com/apache/spark/pull/40259#issuecomment-1452625458 @hvanhovell
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123818828

## bin/spark-connect: ##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package
+
+# Build the jars needed for spark connect
+build/sbt "connect/assembly"
+
+CONNECT_JAR=`ls "${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar | paste -sd ',' -`
+
+exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR"

Review Comment:
Where is the other script?
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123821210 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. 
+uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onComple
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123821569 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. 
+uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onComple
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123829338 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. 
+uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onComple
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123833306 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. 
+uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onComple
[GitHub] [spark] ueshin opened a new pull request, #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available
ueshin opened a new pull request, #40260: URL: https://github.com/apache/spark/pull/40260 ### What changes were proposed in this pull request? Delays parsing the DDL string for Python UDFs until `SparkConnectClient` is available. Also changes `createDataFrame` to use the proto `DDLParse`. ### Why are the changes needed? Currently `parse_data_type` depends on `PySparkSession`, which creates a local PySpark, but that won't be available on the client side. When `SparkConnectClient` is available, we can use the new proto `DDLParse` to parse data types given as strings. ### Does this PR introduce _any_ user-facing change? The UDF's `returnType` attribute could be a string in Spark Connect if it is provided as a string. ### How was this patch tested? Existing tests.
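The idea of keeping the DDL string unparsed until a client exists can be illustrated with a small model. This is only a sketch under stated assumptions: `DeferredDdl`, `ParsedType`, `DdlClient` and `ReturnType` are hypothetical names made up for illustration, not PySpark or Spark Connect APIs, and it is written in Scala for consistency with the other code in this thread even though the PR itself touches the Python client.

```scala
// Hypothetical sketch: none of these names exist in Spark or PySpark.
object DeferredDdl {
  // Stand-in for a parsed data type.
  final case class ParsedType(name: String)

  // Stand-in for a client that can ask the server to parse a DDL string.
  trait DdlClient {
    def parseDdl(ddl: String): ParsedType
  }

  // Holds either the raw DDL string (parsing deferred) or an already parsed type.
  final class ReturnType private (raw: Either[String, ParsedType]) {
    // The raw DDL string, if parsing has been deferred.
    def ddlString: Option[String] = raw.swap.toOption

    // Resolve to a parsed type, calling the client only when a string was stored.
    def resolve(client: DdlClient): ParsedType = raw match {
      case Right(parsed) => parsed
      case Left(ddl) => client.parseDdl(ddl)
    }
  }

  object ReturnType {
    def fromString(ddl: String): ReturnType = new ReturnType(Left(ddl))
    def fromParsed(t: ParsedType): ReturnType = new ReturnType(Right(t))
  }
}
```

Until `resolve` is called, the value stays a plain string, which mirrors the user-facing note above that `returnType` may be a string when it was provided as one.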
[GitHub] [spark] ueshin commented on a diff in pull request #40210: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`
ueshin commented on code in PR #40210: URL: https://github.com/apache/spark/pull/40210#discussion_r1123891426 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala: ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.service + +import scala.collection.JavaConverters._ + +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput} +import org.apache.spark.sql.connect.planner.SparkConnectPlanner +import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, FormattedMode, SimpleMode} + +private[connect] class SparkConnectAnalyzeHandler( +responseObserver: StreamObserver[proto.AnalyzePlanResponse]) +extends Logging { + + def handle(request: proto.AnalyzePlanRequest): Unit = { +val session = + SparkConnectService +.getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId) +.session + +val response = process(request, session) +responseObserver.onNext(response) +responseObserver.onCompleted() Review Comment: I guess we should surround this with `session.withActive`?
[GitHub] [spark] mridulm closed pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
mridulm closed pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache URL: https://github.com/apache/spark/pull/39459
[GitHub] [spark] mridulm commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
mridulm commented on PR #39459: URL: https://github.com/apache/spark/pull/39459#issuecomment-1452754371 Merged to master. Thanks for fixing this @ivoson ! Thanks for the reviews @Ngone51 :-)
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123897616 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala: ## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client + +import java.io.InputStream +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.TimeUnit +import java.util.zip.{CheckedInputStream, CRC32} + +import com.google.protobuf.ByteString +import io.grpc.{ManagedChannel, Server} +import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsRequest +import org.apache.spark.sql.connect.client.util.ConnectFunSuite + +class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { + + private var client: SparkConnectClient = _ + private var service: DummySparkConnectService = _ + private var server: Server = _ + private var artifactManager: ArtifactManager = _ + private var channel: ManagedChannel = _ + + private def startDummyServer(): Unit = { +service = new DummySparkConnectService() +server = InProcessServerBuilder + .forName(getClass.getName) + .addService(service) + .build() +server.start() + } + + private def createArtifactManager(): Unit = { +channel = InProcessChannelBuilder.forName(getClass.getName).directExecutor().build() +artifactManager = new ArtifactManager(proto.UserContext.newBuilder().build(), channel) + } + + override def beforeEach(): Unit = { +super.beforeEach() +startDummyServer() +createArtifactManager() +client = null + } + + override def afterEach(): Unit = { +if (server != null) { + server.shutdownNow() + assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to shutdown") +} + +if (channel != null) { + channel.shutdownNow() +} + +if (client != null) { + client.shutdown() +} + } + + private val CHUNK_SIZE: Int = 32 * 1024 + protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests") + + /** + * Check if the data sent to the server (stored in `artifactChunk`) is equivalent to the local + * data at `localPath`. 
+ * @param artifactChunk + * @param localPath + */ + private def assertFileDataEquality( + artifactChunk: AddArtifactsRequest.ArtifactChunk, + localPath: Path): Unit = { +val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32) +val localData = ByteString.readFrom(in) +assert(artifactChunk.getData == localData) +assert(artifactChunk.getCrc == in.getChecksum.getValue) + } + + private def singleChunkArtifactTest(path: String): Unit = { +test(s"Single Chunk Artifact - $path") { + val artifactPath = artifactFilePath.resolve(path) + artifactManager.addArtifact(artifactPath.toString) + + val receivedRequests = service.getAndClearLatestAddArtifactRequests() + // Single `AddArtifactRequest` + assert(receivedRequests.size == 1) + + val request = receivedRequests.head + assert(request.hasBatch) + + val batch = request.getBatch + // Single artifact in batch + assert(batch.getArtifactsList.size() == 1) + + val singleChunkArtifact = batch.getArtifacts(0) + val namePrefix = artifactPath.getFileName.toString match { +case jar if jar.endsWith(".jar") => "jars" +case cf if cf.endsWith(".class") => "classes" + } + assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path)) + assertFileDataEquality(singleChunkArtifact.getData, artifactPath) +} + } + + singleChunkArtifactTest("smallClassFile.class") + + singleChunkArtifactTest("smallJar.jar") + + private def readNextChunk(in: InputStream): ByteString = { +val buf = new Array[Byte](CHUNK_SIZE) +var bytesRead = 0 +var count = 0 +while (count != -1 && bytesRead < CHUNK_SIZE) { + count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead) + if (count != -1) { +bytesRead += count + } +} +
[GitHub] [spark] ueshin opened a new pull request, #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive
ueshin opened a new pull request, #40261: URL: https://github.com/apache/spark/pull/40261 ### What changes were proposed in this pull request? Fix `SparkConnectAnalyzeHandler` to use `withActive`. ### Why are the changes needed? Similar to #40165, `SQLConf.get` is necessary when transforming the proto to plans. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests.
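The `withActive` pattern discussed here (mark a session as active for the duration of a block so that code reading thread-local configuration sees it) can be sketched as follows. This is a toy model, not Spark's implementation: `ActiveSessionSketch`, `Session` and `activeName` are hypothetical stand-ins, and the thread-local bookkeeping is an assumption about the general technique rather than a description of `SparkSession` or `SQLConf` internals.

```scala
// Hypothetical sketch: a toy session, not org.apache.spark.sql.SparkSession.
object ActiveSessionSketch {
  final class Session(val name: String) {
    // Run `body` with this session marked active on the current thread,
    // restoring the previous value afterwards even if `body` throws.
    def withActive[T](body: => T): T = {
      val previous = Session.active.get()
      Session.active.set(Some(this))
      try body
      finally Session.active.set(previous)
    }
  }

  object Session {
    // Per-thread record of the currently active session, if any.
    private val active: ThreadLocal[Option[Session]] =
      new ThreadLocal[Option[Session]] {
        override def initialValue(): Option[Session] = None
      }

    def current: Option[Session] = active.get()
  }

  // Stand-in for code that looks up settings through the active session.
  def activeName: String = Session.current.map(_.name).getOrElse("<none>")
}
```

In this model, a handler would wrap its processing step as `session.withActive { process(...) }`, so that any lookup performed inside the block resolves against that session and the previous state is restored on exit.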
[GitHub] [spark] ueshin commented on a diff in pull request #40210: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`
ueshin commented on code in PR #40210: URL: https://github.com/apache/spark/pull/40210#discussion_r1123898851 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala: ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connect.service + +import scala.collection.JavaConverters._ + +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput} +import org.apache.spark.sql.connect.planner.SparkConnectPlanner +import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, FormattedMode, SimpleMode} + +private[connect] class SparkConnectAnalyzeHandler( +responseObserver: StreamObserver[proto.AnalyzePlanResponse]) +extends Logging { + + def handle(request: proto.AnalyzePlanRequest): Unit = { +val session = + SparkConnectService +.getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId) +.session + +val response = process(request, session) +responseObserver.onNext(response) +responseObserver.onCompleted() Review Comment: #40261
[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
beliefer commented on PR #40252: URL: https://github.com/apache/spark/pull/40252#issuecomment-1452763000 > I guess you can refer to `JDBCSuite` and `ClientE2ETestSuite` ? The built-in H2 runs on the server side, and we can't start an H2 database from the Connect API.
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123910664 ## bin/spark-connect-scala-client.sc: ## @@ -0,0 +1,15 @@ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession + +val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else "" +val sessionBuilder = SparkSession.builder() +val spark = if (conStr.isEmpty) sessionBuilder.build() else sessionBuilder.remote(conStr).build() Review Comment: The user agent, if not set, defaults to "_SPARK_CONNECT_SCALA". Do you want another name for the script client?
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123911014 ## bin/spark-connect-scala-client: ## @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +# Use the spark connect JVM client to connect to a spark connect server. +# +# Start a local server: +# A local spark-connect server with default settings can be started using the following command: +# `bin/spark-connect` +# The client should be able to connect to this server directly with the default client settings. +# +# Connect to a remote server: +# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection +# string. e.g. +# `export SPARK_REMOTE="sc://:/;token=;="` + +if [ -z "${SPARK_HOME}" ]; then + source "$(dirname "$0")"/find-spark-home +fi + +# Build the jars needed for spark connect JVM client +build/sbt "sql/package;connect-client-jvm/assembly" + +CONNECT_CLASSPATH="$(build/sbt -DcopyDependencies=false "export connect-client-jvm/fullClasspath" | grep jar | tail -n1)" +SQL_CLASSPATH="$(build/sbt -DcopyDependencies=false "export sql/fullClasspath" | grep jar | tail -n1)" + +INIT_SCRIPT="${SPARK_HOME}"/bin/spark-connect-scala-client.sc +"${SPARK_HOME}"/build/scala*/bin/scala -cp "$CONNECT_CLASSPATH:$SQL_CLASSPATH" -i $INIT_SCRIPT Review Comment: Now it will find the version set in the top pom file.
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123913129 ## bin/spark-connect-scala-client.sc: ## @@ -0,0 +1,15 @@ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession + +val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else "" Review Comment: This feature request is a bit hard :D I cannot figure out a way to pass the args to the program rather than to the Scala REPL. I think it relates to '-i'. We need '-i' to keep the REPL running, but then the REPL treats all args as its own rather than as args for the Scala script.
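Since arguments cannot easily reach the init script, the script falls back on the `SPARK_REMOTE` environment variable. A minimal sketch of that lookup, written as a pure function so it can be exercised without touching the real environment, might look like this. `RemoteConfig` and `connectionString` are hypothetical names introduced for illustration; only the `SPARK_REMOTE` variable name comes from the script under review.

```scala
// Hypothetical helper: not part of the script in the PR.
object RemoteConfig {
  // Returns the connection string if SPARK_REMOTE is set and non-empty.
  // Taking the environment as a Map keeps the function easy to test;
  // a caller would pass `sys.env`.
  def connectionString(env: Map[String, String]): Option[String] =
    env.get("SPARK_REMOTE").filter(_.nonEmpty)
}
```

A caller could then pattern match on the result, using the default builder for `None` and the remote builder for `Some(conStr)`, which expresses the same choice as the `conStr.isEmpty` check in the reviewed line.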
[GitHub] [spark] ivoson commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on PR #39459: URL: https://github.com/apache/spark/pull/39459#issuecomment-1452774796 Thanks for the review and help on this. @mridulm @Ngone51
[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
ueshin commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123920584 ## connector/connect/common/src/main/protobuf/spark/connect/relations.proto: ## @@ -353,11 +353,16 @@ message LocalRelation { optional bytes data = 1; // (Optional) The schema of local data. - // It should be either a DDL-formatted type string or a JSON string. // // The server side will update the column names and data types according to this schema. // If the 'data' is not provided, then this schema will be required. - optional string schema = 2; + oneof schema { Review Comment: In some cases we need to parse it beforehand anyway. In the case of #40240, we can't create a converter from the local Python object to an Arrow table without the schema as a `DataType` object. Another example is UDFs: the client needs to pickle the function together with its return type as a `DataType` object. We can no longer generate a Python `DataType` object and pickle it into the `command` field on the server side, so we need to parse the DDL string beforehand.
[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
ueshin commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123923286 ## connector/connect/common/src/main/protobuf/spark/connect/relations.proto: ## @@ -353,11 +353,16 @@ message LocalRelation { optional bytes data = 1; // (Optional) The schema of local data. - // It should be either a DDL-formatted type string or a JSON string. // // The server side will update the column names and data types according to this schema. // If the 'data' is not provided, then this schema will be required. - optional string schema = 2; + oneof schema { Review Comment: I think this change is fine. I'd like to change all places to accept both a string and a `DataType`, leaving the choice up to the client developers.
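For illustration only, the idea of accepting either a string or an already-parsed type could look roughly like the sketch below. Everything in it is hypothetical: `SimpleType`, `Field`, `Struct`, `parse_flat_ddl`, and `normalize_schema` are stand-ins invented for this example, not PySpark classes or Spark Connect messages, and the toy parser only handles flat `name type` pairs (no nested or parameterized types).

```python
from dataclasses import dataclass
from typing import Tuple, Union


# Hypothetical stand-ins for a parsed schema; not PySpark's DataType classes.
@dataclass(frozen=True)
class SimpleType:
    name: str


@dataclass(frozen=True)
class Field:
    name: str
    data_type: SimpleType


@dataclass(frozen=True)
class Struct:
    fields: Tuple[Field, ...]


def parse_flat_ddl(ddl: str) -> Struct:
    """Parse a flat 'name type, name type' string (assumed toy grammar)."""
    fields = []
    for part in ddl.split(","):
        # Each part is expected to be exactly two whitespace-separated tokens.
        name, type_name = part.split()
        fields.append(Field(name, SimpleType(type_name.lower())))
    return Struct(tuple(fields))


def normalize_schema(schema: Union[str, Struct]) -> Struct:
    """Accept either form; parse strings, pass parsed schemas through unchanged."""
    if isinstance(schema, Struct):
        return schema
    return parse_flat_ddl(schema)
```

With a helper of this shape, a caller that already holds a parsed schema skips the parsing step, while a caller holding only a string gets it parsed on the client before anything needs to be pickled or converted.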
[GitHub] [spark] srowen closed pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types
srowen closed pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types URL: https://github.com/apache/spark/pull/40220