[GitHub] [spark] LuciferYang opened a new pull request, #40248: [SPARK-42648][BUILD] Upgrade `versions-maven-plugin` to 2.15.0

2023-03-02 Thread via GitHub


LuciferYang opened a new pull request, #40248:
URL: https://github.com/apache/spark/pull/40248

   ### What changes were proposed in this pull request?
   This PR aims to upgrade `versions-maven-plugin` to 2.15.0.
   
   
   ### Why are the changes needed?
   The new version brings several improvements:
   - https://github.com/mojohaus/versions/pull/915  
   - https://github.com/mojohaus/versions/pull/881
   - https://github.com/mojohaus/versions/pull/912
   - https://github.com/mojohaus/versions/pull/898
   - https://github.com/mojohaus/versions/pull/889
   - https://github.com/mojohaus/versions/pull/883
   - https://github.com/mojohaus/versions/pull/886
   - https://github.com/mojohaus/versions/pull/878
   - https://github.com/mojohaus/versions/pull/893
   
   and some bug fixes:
   - https://github.com/mojohaus/versions/pull/907
   - https://github.com/mojohaus/versions/pull/901
   - https://github.com/mojohaus/versions/pull/897
   - https://github.com/mojohaus/versions/pull/891
   
   
   The full release notes are as follows:
   - https://github.com/apache/spark/pull/39784
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   
   - The GA `Dependencies test` should work normally.
   - Manually checked `./dev/test-dependencies.sh --replace-manifest`; it ran successfully.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun opened a new pull request, #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files

2023-03-02 Thread via GitHub


dongjoon-hyun opened a new pull request, #40249:
URL: https://github.com/apache/spark/pull/40249

   …
   
   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   





[GitHub] [spark] dongjoon-hyun commented on pull request #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files

2023-03-02 Thread via GitHub


dongjoon-hyun commented on PR #40249:
URL: https://github.com/apache/spark/pull/40249#issuecomment-1451470960

   cc @srowen and @HyukjinKwon 





[GitHub] [spark] allanf-db opened a new pull request, #40250: [WIP][SPARK-42642][DOCS][PYTHON] Updating remaining Spark documentation code examples to show Python by default

2023-03-02 Thread via GitHub


allanf-db opened a new pull request, #40250:
URL: https://github.com/apache/spark/pull/40250

   
   
   ### What changes were proposed in this pull request?
   Making Python the first tab for code examples in the Spark documentation.
   
   
   
   ### Why are the changes needed?
   This completes the work started with [SPARK-42493].
   Python is the most approachable and most popular language, and this change moves Python code examples to the first tab (shown by default).
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, the user facing Spark documentation is updated.
   
   
   
   ### How was this patch tested?
   I built the website locally and manually tested the pages.
   
   





[GitHub] [spark] ivoson commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


ivoson commented on PR #39459:
URL: https://github.com/apache/spark/pull/39459#issuecomment-1451486114

   > @ivoson Did you get a chance to address the pending comments above ? 
([here](https://github.com/apache/spark/pull/39459#discussion_r1117995723) and 
[here](https://github.com/apache/spark/pull/39459#discussion_r1117996417))
   
   Thanks @mridulm for the reminder. I missed these two...
   
   Updated; please take a look. Thanks.





[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1122750441


##
core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala:
##
@@ -399,7 +426,14 @@ private[storage] class BlockInfoManager extends Logging {
 try {
   val wrapper = new BlockInfoWrapper(newBlockInfo, lock)
   while (true) {
-val previous = blockInfoWrappers.putIfAbsent(blockId, wrapper)
+val previous = invisibleRDDBlocks.synchronized {

Review Comment:
   done, thanks.






[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1122750718


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -1325,31 +1328,74 @@ private[spark] class BlockManager(
 blockInfoManager.releaseAllLocksForTask(taskAttemptId)
   }
 
+  /**
+   * Retrieve the given rdd block if it exists and is visible, otherwise call 
the provided
+   * `makeIterator` method to compute the block, persist it, and return its 
values.
+   *
+   * @return either a BlockResult if the block was successfully cached, or an 
iterator if the block
+   * could not be cached.
+   */
+  def getOrElseUpdateRDDBlock[T](
+  taskId: Long,
+  blockId: RDDBlockId,
+  level: StorageLevel,
+  classTag: ClassTag[T],
+  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
+val isCacheVisible = isRDDBlockVisible(blockId)
+val res = getOrElseUpdate(blockId, level, classTag, makeIterator, 
isCacheVisible)
+if (res.isLeft && !isCacheVisible) {
+  // Block exists but not visible, report taskId -> blockId info to master.
+  master.updateRDDBlockTaskInfo(blockId, taskId)
+}
+
+res
+  }
+
   /**
* Retrieve the given block if it exists, otherwise call the provided 
`makeIterator` method
* to compute the block, persist it, and return its values.
*
* @return either a BlockResult if the block was successfully cached, or an 
iterator if the block
* could not be cached.
*/
-  def getOrElseUpdate[T](
+  private def getOrElseUpdate[T](
   blockId: BlockId,
   level: StorageLevel,
   classTag: ClassTag[T],
-  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
-// Attempt to read the block from local or remote storage. If it's 
present, then we don't need
-// to go through the local-get-or-put path.
-get[T](blockId)(classTag) match {
-  case Some(block) =>
-return Left(block)
-  case _ =>
-// Need to compute the block.
+  makeIterator: () => Iterator[T],
+  isCacheVisible: Boolean = true): Either[BlockResult, Iterator[T]] = {

Review Comment:
   done, thanks.






[GitHub] [spark] wangyum commented on a diff in pull request #40190: [SPARK-42597][SQL] UnwrapCastInBinaryComparison support unwrap timestamp type

2023-03-02 Thread via GitHub


wangyum commented on code in PR #40190:
URL: https://github.com/apache/spark/pull/40190#discussion_r1122758398


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala:
##
@@ -293,6 +299,34 @@ object UnwrapCastInBinaryComparison extends 
Rule[LogicalPlan] {
 }
   }
 
+  /**
+   * Move the cast to the literal side, because we can only get the minimum 
value of timestamp,
+   * so some BinaryComparison needs to be changed,
+   * such as CAST(ts AS date) > DATE '2023-01-01' ===> ts >= TIMESTAMP 
'2023-01-02 00:00:00'

Review Comment:
   Yes, it would be `'2023-01-02 00:00:00' > '2023-01-01 23:59:59.99'` if we do not change `>` to `>=`.
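   To illustrate the rewrite discussed above, here is a small sketch (the table and column names are made up for illustration; `ts` is assumed to be a TIMESTAMP column):
   
   ```scala
   // Before the rewrite, the cast sits on the column side and blocks unwrapping:
   spark.sql("SELECT * FROM events WHERE CAST(ts AS DATE) > DATE '2023-01-01'")
   
   // Moving the cast to the literal side yields an equivalent predicate on the raw
   // timestamp column; note that the operator also changes from > to >= :
   spark.sql("SELECT * FROM events WHERE ts >= TIMESTAMP '2023-01-02 00:00:00'")
   ```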
   
   
   






[GitHub] [spark] dongjoon-hyun commented on pull request #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files

2023-03-02 Thread via GitHub


dongjoon-hyun commented on PR #40249:
URL: https://github.com/apache/spark/pull/40249#issuecomment-1451524437

   Thank you, @HyukjinKwon and @LuciferYang .
   
   The license test passed. Merged to master/3.4/3.3/3.2.
   ![Screenshot 2023-03-02 at 1 02 02 AM](https://user-images.githubusercontent.com/9700541/222381571-afcd08bf-0503-4250-8085-c2c824f570a5.png)
   





[GitHub] [spark] dongjoon-hyun closed pull request #40249: [SPARK-42649][CORE] Remove the standard Apache License header from the top of third-party source files

2023-03-02 Thread via GitHub


dongjoon-hyun closed pull request #40249: [SPARK-42649][CORE] Remove the 
standard Apache License header from the top of third-party source files
URL: https://github.com/apache/spark/pull/40249





[GitHub] [spark] HyukjinKwon commented on pull request #40250: [SPARK-42642][DOCS][PYTHON] Updating remaining Spark documentation code examples to show Python by default

2023-03-02 Thread via GitHub


HyukjinKwon commented on PR #40250:
URL: https://github.com/apache/spark/pull/40250#issuecomment-1451560563

   Merged to master.





[GitHub] [spark] HyukjinKwon closed pull request #40250: [SPARK-42642][DOCS][PYTHON] Updating remaining Spark documentation code examples to show Python by default

2023-03-02 Thread via GitHub


HyukjinKwon closed pull request #40250: [SPARK-42642][DOCS][PYTHON] Updating 
remaining Spark documentation code examples to show Python by default
URL: https://github.com/apache/spark/pull/40250





[GitHub] [spark] MaxGekk commented on a diff in pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.

2023-03-02 Thread via GitHub


MaxGekk commented on code in PR #40237:
URL: https://github.com/apache/spark/pull/40237#discussion_r1122804677


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala:
##
@@ -1961,6 +1961,99 @@ class DateExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 }
   }
 
+  test("SPARK-42635: timestampadd near daylight saving transition") {
+// In America/Los_Angeles timezone, timestamp value `skippedTime` is 
2011-03-13 03:00:00.
+// The next second of 2011-03-13 01:59:59 jumps to 2011-03-13 03:00:00.
+val skippedTime = 1300010400000000L
+// In America/Los_Angeles timezone, both timestamp range `[repeatedTime - 
MICROS_PER_HOUR,
+// repeatedTime)` and `[repeatedTime, repeatedTime + MICROS_PER_HOUR)` map 
to
+// [2011-11-06 01:00:00, 2011-11-06 02:00:00).
+// The next second of 2011-11-06 01:59:59 (pre-transition) jumps back to 
2011-11-06 01:00:00.
+val repeatedTime = 1320570000000000L
+withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId) {
+  // Adding one day is **not** equivalent to adding _PER_DAY time 
units, because not every
+  // day has 24 hours: 2011-03-13 has 23 hours, 2011-11-06 has 25 hours.
+
+  // timestampadd(DAY, 1, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
+  checkEvaluation(
+TimestampAdd("DAY", Literal(1), Literal(skippedTime - 23 * 
MICROS_PER_HOUR, TimestampType)),
+skippedTime)
+  // timestampadd(HOUR, 24, 2011-03-12 03:00:00) = 2011-03-13 04:00:00
+  checkEvaluation(
+TimestampAdd("HOUR", Literal(24),
+  Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)),
+skippedTime + MICROS_PER_HOUR)
+  // timestampadd(HOUR, 23, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
+  checkEvaluation(
+TimestampAdd("HOUR", Literal(23),
+  Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)),
+skippedTime)
+  // timestampadd(SECOND, SECONDS_PER_DAY, 2011-03-12 03:00:00) = 
2011-03-13 04:00:00
+  checkEvaluation(
+TimestampAdd(
+  "SECOND", Literal(SECONDS_PER_DAY.toInt),
+  Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)),
+skippedTime + MICROS_PER_HOUR)
+  // timestampadd(SECOND, SECONDS_PER_DAY, 2011-03-12 03:00:00) = 
2011-03-13 03:59:59
+  checkEvaluation(
+TimestampAdd(
+  "SECOND", Literal(SECONDS_PER_DAY.toInt - 1),
+  Literal(skippedTime - 23 * MICROS_PER_HOUR, TimestampType)),
+skippedTime + MICROS_PER_HOUR - MICROS_PER_SECOND)
+
+  // timestampadd(DAY, 1, 2011-11-05 02:00:00) = 2011-11-06 02:00:00
+  checkEvaluation(
+TimestampAdd("DAY", Literal(1),
+  Literal(repeatedTime - 24 * MICROS_PER_HOUR, TimestampType)),
+repeatedTime + MICROS_PER_HOUR)
+  // timestampadd(DAY, 1, 2011-11-05 01:00:00) = 2011-11-06 01:00:00 
(pre-transition)
+  checkEvaluation(
+TimestampAdd("DAY", Literal(1),
+  Literal(repeatedTime - 25 * MICROS_PER_HOUR, TimestampType)),
+repeatedTime - MICROS_PER_HOUR)
+  // timestampadd(DAY, -1, 2011-11-07 01:00:00) = 2011-11-06 01:00:00 
(post-transition)
+  checkEvaluation(
+TimestampAdd("DAY", Literal(-1),
+  Literal(repeatedTime + 24 * MICROS_PER_HOUR, TimestampType)),
+repeatedTime)
+  // timestampadd(MONTH, 1, 2011-10-06 01:00:00) = 2011-11-06 01:00:00 
(pre-transition)
+  checkEvaluation(
+TimestampAdd(
+  "MONTH", Literal(1),
+  Literal(repeatedTime - MICROS_PER_HOUR - 31 * MICROS_PER_DAY, 
TimestampType)),
+repeatedTime - MICROS_PER_HOUR)
+  // timestampadd(MONTH, -1, 2011-12-06 01:00:00) = 2011-11-06 01:00:00 
(post-transition)
+  checkEvaluation(
+TimestampAdd(
+  "MONTH", Literal(-1),
+  Literal(repeatedTime + 30 * MICROS_PER_DAY, TimestampType)),
+repeatedTime)
+  // timestampadd(HOUR, 23, 2011-11-05 02:00:00) = 2011-11-06 01:00:00 
(pre-transition)
+  checkEvaluation(
+TimestampAdd("HOUR", Literal(23),
+  Literal(repeatedTime - 24 * MICROS_PER_HOUR, TimestampType)),
+repeatedTime - MICROS_PER_HOUR)
+  // timestampadd(HOUR, 24, 2011-11-05 02:00:00) = 2011-11-06 01:00:00 
(post-transition)
+  checkEvaluation(
+TimestampAdd("HOUR", Literal(24),
+  Literal(repeatedTime - 24 * MICROS_PER_HOUR, TimestampType)),
+repeatedTime)
+}
+  }
+
+  test("SPARK-42635: timestampadd unit conversion overflow") {
+withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+  checkExceptionInExpression[SparkArithmeticException](TimestampAdd("DAY",

Review Comment:
   Could you use `checkErrorInExpression` instead of `checkExceptionInExpression`, please? This avoids depending on the error message text, so tech editors will be able to modify `error-classes.json` without modifying Spark tests.
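   A rough sketch of the suggested replacement (the error class and arguments below are assumptions for illustration, not necessarily those used in the actual test):
   
   ```scala
   // Matches on the error class instead of the rendered message text, so later
   // edits to `error-classes.json` do not break the test.
   checkErrorInExpression[SparkArithmeticException](
     TimestampAdd("DAY", Literal(Int.MaxValue), Literal(0L, TimestampType)),
     "DATETIME_OVERFLOW") // assumed error class
   ```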




[GitHub] [spark] zhengruifeng closed pull request #40243: [SPARK-42641][CONNECT][BUILD] Upgrade buf from 1.14.0 to 1.15.0

2023-03-02 Thread via GitHub


zhengruifeng closed pull request #40243: [SPARK-42641][CONNECT][BUILD] Upgrade 
buf from 1.14.0 to 1.15.0
URL: https://github.com/apache/spark/pull/40243





[GitHub] [spark] zhengruifeng commented on pull request #40243: [SPARK-42641][CONNECT][BUILD] Upgrade buf from 1.14.0 to 1.15.0

2023-03-02 Thread via GitHub


zhengruifeng commented on PR #40243:
URL: https://github.com/apache/spark/pull/40243#issuecomment-1451586124

   merged into master/3.4





[GitHub] [spark] HyukjinKwon opened a new pull request, #40251: [SPARK-41725][PYTHON][TESTS][FOLLOW-UP] Remove collect in SQL command execution in tests

2023-03-02 Thread via GitHub


HyukjinKwon opened a new pull request, #40251:
URL: https://github.com/apache/spark/pull/40251

   ### What changes were proposed in this pull request?
   
   This PR removes the `sql("command").collect()` workaround in PySpark test code.
   
   ### Why are the changes needed?
   
   These calls were previously added as a workaround for Spark Connect. That is fixed now, so we don't need to call `collect` anymore.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, test-only.
   
   ### How was this patch tested?
   
   CI in this PR should test it out.





[GitHub] [spark] HyukjinKwon commented on pull request #40251: [SPARK-41725][PYTHON][TESTS][FOLLOW-UP] Remove collect for SQL command execution in tests

2023-03-02 Thread via GitHub


HyukjinKwon commented on PR #40251:
URL: https://github.com/apache/spark/pull/40251#issuecomment-1451727734

   All related tests passed.
   
   Merged to master and branch-3.4.





[GitHub] [spark] HyukjinKwon closed pull request #40251: [SPARK-41725][PYTHON][TESTS][FOLLOW-UP] Remove collect for SQL command execution in tests

2023-03-02 Thread via GitHub


HyukjinKwon closed pull request #40251: [SPARK-41725][PYTHON][TESTS][FOLLOW-UP] 
Remove collect for SQL command execution in tests
URL: https://github.com/apache/spark/pull/40251





[GitHub] [spark] panbingkun commented on a diff in pull request #40217: [SPARK-42559][CONNECT] Implement DataFrameNaFunctions

2023-03-02 Thread via GitHub


panbingkun commented on code in PR #40217:
URL: https://github.com/apache/spark/pull/40217#discussion_r1122990355


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala:
##
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl}
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.Expression.{Literal => GLiteral}
+import org.apache.spark.connect.proto.{NAReplace, Relation}
+import org.apache.spark.connect.proto.NAReplace.Replacement
+
+/**
+ * Functionality for working with missing data in `DataFrame`s.
+ *
+ * @since 3.4.0
+ */
+final class DataFrameNaFunctions private[sql](sparkSession: SparkSession, 
root: Relation) {
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing any null or NaN 
values.
+   *
+   * @since 3.4.0
+   */
+  def drop(): DataFrame = {
+sparkSession.newDataFrame { builder =>
+  builder.getDropNaBuilder.setInput(root)
+}
+  }
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing null or NaN values.
+   *
+   * If `how` is "any", then drop rows containing any null or NaN values.
+   * If `how` is "all", then drop rows only if every column is null or NaN for 
that row.
+   *
+   * @since 3.4.0
+   */
+  def drop(how: String): DataFrame = {
+sparkSession.newDataFrame { builder =>
+  val dropNaBuilder = builder.getDropNaBuilder.setInput(root)
+  how.toLowerCase(Locale.ROOT) match {

Review Comment:
   Done



##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala:
##
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl}
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.Expression.{Literal => GLiteral}
+import org.apache.spark.connect.proto.{NAReplace, Relation}
+import org.apache.spark.connect.proto.NAReplace.Replacement
+
+/**
+ * Functionality for working with missing data in `DataFrame`s.
+ *
+ * @since 3.4.0
+ */
+final class DataFrameNaFunctions private[sql](sparkSession: SparkSession, 
root: Relation) {
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing any null or NaN 
values.
+   *
+   * @since 3.4.0
+   */
+  def drop(): DataFrame = {
+sparkSession.newDataFrame { builder =>

Review Comment:
   Done
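   For context, here is a minimal usage sketch of the `na.drop` variants being added in this file (the example data is made up):
   
   ```scala
   // Build the example data with plain SQL so the sketch stays client-agnostic.
   val df = spark.sql(
     "SELECT * FROM VALUES (1, 'a'), (NULL, 'b'), (NULL, NULL) AS t(id, name)")
   
   df.na.drop().show()      // keeps only the (1, 'a') row
   df.na.drop("all").show() // drops only the row where every column is null
   ```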






[GitHub] [spark] beliefer opened a new pull request, #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


beliefer opened a new pull request, #40252:
URL: https://github.com/apache/spark/pull/40252

   ### What changes were proposed in this pull request?
   Currently, the Connect project has a new `DataFrameReader` API corresponding to the Spark `DataFrameReader` API, but the Connect `DataFrameReader` is missing the `jdbc` API.
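   For reference, a minimal usage sketch of the proposed API (the JDBC URL, table name, and credentials are placeholders):
   
   ```scala
   import java.util.Properties
   
   // Placeholder connection details for illustration only.
   val props = new Properties()
   props.setProperty("user", "test_user")
   props.setProperty("password", "test_password")
   
   val df = spark.read.jdbc("jdbc:postgresql://localhost:5432/testdb", "people", props)
   df.show()
   ```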
   
   
   ### Why are the changes needed?
   This PR adds the `jdbc` API to the Connect `DataFrameReader`.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   New feature.
   
   
   ### How was this patch tested?
   N/A
   





[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


beliefer commented on PR #40252:
URL: https://github.com/apache/spark/pull/40252#issuecomment-1451774998

   The jdbc API seems hard to test; do we need a test case? @hvanhovell @HyukjinKwon @zhengruifeng @dongjoon-hyun





[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


beliefer commented on PR #40252:
URL: https://github.com/apache/spark/pull/40252#issuecomment-1451778555

   There is another kind of jdbc API; see: https://github.com/apache/spark/blob/79da1ab400f25dbceec45e107e5366d084138fa8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L316
   I will create another PR to add the proto message and implement it.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40218: [SPARK-42579][CONNECT] Part-1: `function.lit` support `Array[_]` dataType

2023-03-02 Thread via GitHub


LuciferYang commented on code in PR #40218:
URL: https://github.com/apache/spark/pull/40218#discussion_r1123020720


##
connector/connect/common/src/main/protobuf/spark/connect/expressions.proto:
##
@@ -189,6 +190,11 @@ message Expression {
   int32 days = 2;
   int64 microseconds = 3;
 }
+
+message Array {
+  DataType elementType = 1;
+  repeated Literal element = 2;

Review Comment:
   Should this be `element` or `elements`?






[GitHub] [spark] yabola commented on a diff in pull request #39950: [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader

2023-03-02 Thread via GitHub


yabola commented on code in PR #39950:
URL: https://github.com/apache/spark/pull/39950#discussion_r1121979988


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:
##
@@ -296,41 +309,45 @@ class ParquetFileFormat
 throw e
 }
   } else {
-logDebug(s"Falling back to parquet-mr")
-// ParquetRecordReader returns InternalRow
-val readSupport = new ParquetReadSupport(
-  convertTz,
-  enableVectorizedReader = false,
-  datetimeRebaseSpec,
-  int96RebaseSpec)
-val reader = if (pushed.isDefined && enableRecordFilter) {
-  val parquetFilter = FilterCompat.get(pushed.get, null)
-  new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
-} else {
-  new ParquetRecordReader[InternalRow](readSupport)
-}
-val readerWithRowIndexes = 
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
-requiredSchema)
-val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
 try {
-  readerWithRowIndexes.initialize(split, hadoopAttemptContext)
-
-  val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
-  val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)
-
-  if (partitionSchema.length == 0) {
-// There is no partition columns
-iter.map(unsafeProjection)
+  logDebug(s"Falling back to parquet-mr")
+  // ParquetRecordReader returns InternalRow
+  val readSupport = new ParquetReadSupport(
+convertTz,
+enableVectorizedReader = false,
+datetimeRebaseSpec,
+int96RebaseSpec)
+  val reader = if (pushed.isDefined && enableRecordFilter) {
+val parquetFilter = FilterCompat.get(pushed.get, null)
+new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
   } else {
-val joinedRow = new JoinedRow()
-iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+new ParquetRecordReader[InternalRow](readSupport)
   }
-} catch {
-  case e: Throwable =>
-// SPARK-23457: In case there is an exception in initialization, 
close the iterator to
-// avoid leaking resources.
-iter.close()
-throw e
+  val readerWithRowIndexes = 
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+requiredSchema)
+  val iter = new 
RecordReaderIterator[InternalRow](readerWithRowIndexes)
+  try {
+readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+
+val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
+val unsafeProjection = 
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+if (partitionSchema.length == 0) {
+  // There is no partition columns
+  iter.map(unsafeProjection)
+} else {
+  val joinedRow = new JoinedRow()
+  iter.map(d => unsafeProjection(joinedRow(d, 
file.partitionValues)))
+}
+  } catch {
+case e: Throwable =>
+  // SPARK-23457: In case there is an exception in initialization, 
close the iterator to
+  // avoid leaking resources.
+  iter.close()
+  throw e
+  }
+} finally {
+  parquetReader.close()

Review Comment:
   I haven't changed the code logic; I just added closing of `parquetReader`.
   Previously, the non-vectorized code path would close the reader after reading `footerFileMetaData`, so this is consistent with that behavior. Please see
   [the original implementation code](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java#L53)
   






[GitHub] [spark] zhengruifeng commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


zhengruifeng commented on PR #40252:
URL: https://github.com/apache/spark/pull/40252#issuecomment-1451803222

   I guess you can refer to `JDBCSuite` and `ClientE2ETestSuite` ?





[GitHub] [spark] jiang13021 opened a new pull request, #40253: [SPARK-42553][SQL] Ensure at least one time unit after "interval"

2023-03-02 Thread via GitHub


jiang13021 opened a new pull request, #40253:
URL: https://github.com/apache/spark/pull/40253

   
   
   ### What changes were proposed in this pull request?
   
   This PR aims to ensure "at least one time unit should be given for interval literal" by modifying `SqlBaseParser`.
   
   ### Why are the changes needed?
   
   INTERVAL is a non-reserved keyword in Spark, but when I run
   ```shell
   scala> spark.sql("select interval from mytable")
   ```
   I get
   ```
   org.apache.spark.sql.catalyst.parser.ParseException:
   at least one time unit should be given for interval literal(line 1, pos 7)
   
   == SQL ==
   select interval from mytable
   -------^^^
   
     at org.apache.spark.sql.errors.QueryParsingErrors$.invalidIntervalLiteralError(QueryParsingErrors.scala:196)
     ...
   ```
   It is a bug because non-reserved keywords have a special meaning only in particular contexts and can be used as identifiers in other contexts. So, by design, INTERVAL can be used as a column name.
   
   Currently the interval's grammar is
   ```
   interval
   : INTERVAL (errorCapturingMultiUnitsInterval | 
errorCapturingUnitToUnitInterval)?
   ;
   ```
   There is no need to make the time unit optional; we can ensure that "at least one time unit should be given for interval literal" if the interval's grammar is
   ```
   interval
   : INTERVAL (errorCapturingMultiUnitsInterval | 
errorCapturingUnitToUnitInterval)
   ;
   ```
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Unit test: PlanParserSuite."SPARK-42553: NonReserved keyword 'interval' can be column name"
   
   Local test
   ```shell
   scala> val myDF = spark.sparkContext.makeRDD(1 to 5).toDF("interval")
   myDF: org.apache.spark.sql.DataFrame = [interval: int]
   
   scala> myDF.createOrReplaceTempView("mytable")
   
   scala> spark.sql("select interval from mytable;").show()
   +--------+
   |interval|
   +--------+
   |       1|
   |       2|
   |       3|
   |       4|
   |       5|
   +--------+
   
   ```





[GitHub] [spark] jiang13021 commented on pull request #40195: [SPARK-42553][SQL] Ensure at least one time unit after "interval"

2023-03-02 Thread via GitHub


jiang13021 commented on PR #40195:
URL: https://github.com/apache/spark/pull/40195#issuecomment-1451819657

   > @jiang13021 The changes cause some conflicts in branch-3.3. Could you open a separate PR with a backport to Spark 3.3.
   
   Thanks for your review. Do you mean opening a new PR against branch-3.3? Here it is: https://github.com/apache/spark/pull/40253





[GitHub] [spark] MaxGekk commented on pull request #40195: [SPARK-42553][SQL] Ensure at least one time unit after "interval"

2023-03-02 Thread via GitHub


MaxGekk commented on PR #40195:
URL: https://github.com/apache/spark/pull/40195#issuecomment-1451827970

   > Do you mean launch a new PR to branch-3.3? Here it is: 
https://github.com/apache/spark/pull/40253
   
   Yep. Thank you.





[GitHub] [spark] MaxGekk commented on pull request #40253: [SPARK-42553][SQL] Ensure at least one time unit after "interval"

2023-03-02 Thread via GitHub


MaxGekk commented on PR #40253:
URL: https://github.com/apache/spark/pull/40253#issuecomment-1451830956

   @jiang13021 Thank you for the backport. Could you add the following, please:
   1. The tag `[3.3]` to the PR's title.
   2. `This is a backport of https://github.com/apache/spark/pull/40195` to the PR's description.





[GitHub] [spark] srowen commented on pull request #40220: [WIP][SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types

2023-03-02 Thread via GitHub


srowen commented on PR #40220:
URL: https://github.com/apache/spark/pull/40220#issuecomment-1451854672

   The line changed, so the 'ignore' is no longer relevant; yes, remove it to pass the linter.





[GitHub] [spark] jiang13021 commented on pull request #40253: [SPARK-42553][SQL][3.3] Ensure at least one time unit after "interval"

2023-03-02 Thread via GitHub


jiang13021 commented on PR #40253:
URL: https://github.com/apache/spark/pull/40253#issuecomment-1451860426

   > @jiang13021 Thank you for the backport. Could you add the following, please:
   > 
   > 1. The tag `[3.3]` to PR's title.
   > 2. `This is a backport of https://github.com/apache/spark/pull/40195` to 
PR's description.
   
   Done





[GitHub] [spark] EnricoMi commented on pull request #38223: [SPARK-40770][PYTHON] Improved error messages for applyInPandas for schema mismatch

2023-03-02 Thread via GitHub


EnricoMi commented on PR #38223:
URL: https://github.com/apache/spark/pull/38223#issuecomment-1451889010

   @HyukjinKwon why has this been reverted? a86324cb
   The CI looked pretty green: https://github.com/apache/spark/actions/runs/4129838944/jobs/7135888364





[GitHub] [spark] yabola commented on pull request #39950: [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader

2023-03-02 Thread via GitHub


yabola commented on PR #39950:
URL: https://github.com/apache/spark/pull/39950#issuecomment-1451913404

   @sunchao Sorry, it might be a mistake. We should read the schema in the footer metadata first to determine which filters need to be pushed down. After that we set the pushdown info ([code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L261)) and read the filtered row groups.
   
   So I think we may have to read the footer twice when there are filters, but we can read the footer once when no filter needs to be pushed down.
   
   I think this will be useful when scanning joined tables (the filter condition is only on the other side of the join, and there is no filter on the joined table). It can avoid a lot of footer reads when there are many joined tables.
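   As an illustration of that join scenario (paths and column names below are made up, not from the PR):
   
   ```scala
   import org.apache.spark.sql.functions.col
   
   // Only the `events` side has a data filter; the `users` side has none, so with
   // this change its Parquet footers would only need to be read once.
   val users  = spark.read.parquet("/data/users")
   val events = spark.read.parquet("/data/events")
     .filter(col("event_date") === "2023-03-02")
   
   users.join(events, "user_id").count()
   ```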





[GitHub] [spark] srowen closed pull request #40219: [SPARK-42622][CORE] Disable substitution in values

2023-03-02 Thread via GitHub


srowen closed pull request #40219: [SPARK-42622][CORE] Disable substitution in 
values
URL: https://github.com/apache/spark/pull/40219





[GitHub] [spark] srowen commented on pull request #40219: [SPARK-42622][CORE] Disable substitution in values

2023-03-02 Thread via GitHub


srowen commented on PR #40219:
URL: https://github.com/apache/spark/pull/40219#issuecomment-1451982884

   Merged to master/3.4





[GitHub] [spark] MaxGekk commented on pull request #40253: [SPARK-42553][SQL][3.3] Ensure at least one time unit after "interval"

2023-03-02 Thread via GitHub


MaxGekk commented on PR #40253:
URL: https://github.com/apache/spark/pull/40253#issuecomment-1452051052

   +1, LGTM. All GAs passed. Merging to 3.3.
   Thank you, @jiang13021.





[GitHub] [spark] MaxGekk closed pull request #40253: [SPARK-42553][SQL][3.3] Ensure at least one time unit after "interval"

2023-03-02 Thread via GitHub


MaxGekk closed pull request #40253: [SPARK-42553][SQL][3.3] Ensure at least one 
time unit after "interval"
URL: https://github.com/apache/spark/pull/40253





[GitHub] [spark] LuciferYang commented on a diff in pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


LuciferYang commented on code in PR #40252:
URL: https://github.com/apache/spark/pull/40252#discussion_r1123323759


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala:
##
@@ -184,6 +186,67 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
 }
   }
 
+  /**
+   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
+   * url named table and connection properties.
+   *
+   * You can find the JDBC-specific option and parameter documentation for 
reading tables
+   * via JDBC in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   *   Data Source Option</a> in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+// properties should override settings in extraOptions.
+this.extraOptions ++= properties.asScala
+// explicit url and dbtable should override all
+this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+format("jdbc").load()
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
+   * url named table. Partitions of the table will be retrieved in parallel 
based on the parameters
+   * passed to this function.
+   *
+   * Don't create too many partitions in parallel on a large cluster; 
otherwise Spark might crash
+   * your external database systems.
+   *
+   * You can find the JDBC-specific option and parameter documentation for 
reading tables via JDBC in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   *   Data Source Option</a> in the version you use.
+   *
+   * @param table Name of the table in the external database.
+   * @param columnName Alias of `partitionColumn` option. Refer to 
`partitionColumn` in
+   *   <a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   * @param connectionProperties JDBC database connection arguments, a list of 
arbitrary string
+   * tag/value. Normally at least a "user" and 
"password" property
+   * should be included. "fetchsize" can be used 
to control the
+   * number of rows per fetch and "queryTimeout" 
can be used to wait
+   * for a Statement object to execute to the 
given number of seconds.
+   * @since 3.5.0

Review Comment:
   Will this not be backported to 3.4.0?
   
   






[GitHub] [spark] LuciferYang opened a new pull request, #40254: [SPARK-42654][BUILD] WIP

2023-03-02 Thread via GitHub


LuciferYang opened a new pull request, #40254:
URL: https://github.com/apache/spark/pull/40254

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang opened a new pull request, #40255: [SPARK-42558][CONNECT] DataFrameStatFunctions WIP

2023-03-02 Thread via GitHub


LuciferYang opened a new pull request, #40255:
URL: https://github.com/apache/spark/pull/40255

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #40255: [SPARK-42558][CONNECT] DataFrameStatFunctions WIP

2023-03-02 Thread via GitHub


LuciferYang commented on code in PR #40255:
URL: https://github.com/apache/spark/pull/40255#discussion_r1123361868


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -586,6 +586,57 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   list.asScala.map(kv => Row(kv.key, kv.value)),
   session.createDataFrame(list.asScala.toSeq))
   }
+
+  test("approximate quantile") {

Review Comment:
   Will add more cases later.
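   
   For readers unfamiliar with the API being exercised, a minimal sketch of a typical call (the column names and error bound are illustrative, and `df` is assumed to be a DataFrame with numeric columns):
   
   ```scala
   // Median and 90th percentile of one column, with 1% relative error.
   val quantiles: Array[Double] = df.stat.approxQuantile("price", Array(0.5, 0.9), 0.01)
   
   // Multi-column variant: one array of quantiles per requested column.
   val perColumn: Array[Array[Double]] =
     df.stat.approxQuantile(Array("price", "quantity"), Array(0.5, 0.9), 0.01)
   ```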



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #40255: [SPARK-42558][CONNECT] DataFrameStatFunctions WIP

2023-03-02 Thread via GitHub


LuciferYang commented on code in PR #40255:
URL: https://github.com/apache/spark/pull/40255#discussion_r1123365858


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl, util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.Relation
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
PrimitiveDoubleEncoder}
+import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch}
+
+/**
+ * Statistic functions for `DataFrame`s.
+ *
+ * @since 3.4.0
+ */
+final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, 
root: Relation) {
+
+  /**
+   * Calculates the approximate quantiles of a numerical column of a DataFrame.
+   *
+   * The result of this algorithm has the following deterministic bound: If 
the DataFrame has N
+   * elements and if we request the quantile at probability `p` up to error 
`err`, then the
+   * algorithm will return a sample `x` from the DataFrame so that the *exact* 
rank of `x` is
+   * close to (p * N). More precisely,
+   *
+   * {{{
+   *   floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)
+   * }}}
+   *
+   * This method implements a variation of the Greenwald-Khanna algorithm 
(with some speed
+   * optimizations). The algorithm was first present in https://doi.org/10.1145/375663.375670";> Space-efficient Online 
Computation of Quantile
+   * Summaries by Greenwald and Khanna.
+   *
+   * @param col
+   *   the name of the numerical column
+   * @param probabilities
+   *   a list of quantile probabilities Each number must belong to [0, 1]. For 
example 0 is the
+   *   minimum, 0.5 is the median, 1 is the maximum.
+   * @param relativeError
+   *   The relative target precision to achieve (greater than or equal to 0). 
If set to zero, the
+   *   exact quantiles are computed, which could be very expensive. Note that 
values greater than
+   *   1 are accepted but give the same result as 1.
+   * @return
+   *   the approximate quantiles at the given probabilities
+   *
+   * @note
+   *   null and NaN values will be removed from the numerical column before 
calculation. If the
+   *   dataframe is empty or the column only contains null or NaN, an empty 
array is returned.
+   *
+   * @since 3.4.0
+   */
+  def approxQuantile(
+  col: String,
+  probabilities: Array[Double],
+  relativeError: Double): Array[Double] = {
+approxQuantile(Array(col), probabilities, relativeError).head
+  }
+
+  /**
+   * Calculates the approximate quantiles of numerical columns of a DataFrame.
+   * @see
+   *   `approxQuantile(col:Str* approxQuantile)` for detailed description.
+   *
+   * @param cols
+   *   the names of the numerical columns
+   * @param probabilities
+   *   a list of quantile probabilities Each number must belong to [0, 1]. For 
example 0 is the
+   *   minimum, 0.5 is the median, 1 is the maximum.
+   * @param relativeError
+   *   The relative target precision to achieve (greater than or equal to 0). 
If set to zero, the
+   *   exact quantiles are computed, which could be very expensive. Note that 
values greater than
+   *   1 are accepted but give the same result as 1.
+   * @return
+   *   the approximate quantiles at the given probabilities of each column
+   *
+   * @note
+   *   null and NaN values will be ignored in numerical columns before 
calculation. For columns
+   *   only containing null or NaN values, an empty array is returned.
+   *
+   * @since 3.4.0
+   */
+  def approxQuantile(
+  cols: Array[String],
+  probabilities: Array[Double],
+  relativeError: Double): Array[Array[Double]] = {
+require(
+  probabilities.forall(p => p >= 0.0 && p <= 1.0),
+  "percentile should be in the range [0.0, 1.0]")
+require(relativeError >= 0,
+  s"Relative Error must be non-negative but got $relativeError")
+sparkSession
+  .newDataset(
+ArrayEncoder(
+  ArrayEncoder(PrimitiveDoubleEncoder, containsNull = false),
+  containsNull = false)) { builder =>
+va

[GitHub] [spark] aimtsou commented on pull request #40220: [WIP][SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types

2023-03-02 Thread via GitHub


aimtsou commented on PR #40220:
URL: https://github.com/apache/spark/pull/40220#issuecomment-1452145653

   Tests are completed; shall I squash and remove the WIP tag from the pull request?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #40126: [SPARK-40822][SQL] Stable derived column aliases

2023-03-02 Thread via GitHub


MaxGekk commented on PR #40126:
URL: https://github.com/apache/spark/pull/40126#issuecomment-1452182391

   @srielau Could you take a look at the PR one more time, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a diff in pull request #40126: [SPARK-40822][SQL] Stable derived column aliases

2023-03-02 Thread via GitHub


MaxGekk commented on code in PR #40126:
URL: https://github.com/apache/spark/pull/40126#discussion_r1123410550


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveAliasesSuite.scala:
##
@@ -88,4 +94,46 @@ class ResolveAliasesSuite extends AnalysisTest {
 checkAliasName(t1.select(DateSub(Literal(Date.valueOf("2021-01-18")), 
Literal(null))),
   "date_sub(DATE '2021-01-18', NULL)")
   }
+
+  test("SPARK-40822: Stable derived column aliases") {
+withSQLConf(SQLConf.STABLE_DERIVED_COLUMN_ALIAS_ENABLED.key -> "true") {
+  Seq(
+// Literals
+"' 1'" -> "' 1'",
+abc -> abc,

Review Comment:
   I have added such a conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vicennial opened a new pull request, #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial opened a new pull request, #40256:
URL: https://github.com/apache/spark/pull/40256

   
   
   ### What changes were proposed in this pull request?
   
   This PR introduces a mechanism to transfer artifacts (currently, local 
`.jar` + `.class` files) from a Spark Connect JVM/Scala client over to the 
server side of Spark Connect. The mechanism follows the protocol as defined in 
https://github.com/apache/spark/pull/40147 and supports batching (for multiple 
"small" artifacts) and chunking (for large artifacts).
   
   
   
   ### Why are the changes needed?
   
   In the decoupled client-server architecture of Spark Connect, a remote 
client may use a local JAR or a new class in their UDF that may not be present 
on the server. To handle these cases of missing "artifacts", we implement a 
mechanism to transfer artifacts from the client side over to the server side as 
per the protocol defined in https://github.com/apache/spark/pull/40147.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, users would be able to use the `addArtifact` and `addArtifacts` methods 
(via a `SparkSession` instance) to transfer local files (`.jar` and `.class` 
extensions).
   
   ### How was this patch tested?
   
   Unit tests - located in `ArtifactSuite`. 
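   
   As a rough illustration of the chunking idea described above (not the PR's actual code; the 32 KiB size and the file path are placeholders), splitting a local file into fixed-size pieces while keeping a running CRC might look like:
   
   ```scala
   import java.nio.file.{Files, Paths}
   import java.util.zip.{CheckedInputStream, CRC32}
   
   val chunkSize = 32 * 1024 // placeholder, mirroring the 32 KiB gRPC message recommendation
   val in = new CheckedInputStream(Files.newInputStream(Paths.get("/tmp/my-udfs.jar")), new CRC32)
   try {
     val buf = new Array[Byte](chunkSize)
     Iterator
       .continually(in.read(buf))
       .takeWhile(_ != -1)
       .foreach { n =>
         // Each piece (plus the checksum value) would be sent as one chunk in the request stream.
         val piece = buf.take(n)
         println(s"read ${piece.length} bytes, running crc = ${in.getChecksum.getValue}")
       }
   } finally {
     in.close()
   }
   ```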


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on pull request #40220: [WIP][SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types

2023-03-02 Thread via GitHub


srowen commented on PR #40220:
URL: https://github.com/apache/spark/pull/40220#issuecomment-1452232660

   Yes, remove WIP just for completeness. No need to squash; the script does that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] steveloughran commented on pull request #39124: [DON'T MERGE] Test build and test with hadoop 3.3.5-RC2

2023-03-02 Thread via GitHub


steveloughran commented on PR #39124:
URL: https://github.com/apache/spark/pull/39124#issuecomment-1452267600

   Thanks. The HDFS team is reporting a probable RC blocker, but now is the time to 
find any other issues.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123536612


##
python/pyspark/sql/connect/plan.py:
##
@@ -338,7 +338,7 @@ def plan(self, session: "SparkConnectClient") -> 
proto.Relation:
 plan.local_relation.data = sink.getvalue().to_pybytes()
 
 if self._schema is not None:
-plan.local_relation.schema = self._schema
+plan.local_relation.schemaString = self._schema

Review Comment:
   ```suggestion
   plan.local_relation.schema_string = self._schema
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40252:
URL: https://github.com/apache/spark/pull/40252#discussion_r1123553052


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala:
##
@@ -184,6 +186,67 @@ class DataFrameReader private[sql] (sparkSession: 
SparkSession) extends Logging
 }
   }
 
+  /**
+   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
+   * url named table and connection properties.
+   *
+   * You can find the JDBC-specific option and parameter documentation for 
reading tables
+   * via JDBC in
+   * https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
+   *   Data Source Option in the version you use.
+   *
+   * @since 3.5.0
+   */
+  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+// properties should override settings in extraOptions.
+this.extraOptions ++= properties.asScala
+// explicit url and dbtable should override all
+this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+format("jdbc").load()
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Construct a `DataFrame` representing the database table accessible via 
JDBC URL
+   * url named table. Partitions of the table will be retrieved in parallel 
based on the parameters
+   * passed to this function.
+   *
+   * Don't create too many partitions in parallel on a large cluster; 
otherwise Spark might crash
+   * your external database systems.
+   *
+   * You can find the JDBC-specific option and parameter documentation for 
reading tables via JDBC in
+   * https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
+   *   Data Source Option in the version you use.
+   *
+   * @param table Name of the table in the external database.
+   * @param columnName Alias of `partitionColumn` option. Refer to 
`partitionColumn` in
+   *   https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option";>
+   * Data Source Option in the version you use.
+   * @param connectionProperties JDBC database connection arguments, a list of 
arbitrary string
+   * tag/value. Normally at least a "user" and 
"password" property
+   * should be included. "fetchsize" can be used 
to control the
+   * number of rows per fetch and "queryTimeout" 
can be used to wait
+   * for a Statement object to execute to the 
given number of seconds.
+   * @since 3.5.0

Review Comment:
   I am fine with marking this as 3.4.0. There is no harm in adding it to 3.4.0 if 
the RC does not pass.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


hvanhovell commented on PR #40252:
URL: https://github.com/apache/spark/pull/40252#issuecomment-1452353820

   @beliefer you can create a test in `PlanGenerationTestSuite`. That will at 
least validate the proto message we are generating, and it will validate that 
the plan you are producing yields a valid plan in `ProtoToPlanTestSuite`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite`

2023-03-02 Thread via GitHub


hvanhovell commented on PR #40213:
URL: https://github.com/apache/spark/pull/40213#issuecomment-1452355034

   Alright, merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell closed pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite`

2023-03-02 Thread via GitHub


hvanhovell closed pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce 
`dev/connect-jvm-client-mima-check`  instead of `CompatibilitySuite`
URL: https://github.com/apache/spark/pull/40213


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhenlineo commented on a diff in pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite`

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40213:
URL: https://github.com/apache/spark/pull/40213#discussion_r1123563539


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala:
##
@@ -184,30 +218,36 @@ class CompatibilitySuite extends ConnectFunSuite {
   .filter { p =>
 excludeRules.forall(rule => rule(p))
   }
-
-if (problems.nonEmpty) {
-  fail(
-s"\nComparing client jar: $clientJar\nand sql jar: $sqlJar\n" +
-  problems.map(p => p.description("client")).mkString("\n"))
-}
+problems
   }
 
-  test("compatibility API tests: Dataset") {
-val clientClassLoader: URLClassLoader = new 
URLClassLoader(Seq(clientJar.toURI.toURL).toArray)
-val sqlClassLoader: URLClassLoader = new 
URLClassLoader(Seq(sqlJar.toURI.toURL).toArray)
+  private def checkDatasetApiCompatibility(clientJar: File, sqlJar: File): 
Seq[String] = {

Review Comment:
   This test was initially added because we did not cover Dataset in the MiMa check.
   
   If we find such a test useful and want to expand it to more classes, we can 
run the MiMa check the other way around to ensure that our new code does not 
introduce more unwanted methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123572526


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -287,6 +288,28 @@ class SparkSession private[sql] (
 client.execute(plan).asScala.foreach(_ => ())
   }
 
+  /**
+   * Add a single artifact to the client session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = client.addArtifact(path)

Review Comment:
   Can you mark these as experimental?
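   
   For reference, the user-facing call shape under discussion (the paths are hypothetical):
   
   ```scala
   // Upload a local jar and a compiled class file to the server for this session.
   spark.addArtifact("/path/to/my-udfs.jar")
   spark.addArtifact("/path/to/MyUdf.class")
   ```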



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


amaliujia commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123575136


##
python/pyspark/sql/connect/plan.py:
##
@@ -338,7 +338,7 @@ def plan(self, session: "SparkConnectClient") -> 
proto.Relation:
 plan.local_relation.data = sink.getvalue().to_pybytes()
 
 if self._schema is not None:
-plan.local_relation.schema = self._schema
+plan.local_relation.schemaString = self._schema

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] peter-toth commented on a diff in pull request #40093: [SPARK-42500][SQL] ConstantPropagation supports more cases

2023-03-02 Thread via GitHub


peter-toth commented on code in PR #40093:
URL: https://github.com/apache/spark/pull/40093#discussion_r1123582002


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##
@@ -200,14 +200,20 @@ object ConstantPropagation extends Rule[LogicalPlan] {
 
   private def replaceConstants(condition: Expression, equalityPredicates: 
EqualityPredicates)
 : Expression = {
-val constantsMap = AttributeMap(equalityPredicates.map(_._1))
-val predicates = equalityPredicates.map(_._2).toSet
-def replaceConstants0(expression: Expression) = expression transform {
+val allConstantsMap = AttributeMap(equalityPredicates.map(_._1))
+val allPredicates = equalityPredicates.map(_._2).toSet
+def replaceConstants0(
+expression: Expression, constantsMap: AttributeMap[Literal]) = 
expression transform {
   case a: AttributeReference => constantsMap.getOrElse(a, a)
 }
 condition transform {
-  case e @ EqualTo(_, _) if !predicates.contains(e) => replaceConstants0(e)
-  case e @ EqualNullSafe(_, _) if !predicates.contains(e) => 
replaceConstants0(e)
+  case b: BinaryComparison =>

Review Comment:
   Let me open a PR tomorrow.
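   
   For readers following the rule change, a small illustration of the rewrite `ConstantPropagation` can now perform (the query is made up; the point is that `a = 1` allows the other comparison to be rewritten to `b > 1`, which, per the diff above, previously only happened for equality comparisons):
   
   ```scala
   // `a = 1 AND b > a` can be simplified to `a = 1 AND b > 1` by substituting the constant.
   val df = spark.range(10).selectExpr("id AS a", "id * 2 AS b")
   df.filter("a = 1 AND b > a").explain(true)
   ```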



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123588842


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompl

[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123590310


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompl

[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123590597


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompl

[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123591424


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompl

[GitHub] [spark] zhenlineo opened a new pull request, #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo opened a new pull request, #40257:
URL: https://github.com/apache/spark/pull/40257

   ### What changes were proposed in this pull request?
   Adds a simple script to start the Scala client in the Scala REPL, as well 
as a script to start the Spark Connect server for the client to connect to.
   
   ### Why are the changes needed?
   Makes the JVM client easier for users to use.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Manually tested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123642955


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala:
##
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+
+class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
+
+  private var client: SparkConnectClient = _
+  private var service: DummySparkConnectService = _
+  private var server: Server = _
+  private var artifactManager: ArtifactManager = _
+  private var channel: ManagedChannel = _
+
+  private def startDummyServer(): Unit = {
+service = new DummySparkConnectService()
+server = InProcessServerBuilder
+  .forName(getClass.getName)
+  .addService(service)
+  .build()
+server.start()
+  }
+
+  private def createArtifactManager(): Unit = {
+channel = 
InProcessChannelBuilder.forName(getClass.getName).directExecutor().build()
+artifactManager = new 
ArtifactManager(proto.UserContext.newBuilder().build(), channel)
+  }
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+startDummyServer()
+createArtifactManager()
+client = null
+  }
+
+  override def afterEach(): Unit = {
+if (server != null) {
+  server.shutdownNow()
+  assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to 
shutdown")
+}
+
+if (channel != null) {
+  channel.shutdownNow()
+}
+
+if (client != null) {
+  client.shutdown()
+}
+  }
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+  protected def artifactFilePath: Path = 
baseResourcePath.resolve("artifact-tests")
+
+  /**
+   * Check if the data sent to the server (stored in `artifactChunk`) is 
equivalent to the local
+   * data at `localPath`.
+   * @param artifactChunk
+   * @param localPath
+   */
+  private def assertFileDataEquality(
+  artifactChunk: AddArtifactsRequest.ArtifactChunk,
+  localPath: Path): Unit = {
+val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32)
+val localData = ByteString.readFrom(in)
+assert(artifactChunk.getData == localData)
+assert(artifactChunk.getCrc == in.getChecksum.getValue)
+  }
+
+  private def singleChunkArtifactTest(path: String): Unit = {
+test(s"Single Chunk Artifact - $path") {
+  val artifactPath = artifactFilePath.resolve(path)
+  artifactManager.addArtifact(artifactPath.toString)
+
+  val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+  // Single `AddArtifactRequest`
+  assert(receivedRequests.size == 1)
+
+  val request = receivedRequests.head
+  assert(request.hasBatch)
+
+  val batch = request.getBatch
+  // Single artifact in batch
+  assert(batch.getArtifactsList.size() == 1)
+
+  val singleChunkArtifact = batch.getArtifacts(0)
+  val namePrefix = artifactPath.getFileName.toString match {
+case jar if jar.endsWith(".jar") => "jars"
+case cf if cf.endsWith(".class") => "classes"
+  }
+  assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path))
+  assertFileDataEquality(singleChunkArtifact.getData, artifactPath)
+}
+  }
+
+  singleChunkArtifactTest("smallClassFile.class")
+
+  singleChunkArtifactTest("smallJar.jar")
+
+  private def readNextChunk(in: InputStream): ByteString = {
+val buf = new Array[Byte](CHUNK_SIZE)
+var bytesRead = 0
+var count = 0
+while (count != -1 && bytesRead < CHUNK_SIZE) {
+  count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+  if (count != -1) {
+bytesRead += count
+  }
+}
+ 

[GitHub] [spark] shrprasa opened a new pull request, #40258: [WIP][SPARK-42655]:Incorrect ambiguous column reference error

2023-03-02 Thread via GitHub


shrprasa opened a new pull request, #40258:
URL: https://github.com/apache/spark/pull/40258

   Incorrect ambiguous column reference error


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123653086


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else 
sessionBuilder.remote(conStr).build()

Review Comment:
   Do we default to a local connection if none is set?
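   
   A minimal sketch of the fallback pattern being asked about, reusing only the calls already present in the script (whether a plain `build()` should target a default local endpoint is exactly the open question):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Prefer SPARK_REMOTE when it is set and non-empty; otherwise fall back to builder defaults.
   val builder = SparkSession.builder()
   val spark = sys.env.get("SPARK_REMOTE").filter(_.nonEmpty) match {
     case Some(remote) => builder.remote(remote).build()
     case None => builder.build()
   }
   ```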



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123653670


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package
+
+# Build the jars needed for spark connect
+build/sbt "connect/assembly"
+
+CONNECT_JAR=`ls 
"${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar
 | paste -sd ',' -`
+
+exec "${SPARK_HOME}"/bin/spark-submit --class 
org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR"

Review Comment:
   We have two scripts for starting and stopping a connect server. Should we 
use those instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123654587


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash

Review Comment:
   Since these scripts are aimed at development, maybe put them in 
connector/connect/bin?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123653670


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package
+
+# Build the jars needed for spark connect
+build/sbt "connect/assembly"
+
+CONNECT_JAR=`ls 
"${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar
 | paste -sd ',' -`
+
+exec "${SPARK_HOME}"/bin/spark-submit --class 
org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR"

Review Comment:
   We have two scripts for starting and stopping a connect server. Should we 
use those instead? If this is for dev, then we should keep these.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123660107


##
bin/spark-connect-scala-client:
##
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://:/;token=;="`
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"
+
+CONNECT_CLASSPATH="$(build/sbt -DcopyDependencies=false "export 
connect-client-jvm/fullClasspath" | grep jar | tail -n1)"
+SQL_CLASSPATH="$(build/sbt -DcopyDependencies=false "export sql/fullClasspath" 
| grep jar | tail -n1)"
+
+INIT_SCRIPT="${SPARK_HOME}"/bin/spark-connect-scala-client.sc
+"${SPARK_HOME}"/build/scala*/bin/scala -cp "$CONNECT_CLASSPATH:$SQL_CLASSPATH" 
-i $INIT_SCRIPT

Review Comment:
   TIL you can do this :)...
   
   I have multiple scala versions floating around in my spark dir. We should 
pick the current one...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123661416


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package

Review Comment:
   Remove comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite

2023-03-02 Thread via GitHub


hvanhovell commented on PR #40241:
URL: https://github.com/apache/spark/pull/40241#issuecomment-1452528841

   Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell closed pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite

2023-03-02 Thread via GitHub


hvanhovell closed pull request #40241: [SPARK-42640][CONNECT] Remove stale 
entries from the excluding rules for CompatibilitySuite
URL: https://github.com/apache/spark/pull/40241


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123688156


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""

Review Comment:
   Feature request: can we try to configure this using a `--connectionString` argument?
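
   A hedged workaround sketch (not part of this PR): if program arguments cannot
   be forwarded past the REPL, the connection string could be passed as a JVM
   system property instead; the property name `connectionString` below is only
   illustrative.

      // e.g. scala -DconnectionString=sc://host:port -i spark-connect-scala-client.sc
      val conStr = sys.props.get("connectionString")
        .orElse(sys.env.get("SPARK_REMOTE"))
        .getOrElse("")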



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123695202


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else 
sessionBuilder.remote(conStr).build()

Review Comment:
   Oh BTW. there does not seem to be SparkSession.Builder.remote() function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123696037


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else 
sessionBuilder.remote(conStr).build()

Review Comment:
   Can you also set the user agent to something meaningful?
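
   A sketch of the idea; the `userAgent` setter below is hypothetical and only
   names the intent, the actual builder API may expose this differently:

      // `userAgent` is a hypothetical builder method, shown for illustration only.
      val spark = sessionBuilder
        .remote(conStr)
        .userAgent("spark-connect-scala-repl")
        .build()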



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123697756


##
bin/spark-connect-scala-client:
##
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash

Review Comment:
   Script files need to have a license as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123698953


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;
 
   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to 
this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {
+// Either a DDL-formatted type string or a JSON string.
+string schemaString = 2;

Review Comment:
   thanks; it has been a while since I touched proto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia opened a new pull request, #40259: [SPARK-42609][CONNECT] Add tests for grouping() and grouping_id() functions

2023-03-02 Thread via GitHub


amaliujia opened a new pull request, #40259:
URL: https://github.com/apache/spark/pull/40259

   
   
   ### What changes were proposed in this pull request?
   
   Add tests for grouping() and grouping_id() functions.
   
   ### Why are the changes needed?
   
   Improve testing coverage.
   
   ### Does this PR introduce _any_ user-facing change?
   
   NO
   
   ### How was this patch tested?
   
   UT
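
   For readers unfamiliar with these functions, an illustrative (non-PR) usage
   over a cube, assuming the standard Spark SQL API:

      import org.apache.spark.sql.functions.{grouping, grouping_id, sum}

      // grouping() flags subtotal rows produced by cube/rollup;
      // grouping_id() encodes the grouping level as an integer.
      val df = spark.createDataFrame(Seq(("a", 1), ("a", 2), ("b", 3))).toDF("key", "value")
      df.cube("key")
        .agg(grouping("key"), grouping_id(), sum("value"))
        .show()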


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on pull request #40259: [SPARK-42609][CONNECT] Add tests for grouping() and grouping_id() functions

2023-03-02 Thread via GitHub


amaliujia commented on PR #40259:
URL: https://github.com/apache/spark/pull/40259#issuecomment-1452625458

   @hvanhovell 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123818828


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package
+
+# Build the jars needed for spark connect
+build/sbt "connect/assembly"
+
+CONNECT_JAR=`ls 
"${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar
 | paste -sd ',' -`
+
+exec "${SPARK_HOME}"/bin/spark-submit --class 
org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR"

Review Comment:
   Where is the other script?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123821210


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onComple

[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123821569


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onComple

[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123829338


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onComple

[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123833306


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onComple

[GitHub] [spark] ueshin opened a new pull request, #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


ueshin opened a new pull request, #40260:
URL: https://github.com/apache/spark/pull/40260

   ### What changes were proposed in this pull request?
   
   Delays parsing DDL string for Python UDFs until `SparkConnectClient` is 
available.
   
   Also changes `createDataFrame` to use the proto `DDLParse`.
   
   ### Why are the changes needed?
   
   Currently `parse_data_type` depends on `PySparkSession`, which creates a local PySpark session, but that won't be available on the client side.
   
   When `SparkConnectClient` is available, we can use the new proto `DDLParse` to parse data types given as strings.
   
   ### Does this PR introduce _any_ user-facing change?
   
   The UDF's `returnType` attribute could be a string in Spark Connect if it is provided as a string.
   
   ### How was this patch tested?
   
   Existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40210: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40210:
URL: https://github.com/apache/spark/pull/40210#discussion_r1123891426


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
InvalidPlanInput}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, 
FormattedMode, SimpleMode}
+
+private[connect] class SparkConnectAnalyzeHandler(
+responseObserver: StreamObserver[proto.AnalyzePlanResponse])
+extends Logging {
+
+  def handle(request: proto.AnalyzePlanRequest): Unit = {
+val session =
+  SparkConnectService
+.getOrCreateIsolatedSession(request.getUserContext.getUserId, 
request.getClientId)
+.session
+
+val response = process(request, session)
+responseObserver.onNext(response)
+responseObserver.onCompleted()

Review Comment:
   I guess we should surround this with `session.withActive`?
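
   A sketch of the suggested change, wrapping the quoted handler body (not
   necessarily the exact final patch):

      def handle(request: proto.AnalyzePlanRequest): Unit = {
        val session =
          SparkConnectService
            .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
            .session
        // Make this session the active one so that SQLConf.get resolves its conf
        // while the proto plan is transformed.
        session.withActive {
          val response = process(request, session)
          responseObserver.onNext(response)
          responseObserver.onCompleted()
        }
      }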



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm closed pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


mridulm closed pull request #39459: [SPARK-41497][CORE] Fixing accumulator 
undercount in the case of the retry task with rdd cache
URL: https://github.com/apache/spark/pull/39459


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


mridulm commented on PR #39459:
URL: https://github.com/apache/spark/pull/39459#issuecomment-1452754371

   Merged to master.
   Thanks for fixing this @ivoson !
   Thanks for the reviews @Ngone51 :-)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123897616


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala:
##
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+
+class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
+
+  private var client: SparkConnectClient = _
+  private var service: DummySparkConnectService = _
+  private var server: Server = _
+  private var artifactManager: ArtifactManager = _
+  private var channel: ManagedChannel = _
+
+  private def startDummyServer(): Unit = {
+service = new DummySparkConnectService()
+server = InProcessServerBuilder
+  .forName(getClass.getName)
+  .addService(service)
+  .build()
+server.start()
+  }
+
+  private def createArtifactManager(): Unit = {
+channel = 
InProcessChannelBuilder.forName(getClass.getName).directExecutor().build()
+artifactManager = new 
ArtifactManager(proto.UserContext.newBuilder().build(), channel)
+  }
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+startDummyServer()
+createArtifactManager()
+client = null
+  }
+
+  override def afterEach(): Unit = {
+if (server != null) {
+  server.shutdownNow()
+  assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to 
shutdown")
+}
+
+if (channel != null) {
+  channel.shutdownNow()
+}
+
+if (client != null) {
+  client.shutdown()
+}
+  }
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+  protected def artifactFilePath: Path = 
baseResourcePath.resolve("artifact-tests")
+
+  /**
+   * Check if the data sent to the server (stored in `artifactChunk`) is 
equivalent to the local
+   * data at `localPath`.
+   * @param artifactChunk
+   * @param localPath
+   */
+  private def assertFileDataEquality(
+  artifactChunk: AddArtifactsRequest.ArtifactChunk,
+  localPath: Path): Unit = {
+val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32)
+val localData = ByteString.readFrom(in)
+assert(artifactChunk.getData == localData)
+assert(artifactChunk.getCrc == in.getChecksum.getValue)
+  }
+
+  private def singleChunkArtifactTest(path: String): Unit = {
+test(s"Single Chunk Artifact - $path") {
+  val artifactPath = artifactFilePath.resolve(path)
+  artifactManager.addArtifact(artifactPath.toString)
+
+  val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+  // Single `AddArtifactRequest`
+  assert(receivedRequests.size == 1)
+
+  val request = receivedRequests.head
+  assert(request.hasBatch)
+
+  val batch = request.getBatch
+  // Single artifact in batch
+  assert(batch.getArtifactsList.size() == 1)
+
+  val singleChunkArtifact = batch.getArtifacts(0)
+  val namePrefix = artifactPath.getFileName.toString match {
+case jar if jar.endsWith(".jar") => "jars"
+case cf if cf.endsWith(".class") => "classes"
+  }
+  assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path))
+  assertFileDataEquality(singleChunkArtifact.getData, artifactPath)
+}
+  }
+
+  singleChunkArtifactTest("smallClassFile.class")
+
+  singleChunkArtifactTest("smallJar.jar")
+
+  private def readNextChunk(in: InputStream): ByteString = {
+val buf = new Array[Byte](CHUNK_SIZE)
+var bytesRead = 0
+var count = 0
+while (count != -1 && bytesRead < CHUNK_SIZE) {
+  count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+  if (count != -1) {
+bytesRead += count
+  }
+}
+  

[GitHub] [spark] ueshin opened a new pull request, #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive

2023-03-02 Thread via GitHub


ueshin opened a new pull request, #40261:
URL: https://github.com/apache/spark/pull/40261

   ### What changes were proposed in this pull request?
   
   Fix `SparkConnectAnalyzeHandler` to use `withActive`.
   
   ### Why are the changes needed?
   
   Similar to #40165, `SQLConf.get` is necessary when transforming the proto to 
plans.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40210: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40210:
URL: https://github.com/apache/spark/pull/40210#discussion_r1123898851


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
InvalidPlanInput}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, 
FormattedMode, SimpleMode}
+
+private[connect] class SparkConnectAnalyzeHandler(
+responseObserver: StreamObserver[proto.AnalyzePlanResponse])
+extends Logging {
+
+  def handle(request: proto.AnalyzePlanRequest): Unit = {
+val session =
+  SparkConnectService
+.getOrCreateIsolatedSession(request.getUserContext.getUserId, 
request.getClientId)
+.session
+
+val response = process(request, session)
+responseObserver.onNext(response)
+responseObserver.onCompleted()

Review Comment:
   #40261



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


beliefer commented on PR #40252:
URL: https://github.com/apache/spark/pull/40252#issuecomment-1452763000

   > I guess you can refer to `JDBCSuite` and `ClientE2ETestSuite` ?
   
   The built-in H2 database runs on the server side, and we can't start an H2 database from the Connect API.
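
   For context, an illustrative call to the API this PR adds (the connection
   details below are placeholders, not a working test setup):

      // An H2 instance would have to be reachable from the Spark Connect server
      // side for this read to succeed.
      val props = new java.util.Properties()
      props.setProperty("user", "sa")
      val df = spark.read.jdbc("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1", "TEST.PEOPLE", props)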


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123910664


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else 
sessionBuilder.remote(conStr).build()

Review Comment:
   If not set, the user agent defaults to "_SPARK_CONNECT_SCALA". Do you want another name for the script client?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123911014


##
bin/spark-connect-scala-client:
##
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://:/;token=;="`
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"
+
+CONNECT_CLASSPATH="$(build/sbt -DcopyDependencies=false "export 
connect-client-jvm/fullClasspath" | grep jar | tail -n1)"
+SQL_CLASSPATH="$(build/sbt -DcopyDependencies=false "export sql/fullClasspath" 
| grep jar | tail -n1)"
+
+INIT_SCRIPT="${SPARK_HOME}"/bin/spark-connect-scala-client.sc
+"${SPARK_HOME}"/build/scala*/bin/scala -cp "$CONNECT_CLASSPATH:$SQL_CLASSPATH" 
-i $INIT_SCRIPT

Review Comment:
   Now it will find the version set in the top pom file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123913129


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""

Review Comment:
   This feature request is a bit hard :D I cannot figure out a way to pass the args to the program rather than to the Scala REPL.
   
   I think it relates to '-i'. We need '-i' to keep the REPL running, but then the REPL treats all args as REPL arguments rather than arguments to the Scala script.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


ivoson commented on PR #39459:
URL: https://github.com/apache/spark/pull/39459#issuecomment-1452774796

   Thanks for the review and help on this. @mridulm @Ngone51 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123920584


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;
 
   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to 
this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {

Review Comment:
   In some cases we need to parse it beforehand anyway. For the case of #40240, we can't create a converter from the local Python objects to an Arrow table without the schema as a `DataType` object.
   
   Another example is UDFs: we need to pickle the function together with its return type as a `DataType` object. We can't generate the Python `DataType` object and pickle it into the `command` field on the server side anymore, so we need to parse the DDL string beforehand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123923286


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;
 
   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to 
this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {

Review Comment:
   I think this change is fine. I'd rather change all places to accept both a string and a `DataType`, leaving the choice up to the client developers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen closed pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types

2023-03-02 Thread via GitHub


srowen closed pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy 
deprecated and removed types
URL: https://github.com/apache/spark/pull/40220


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


