[GitHub] [spark] HyukjinKwon commented on issue #26053: [SPARK-29379][SQL]SHOW FUNCTIONS show '!=', '<>' , 'between', 'case'
HyukjinKwon commented on issue #26053: [SPARK-29379][SQL]SHOW FUNCTIONS show '!=', '<>' , 'between', 'case' URL: https://github.com/apache/spark/pull/26053#issuecomment-540942383 Alright, let's deal with it later separately. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] yaooqinn commented on issue #26080: [SPARK-29425][SQL] The ownership of a database should be respected
yaooqinn commented on issue #26080: [SPARK-29425][SQL] The ownership of a database should be respected URL: https://github.com/apache/spark/pull/26080#issuecomment-540945353 cc @cloud-fan @gatorsmile @wangyum
[GitHub] [spark] MaxGekk commented on a change in pull request #26055: [SPARK-29368][SQL][TEST] Port interval.sql
MaxGekk commented on a change in pull request #26055: [SPARK-29368][SQL][TEST] Port interval.sql
URL: https://github.com/apache/spark/pull/26055#discussion_r333855323

## File path: sql/core/src/test/resources/sql-tests/inputs/postgreSQL/interval.sql ##
@@ -0,0 +1,330 @@
+--
+-- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+--
+--
+-- INTERVAL
+-- https://github.com/postgres/postgres/blob/REL_12_STABLE/src/test/regress/sql/interval.sql
+
+-- SET DATESTYLE = 'ISO';
+-- SET IntervalStyle to postgres;
+
+-- check acceptance of "time zone style"
+-- [SPARK-29369] Accept strings without `interval` prefix in casting to intervals
+-- [SPARK-29370] Interval strings without explicit unit markings
+-- SELECT INTERVAL '01:00' AS `One hour`;
+-- SELECT INTERVAL '+02:00' AS `Two hours`;
+-- SELECT INTERVAL '-08:00' AS `Eight hours`;
+-- SELECT INTERVAL '-1 +02:03' AS `22 hours ago...`;
+-- SELECT INTERVAL '-1 days +02:03' AS `22 hours ago...`;
+-- [SPARK-29371] Support interval field values with fractional parts
+-- SELECT INTERVAL '1.5 weeks' AS `Ten days twelve hours`;
+-- SELECT INTERVAL '1.5 months' AS `One month 15 days`;
+-- SELECT INTERVAL '10 years -11 month -12 days +13:14' AS `9 years...`;
+
+-- [SPARK-29382] Support the `INTERVAL` type by Parquet datasource

Review comment:
This is too generic, from my point of view. If we use parquet everywhere in the ported tests, `interval.sql` shouldn't be an exception. The concrete problem here is that the **parquet** datasource doesn't support writing values of the interval type.

> -- [SPARK-29382] Support writing `INTERVAL` type to datasource table

Does it mean that `INTERVAL` should be supported by all builtin datasources? I will change the title of the JIRA ticket to unblock this PR.
[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333857916

## File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala ##
@@ -65,12 +65,30 @@ class NettyBlockRpcServer(
       case fetchShuffleBlocks: FetchShuffleBlocks =>
         val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case (mapId, index) =>
-          fetchShuffleBlocks.reduceIds.apply(index).map { reduceId =>
-            blockManager.getBlockData(
-              ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))
+          if (!fetchShuffleBlocks.batchFetchEnabled) {
+            fetchShuffleBlocks.reduceIds(index).map { reduceId =>
+              blockManager.getBlockData(
+                ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))
+            }
+          } else {
+            val startAndEndId = fetchShuffleBlocks.reduceIds(index)
+            if (startAndEndId.length != 2) {
+              throw new IllegalStateException(s"Invalid shuffle fetch request: " +

Review comment:
To be more specific, how about: `"Invalid shuffle fetch request when batch mode is enabled: "`
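As a rough illustration of the check under discussion, the sketch below validates one batch-mode `reduceIds(index)` entry and uses the more specific message prefix proposed in the comment. The `BatchRequestCheck` object, the `reduceRange` helper, and the wording after the prefix are assumptions made for this example and are not taken from the PR.

```scala
object BatchRequestCheck {
  // Hypothetical helper, not part of the PR: validates one reduceIds(index) entry
  // of a batch-mode request and returns it as (startReduceId, endReduceId).
  def reduceRange(mapId: Long, reduceIds: Array[Int]): (Int, Int) = {
    if (reduceIds.length != 2) {
      // The prefix is the wording proposed in the review; the rest is assumed.
      throw new IllegalStateException(
        "Invalid shuffle fetch request when batch mode is enabled: " +
          s"expected 2 reduce ids for mapId $mapId but got ${reduceIds.length}")
    }
    (reduceIds(0), reduceIds(1))
  }
}
```

With this shape, a malformed request fails with a message that already names batch mode, which is the point of the suggestion.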
[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333843027

## File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala ##
@@ -56,6 +56,18 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends Bl
   override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
 }
+// The batch id of continuous shuffle blocks of same mapId in range [startReduceId, endReduceId).
+@DeveloperApi
+case class ShuffleBlockBatchId(
+    shuffleId: Int,
+    mapId: Long,
+    startReduceId: Int,
+    endReduceId: Int) extends BlockId {
+  override def name: String = {
+    "shuffle_" + shuffleId + "_" + mapId + "_" + startReduceId + "_" + endReduceId

Review comment:
Shall we use `"_" + startReduceId + "-" + endReduceId` instead, since `startReduceId` and `endReduceId` belong to the same semantic group? This also leaves more room to extend ShuffleBlockBatchId in the future.
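To make the naming suggestion concrete, here is a minimal stand-alone sketch of a batch id whose reduce range is joined with `-`. The case class below is a simplified stand-in that does not extend Spark's `BlockId`, and the `parse` helper and its regular expression are hypothetical, added only to show that the range stays recognizable as one group when the name is read back.

```scala
object BatchIdNaming {
  // Simplified stand-in for the ShuffleBlockBatchId in the diff (no BlockId parent).
  final case class ShuffleBlockBatchId(
      shuffleId: Int,
      mapId: Long,
      startReduceId: Int,
      endReduceId: Int) {
    // Proposed form: the range [startReduceId, endReduceId) is written as "start-end".
    def name: String = s"shuffle_${shuffleId}_${mapId}_${startReduceId}-${endReduceId}"
  }

  // Hypothetical pattern for reading the proposed name back; not from the PR.
  private val BatchName = """shuffle_(\d+)_(\d+)_(\d+)-(\d+)""".r

  def parse(name: String): Option[ShuffleBlockBatchId] = name match {
    case BatchName(s, m, start, end) =>
      Some(ShuffleBlockBatchId(s.toInt, m.toLong, start.toInt, end.toInt))
    case _ => None // e.g. a plain "shuffle_1_2_3" block name does not match
  }
}
```

Because the range uses its own separator, a further `_`-separated field could later be appended without making the range ambiguous.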
[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333848114

## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ##
@@ -335,6 +341,51 @@ final class ShuffleBlockFetcherIterator(
     remoteRequests
   }
+  private[this] def mergeContinuousShuffleBlockIdsIfNeeded(
+      blocks: ArrayBuffer[FetchBlockInfo]): ArrayBuffer[FetchBlockInfo] = {
+
+    def mergeFetchBlockInfo(toBeMerged: ArrayBuffer[FetchBlockInfo]): FetchBlockInfo = {
+      val startBlockId = toBeMerged.head.blockId.asInstanceOf[ShuffleBlockId]
+      FetchBlockInfo(
+        ShuffleBlockBatchId(
+          startBlockId.shuffleId,
+          startBlockId.mapId,
+          startBlockId.reduceId,
+          toBeMerged.last.blockId.asInstanceOf[ShuffleBlockId].reduceId + 1),
+        toBeMerged.map(_.size).sum,
+        toBeMerged.head.mapIndex)
+    }
+
+    val result = if (fetchContinuousShuffleBlocksInBatch) {
+      var curBlocks = new ArrayBuffer[FetchBlockInfo]
+      val mergedBlockInfo = new ArrayBuffer[FetchBlockInfo]
+      val iter = blocks.iterator
+
+      while (iter.hasNext) {
+        val info = iter.next()
+        val curBlockId = info.blockId.asInstanceOf[ShuffleBlockId]
+        if (curBlocks.isEmpty) {
+          curBlocks += info
+        } else {
+          if (curBlockId.mapId != curBlocks.head.blockId.asInstanceOf[ShuffleBlockId].mapId) {

Review comment:
How about we keep track of `preMapId`, `startReduceId`, `endReduceId` and `mergedBlockSize`, and avoid using `curBlocks: ArrayBuffer[FetchBlockInfo]`, since we don't need all the info in curBlocks?
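One way to read the suggestion is sketched below: the merge keeps four running values instead of buffering every block of the current run. `Block` and `Batch` are simplified stand-ins for `FetchBlockInfo` and `ShuffleBlockBatchId`, the `hasRun` flag is an assumption of this example, and so is the rule that a run also ends when reduce ids stop being contiguous (the quoted diff only shows the `mapId` comparison).

```scala
import scala.collection.mutable.ArrayBuffer

object MergeSketch {
  // Simplified stand-ins for the PR's types; names and fields are assumptions.
  final case class Block(mapId: Long, reduceId: Int, size: Long)
  // endReduceId is exclusive, matching the [startReduceId, endReduceId) convention.
  final case class Batch(mapId: Long, startReduceId: Int, endReduceId: Int, size: Long)

  def merge(blocks: Seq[Block]): Seq[Batch] = {
    val merged = new ArrayBuffer[Batch]
    var hasRun = false          // whether a run is currently being accumulated
    var preMapId = 0L
    var startReduceId = 0
    var endReduceId = 0
    var mergedBlockSize = 0L

    // Emit the run accumulated so far, if any.
    def flush(): Unit = {
      if (hasRun) merged += Batch(preMapId, startReduceId, endReduceId, mergedBlockSize)
    }

    for (b <- blocks) {
      if (hasRun && b.mapId == preMapId && b.reduceId == endReduceId) {
        // Same mapper and next contiguous reduce id: extend the current run.
        endReduceId += 1
        mergedBlockSize += b.size
      } else {
        // Different mapper or a gap: close the current run and start a new one.
        flush()
        hasRun = true
        preMapId = b.mapId
        startReduceId = b.reduceId
        endReduceId = b.reduceId + 1
        mergedBlockSize = b.size
      }
    }
    flush()
    merged.toSeq
  }
}
```

The trade-off is that only the range and total size survive the merge, which is all the batch id needs according to the comment.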
[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333855219

## File path: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ##
@@ -41,6 +43,20 @@ private[spark] class BlockStoreShuffleReader[K, C](
   private val dep = handle.dependency
+  private def fetchContinuousBlocksInBatch: Boolean = {
+    val conf = SparkEnv.get.conf
+    val compressed = conf.get(config.SHUFFLE_COMPRESS)
+    val featureEnabled = conf.get(config.SHUFFLE_FETCH_CONTINUOUS_BLOCKS_IN_BATCH)
+    val serializerRelocatable = dep.serializer.supportsRelocationOfSerializedObjects
+    // The batch fetching feature only works for reading consolidate file written by
+    // SortShuffleWriter or UnsafeShuffleWriter.
+    val readConsolidateFile = !handle.isInstanceOf[BypassMergeSortShuffleHandle[_, _]]
+
+    readConsolidateFile && featureEnabled && endPartition - startPartition > 1 &&

Review comment:
How about we add a log here when featureEnabled is true but the final decision is false, to tell users that fetchContinuous is not actually used because of an incompatible serializer or compression codec configuration?
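The logging idea can be sketched as a pure function that returns the decision together with an optional message to log. This is only an illustration: the quoted condition is cut off after the partition check, so the remaining serializer and compression checks are folded into a single `remainingChecksPass` parameter, which is an assumption of this example, as are the `BatchFetchDecision` object, the `decide` name, and the message text.

```scala
object BatchFetchDecision {
  // Returns (useBatchFetch, messageToLog). All names here are hypothetical.
  def decide(
      featureEnabled: Boolean,
      readConsolidateFile: Boolean,
      numPartitions: Int,           // stands for endPartition - startPartition
      remainingChecksPass: Boolean  // stands for the truncated rest of the condition
  ): (Boolean, Option[String]) = {
    val useBatchFetch =
      readConsolidateFile && featureEnabled && numPartitions > 1 && remainingChecksPass
    // Only produce a message when the user asked for the feature but did not get it.
    val note =
      if (featureEnabled && !useBatchFetch) {
        Some("Batch fetch of continuous shuffle blocks is enabled but will not be used " +
          "for this read because its preconditions are not met.")
      } else {
        None
      }
    (useBatchFetch, note)
  }
}
```

In the reader itself the message would presumably go through the class's own logger; returning it here just keeps the sketch free of logging dependencies.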
[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333857410

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java ##
@@ -35,20 +35,28 @@
   // The length of mapIds must equal to reduceIds.size(), for the i-th mapId in mapIds,
   // it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map id.
   public final long[] mapIds;
+  // When batchFetchEnabled=true, reduceIds[i] contains 2 elements: startReduceId (inclusive) and
+  // endReduceId (exclusive) for the mapper mapIds[i].
+  // When batchFetchEnabled=false, reduceIds[i] contains all the reduce IDs that mapper mapIds[i]
+  // needs to fetch.
   public final int[][] reduceIds;
+  public final boolean batchFetchEnabled;

Review comment:
Since we are adding this field, I believe the `toString` method should also be updated correspondingly.
[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333860050

## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java ##
@@ -53,9 +53,9 @@ public int getSize() {
   /**
    * Get index offset for a particular reducer.
    */
-  public ShuffleIndexRecord getIndex(int reduceId) {
-    long offset = offsets.get(reduceId);
-    long nextOffset = offsets.get(reduceId + 1);
+  public ShuffleIndexRecord getIndex(int startReduceId, int endReduceId) {

Review comment:
This is a public method. Do we need to add a new method instead, to preserve binary compatibility?
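A minimal sketch of the compatibility-preserving alternative follows: the single-reducer method stays and delegates to a new range overload. The real class is Java and reads offsets from a buffer; the Scala types below, the `Array[Long]` backing store, and the `(offset, length)` shape of the record are simplified assumptions made for this example.

```scala
object IndexSketch {
  // Simplified stand-in for ShuffleIndexRecord: start offset and byte length.
  final case class ShuffleIndexRecord(offset: Long, length: Long)

  // offsets(i) is where reducer i's data starts; offsets has one extra trailing entry.
  final class ShuffleIndexInformation(offsets: Array[Long]) {
    // Existing signature kept, so already-compiled callers keep linking.
    def getIndex(reduceId: Int): ShuffleIndexRecord =
      getIndex(reduceId, reduceId + 1)

    // New overload covering the range [startReduceId, endReduceId).
    def getIndex(startReduceId: Int, endReduceId: Int): ShuffleIndexRecord = {
      val offset = offsets(startReduceId)
      val nextOffset = offsets(endReduceId)
      ShuffleIndexRecord(offset, nextOffset - offset)
    }
  }
}
```

For example, with offsets `0, 10, 30, 35`, `getIndex(1)` covers bytes 10 to 30 and `getIndex(0, 3)` covers the whole file.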
[GitHub] [spark] turboFei opened a new pull request #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei opened a new pull request #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086

### What changes were proposed in this pull request?
Currently, for a dynamic partition overwrite operation, the file name of a task's output is deterministic. So, if speculation is enabled, a task can conflict with its speculative counterpart. This PR makes the file name of a task for dynamic partition overwrite unique.

### Why are the changes needed?
Data may be corrupted without this PR.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UT.
[GitHub] [spark] itsvikramagr commented on a change in pull request #24922: [SPARK-28120][SS] Rocksdb state storage implementation
itsvikramagr commented on a change in pull request #24922: [SPARK-28120][SS] Rocksdb state storage implementation
URL: https://github.com/apache/spark/pull/24922#discussion_r333870578

## File path: sql/core/pom.xml ##
@@ -147,6 +147,12 @@ mockito-core test + + + org.rocksdb + rocksdbjni

Review comment:
@gatorsmile - what are the alternatives if rocksdb is not the best backend? Other streaming technologies such as flink and kstreams use rocksdb as their primary storage engine.

With integration in the spark codebase, we can probably change the code in any way later, but if we take the separate jar route, the kinds of extensions you can make are limited by the current contract. For example, @skonto mentioned one way in which we can abstract the state storage implementation to get the best out of rocksdb. How can we support such an improvement if we take the spark package route?

The current implementation, based on an in-memory hashmap, is not scalable beyond a point. How shall we go about solving it?
[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138 But we need skip send TaskCommitMessage if a task can not commit. https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247
[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138 But we need skip send TaskCommitMessage if a task can not commit. https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-247
[GitHub] [spark] turboFei commented on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei commented on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138 But we need skip send TaskCommitMessage if a task can not commit. https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L246
[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138 But we need skip send `new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)` if a task can not commit. https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247
[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138 But we need skip send `https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241` if a task can not commit. https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247
[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138 But we should skip send `new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)` if a task can not commit. https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247
[GitHub] [spark] Udbhav30 commented on a change in pull request #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
Udbhav30 commented on a change in pull request #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#discussion_r333874882

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ##
@@ -1396,6 +1396,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       compressed = false,
       properties = rowStorage.properties ++ fileStorage.properties)
-    (ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER))
+    val fileFormat = extractFileFormat(fileStorage.serde)
+    (ctx.LOCAL != null, storage, Some(fileFormat))
+  }
+
+  private def extractFileFormat(serde: Option[String]): String = {

Review comment:
@HyukjinKwon, I can use `USING file_format` explicitly, and that would serve the purpose, but I thought it better to fix this and bring it in line with the `CTAS` behavior, which was fixed in this [PR](https://github.com/apache/spark/pull/22514). If you agree to go ahead, I can try making the changes in the analysis/optimization layer instead of the parser, as you suggested.
Sure, I will follow the template and update the PR description. Thanks :)
[GitHub] [spark] turboFei removed a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei removed a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138 But we should skip send `new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)` if a task can not commit. https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247
[GitHub] [spark] turboFei commented on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei commented on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540964132 also cc @Clark
[GitHub] [spark] Udbhav30 commented on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
Udbhav30 commented on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779 > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly? Yes compression can be achieved by setting Hive ser/de or `USING file_format', but as i mentioned this PR is more towards making the behavior inline to `CTAS` and co use `datasource` if it is convertible. Let me know if you have any suggestions :)
[GitHub] [spark] Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779 > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly? Yes compression can be achieved by setting Hive ser/de or `USING file_format', but as i mentioned this PR is more towards making the behavior inline to `CTAS` and co use `datasource` if it is convertible. Let me know if you have any suggestions :)
[GitHub] [spark] Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779 > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly? Yes compression can be achieved by setting Hive ser/de or `USING file_format' , but as i mentioned this PR is more towards making the behavior inline to `CTAS` and co use `datasource` if it is convertible. Let me know if you have any suggestions :)
[GitHub] [spark] Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779 > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly? Yes compression can be achieved by setting Hive ser/de or `USING file_format` , but as i mentioned this PR is more towards making the behavior inline to `CTAS` and co use `datasource` if it is convertible. Let me know if you have any suggestions :)
[GitHub] [spark] Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779 > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly? Yes compression can be achieved by setting Hive ser/de or `USING file_format` , but as i mentioned this PR is more towards making the behavior inline to `CTAS` and to use `datasource` if it is convertible. Let me know if you have any suggestions :)
[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540964132

cc @viirya @cloud-fan also cc @Clark
[GitHub] [spark] Udbhav30 commented on a change in pull request #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
Udbhav30 commented on a change in pull request #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#discussion_r333874882

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ##
@@ -1396,6 +1396,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         compressed = false,
         properties = rowStorage.properties ++ fileStorage.properties)
-    (ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER))
+    val fileFormat = extractFileFormat(fileStorage.serde)
+    (ctx.LOCAL != null, storage, Some(fileFormat))
+  }
+
+  private def extractFileFormat(serde: Option[String]): String = {

Review comment:
@HyukjinKwon, I can use `USING file_format` explicitly, which would serve the purpose, but I thought it better to fix this and bring it in line with the `CTAS` behavior, which was fixed in this [PR](https://github.com/apache/spark/pull/22514). If you agree to go ahead, I can try making the changes in the analysis/optimization layer instead of the parser, as you suggested. Sure, I will follow the template; I have updated the PR description. Thanks :)
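Note: the body of `extractFileFormat` is not shown in the diff above, so the following is only a minimal sketch of what such a helper might look like. The serde-to-source mapping, the object name `FileFormatSketch`, and the fallback to `"hive"` (the value of `DDLUtils.HIVE_PROVIDER`) are assumptions for illustration, not the PR's actual implementation.

```scala
// Hypothetical sketch: maps a Hive serde class name to a data source name.
object FileFormatSketch {
  // Same string value as DDLUtils.HIVE_PROVIDER in Spark.
  val HiveProvider = "hive"

  // Assumed mapping; only serdes with a native Spark data source are listed.
  private val serdeToSource: Map[String, String] = Map(
    "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe" -> "parquet",
    "org.apache.hadoop.hive.ql.io.orc.OrcSerde" -> "orc")

  // Falls back to the Hive provider when the serde is missing or not convertible.
  def extractFileFormat(serde: Option[String]): String =
    serde.flatMap(serdeToSource.get).getOrElse(HiveProvider)
}
```

With a lookup like this, an unknown or absent serde keeps the existing Hive write path, so only the convertible formats would change behavior.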
[GitHub] [spark] viirya opened a new pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
viirya opened a new pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087

### What changes were proposed in this pull request?

This PR proposes to add a `groupByRelationKey` API to Dataset. It creates a KeyValueGroupedDataset instance using existing relational columns, instead of a typed function as in the `groupByKey` API. Because it leverages existing columns, it can use the existing data partitioning, if any, when doing operations like cogroup.

### Why are the changes needed?

Currently, if users want to do cogroup on DataFrames, there is no good way to do it except through KeyValueGroupedDataset. KeyValueGroupedDataset ignores the existing data partitioning, if any. That is a problem.

```scala
// df1 and df2 are certainly partitioned and sorted.
val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
  .repartition($"a", $"b").sortWithinPartitions("a", "b")
val df2 = Seq((1, 2, 4), (2, 3, 5)).toDF("a", "b", "c")
  .repartition($"a", $"b").sortWithinPartitions("a", "b")
```

```scala
// This groupByRelationKey won't unnecessarily repartition the data
val df3 = df1.groupByRelationKey("a", "b")
  .cogroup(df2.groupByRelationKey("a", "b")) { case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
  }
```

```
== Physical Plan ==
*(5) SerializeFromObject [input[0, int, false] AS value#11206]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4888/206084072@4601674e, createexternalrow(a#11172, b#11173, StructField(a,IntegerType,false), StructField(b,IntegerType,false)), createexternalrow(a#11172, b#11173, c#11174, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11188, b#11189, c#11190, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [a#11172, b#11173], [a#11188, b#11189], [a#11172, b#11173, c#11174], [a#11188, b#11189, c#11190], obj#11205: int
   :- *(2) Sort [a#11172 ASC NULLS FIRST, b#11173 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#11172, b#11173, 5), false, [id=#10174]
   :     +- *(1) Project [_1#11165 AS a#11172, _2#11166 AS b#11173, _3#11167 AS c#11174]
   :        +- *(1) LocalTableScan [_1#11165, _2#11166, _3#11167]
   +- *(4) Sort [a#11188 ASC NULLS FIRST, b#11189 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#11188, b#11189, 5), false, [id=#10179]
         +- *(3) Project [_1#11181 AS a#11188, _2#11182 AS b#11189, _3#11183 AS c#11190]
            +- *(3) LocalTableScan [_1#11181, _2#11182, _3#11183]
```

```scala
// Current approach creates additional AppendColumns and repartitions the data again
df1.groupByKey(r => r.getInt(0)).cogroup(df2.groupByKey(r => r.getInt(0))) {
  case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
}
```

```
== Physical Plan ==
*(7) SerializeFromObject [input[0, int, false] AS value#11216]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4892/905560656@19f7e6c5, value#11211: int, createexternalrow(a#11172, b#11173, c#11174, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11188, b#11189, c#11190, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [value#11211], [value#11213], [a#11172, b#11173, c#11174], [a#11188, b#11189, c#11190], obj#11215: int
   :- *(3) Sort [value#11211 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(value#11211, 5), true, [id=#10442]
   :     +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4889/2021090091@6396e053, createexternalrow(a#11172, b#11173, c#11174, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11211]
   :        +- *(2) Sort [a#11172 ASC NULLS FIRST, b#11173 ASC NULLS FIRST], false, 0
   :           +- Exchange hashpartitioning(a#11172, b#11173, 5), false, [id=#10437]
   :              +- *(1) Project [_1#11165 AS a#11172, _2#11166 AS b#11173, _3#11167 AS c#11174]
   :                 +- *(1) LocalTableScan [_1#11165, _2#11166, _3#11167]
   +- *(6) Sort [value#11213 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#11213, 5), true, [id=#10452]
         +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4891/1736834504@798dbf14, createexternalrow(a#11
```
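Note: to make the per-group logic in the description above concrete, here is a plain-collections model of what the cogroup over the key `(a, b)` computes for the sample rows. It uses no Spark API; `CogroupSketch`, `Row`, and the local `cogroup` helper are hypothetical names for illustration, and the sketch says nothing about partitioning or shuffles, which is what the PR is actually about.

```scala
object CogroupSketch {
  type Row = (Int, Int, Int) // columns a, b, c

  // Stand-in for cogroup: for every key seen on either side, pass the key and
  // both groups of rows to `f`. Keys are visited in sorted order.
  def cogroup[K: Ordering, R](left: Seq[Row], right: Seq[Row])(key: Row => K)(
      f: (K, Seq[Row], Seq[Row]) => Seq[R]): Seq[R] = {
    val l = left.groupBy(key)
    val r = right.groupBy(key)
    (l.keySet ++ r.keySet).toSeq.sorted.flatMap { k =>
      f(k, l.getOrElse(k, Seq.empty), r.getOrElse(k, Seq.empty))
    }
  }

  // Same sample rows as df1 and df2 in the description.
  val df1: Seq[Row] = Seq((1, 2, 3), (2, 3, 4))
  val df2: Seq[Row] = Seq((1, 2, 4), (2, 3, 5))

  // Same per-group logic as the description: zip the two groups and add column c.
  val sums: Seq[Int] = cogroup(df1, df2)(row => (row._1, row._2)) { (_, d1, d2) =>
    d1.zip(d2).map { case (x, y) => x._3 + y._3 }
  }
}
```

For these rows the groups are keyed by `(1, 2)` and `(2, 3)`, so `sums` holds one value per key: 3 + 4 and 4 + 5.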
[GitHub] [spark] Mats-SX commented on a change in pull request #24851: [SPARK-27303][GRAPH] Add Spark Graph API
Mats-SX commented on a change in pull request #24851: [SPARK-27303][GRAPH] Add Spark Graph API
URL: https://github.com/apache/spark/pull/24851#discussion_r333893149

## File path: graph/api/src/main/scala/org/apache/spark/graph/api/RelationshipFrameBuilder.scala ##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graph.api
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.DataFrame
+
+/**
+ * Interface used to build a [[RelationshipFrame]].
+ *
+ * @param df DataFrame containing a single relationship in each row
+ * @since 3.0.0
+ */
+final class RelationshipFrameBuilder(val df: DataFrame) {

Review comment:
Well, almost ;) but I get it
[GitHub] [spark] turboFei commented on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei commented on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545

Oh, it seems that this issue is related to https://github.com/apache/spark/pull/24142. I just read the comments there, and it seems there is some risk of causing duplicate results.

@ajithme @vanzin @LantaoJin How about using this method to name a task file only when dynamicPartitionOverwrite is true? For dynamic partition overwrite, it keeps a partition set, which would not cause duplicate results.
[GitHub] [spark] merrily01 opened a new pull request #26088: [SPARK-29436][K8S] Support executor for selecting scheduler through scheduler name in the case of k8s multi-scheduler scenario
merrily01 opened a new pull request #26088: [SPARK-29436][K8S] Support executor for selecting scheduler through scheduler name in the case of k8s multi-scheduler scenario
URL: https://github.com/apache/spark/pull/26088

### What changes were proposed in this pull request?

Support selecting the scheduler for executors by scheduler name in the k8s multi-scheduler scenario.

### Why are the changes needed?

Without this function, Spark cannot support the k8s multi-scheduler scenario.

### Does this PR introduce any user-facing change?

Yes, users can set the scheduler name through configuration.
[GitHub] [spark] yaooqinn commented on a change in pull request #25977: [SPARK-29268][SQL]isolationOn value is wrong in case of spark.sql.hive.metastore.jars != builtin
yaooqinn commented on a change in pull request #25977: [SPARK-29268][SQL]isolationOn value is wrong in case of spark.sql.hive.metastore.jars != builtin
URL: https://github.com/apache/spark/pull/25977#discussion_r333897470

## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala ##
@@ -414,7 +415,7 @@ private[spark] object HiveUtils extends Logging {
       hadoopConf = hadoopConf,
       execJars = jars.toSeq,
       config = configurations,
-      isolationOn = true,
+      isolationOn = !isCliSessionState(),

Review comment:
IMO, SparkSQLCLIDriver already loads and initializes Hive classes in the main classloader, so we shouldn't use non-builtin Hive jars there. Maybe just throw an exception when the spark-sql script is used with spark.sql.hive.metastore.jars set to something other than builtin.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333900101

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+      (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+      (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+    plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+    }
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizedLocalShuffleReaderEnabled) {
+      return plan
+    }
+
+    val optimizedPlan = plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+        join.copy(right = localReader)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+        join.copy(left = localReader)
+    }
+
+    val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+    val numExchanges = afterEnsureRequirements.collect {
+      case e: ShuffleExchangeExec => e
+    }.length
+
+    if (numExchanges > 0) {
+      logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+        " to additional shuffles will be introduced.")
+      revertLocalShuffleReader(optimizedPlan)

Review comment:
We don't need to revert, just return the original plan `plan`.
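Note: the reason a revert is unnecessary is that a tree rewrite of this kind builds a new plan and leaves its input untouched, so the original `plan` is still available to return. A minimal sketch with a toy plan type follows; `Plan`, `Stage`, `LocalReader`, `Join`, and `useLocalReader` are hypothetical stand-ins, not Spark classes.

```scala
object ImmutablePlanSketch {
  // Toy immutable plan tree.
  sealed trait Plan
  case class Stage(id: Int) extends Plan
  case class LocalReader(child: Plan) extends Plan
  case class Join(left: Plan, right: Plan) extends Plan

  // Wraps the right side of a join in a LocalReader when it is a Stage.
  // A new tree is returned; the input tree is never modified.
  def useLocalReader(plan: Plan): Plan = plan match {
    case Join(l, s: Stage) => Join(useLocalReader(l), LocalReader(s))
    case Join(l, r) => Join(useLocalReader(l), useLocalReader(r))
    case LocalReader(c) => LocalReader(useLocalReader(c))
    case s: Stage => s
  }

  val plan: Plan = Join(Stage(1), Stage(2))
  val optimizedPlan: Plan = useLocalReader(plan)
}
```

After the rewrite, `plan` still refers to the unmodified tree, so falling back is just a matter of returning it.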
[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545

Oh, it seems that this issue is related to https://github.com/apache/spark/pull/24142. I just read the comments there, and it seems there is some risk of causing duplicate results.

@ajithme @vanzin @LantaoJin How about using this method to name a task file for the cases below?

1. newTaskTempFile, when dynamicPartitionOverwrite is true
2. newTaskTempFileAbsPath

For these cases, it keeps a filesToMove set, which would not cause duplicate results.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333901952

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+      (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+      (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+    plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+    }
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizedLocalShuffleReaderEnabled) {
+      return plan
+    }
+
+    val optimizedPlan = plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+        join.copy(right = localReader)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+        join.copy(left = localReader)
+    }
+
+    val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+    val numExchanges = afterEnsureRequirements.collect {
+      case e: ShuffleExchangeExec => e
+    }.length
+
+    if (numExchanges > 0) {

Review comment:
It's very likely that we already have exchanges in the query plan; what we need to guarantee here is that we don't introduce extra exchanges. We should do:

```
def numExchanges...
val numExchangesBefore = numExchanges(EnsureRequirements(conf).apply(plan))
val numExchangesAfter = numExchanges(EnsureRequirements(conf).apply(optimizedPlan))
if (numExchangesAfter > numExchangesBefore) {
  plan
} else {
  optimizedPlan
}
```
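Note: the before/after comparison suggested above can be sketched on a toy plan tree. Everything here is a hypothetical stand-in: `Plan`, `Scan`, `Exchange`, `Join`, and `choose` are not Spark classes, and the `ensure` parameter plays the role of `EnsureRequirements(conf).apply`, which is assumed to be supplied by the caller.

```scala
object ExchangeCountSketch {
  // Toy plan tree; Exchange marks a shuffle.
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(name: String) extends Plan {
    def children: Seq[Plan] = Nil
  }
  case class Exchange(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
  }

  // Counts the Exchange nodes anywhere in the tree.
  def numExchanges(plan: Plan): Int = {
    val self = plan match {
      case _: Exchange => 1
      case _ => 0
    }
    self + plan.children.map(numExchanges).sum
  }

  // Keeps the optimized plan unless it needs more exchanges than the original
  // once both have gone through `ensure`.
  def choose(original: Plan, optimized: Plan)(ensure: Plan => Plan): Plan =
    if (numExchanges(ensure(optimized)) > numExchanges(ensure(original))) original
    else optimized
}
```

The point of comparing counts rather than testing for `> 0` is that a plan which already contained exchanges is not rejected merely for having them.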
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333916849

## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala ##
@@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       Literal.create(null, IntegerType), Literal.create(null, IntegerType)), null)
   }
+  test("ToNumber") {

Review comment:
@MaxGekk Thanks for the review. I tested the locale on PostgreSQL, but 'L' does not seem to work.

```
select to_number('USD34234.4350', 'L9.'); // 34234.435
select to_number('EUR34234.4350', 'L9.'); // 34234.435
select to_number('RY34234.4350', 'L9.'); // 34234.435
```

Although 'RY' is not a valid locale, the result is the same as for the others.
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333917844

## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala ##
@@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       Literal.create(null, IntegerType), Literal.create(null, IntegerType)), null)
   }
+  test("ToNumber") {

Review comment:
I think the description of locales is not consistent with the actual behavior. Maybe I should remove the comment about the locale.
[GitHub] [spark] skonto edited a comment on issue #25870: [SPARK-27936][K8S] support python deps
skonto edited a comment on issue #25870: [SPARK-27936][K8S] support python deps URL: https://github.com/apache/spark/pull/25870#issuecomment-541001902

@holdenk this is because spark-submit adds the resource to the `spark.jars` property by default, see below:

```
19/10/11 13:01:30 WARN Utils: Your hostname, universe resolves to a loopback address: 127.0.1.1; using 192.168.2.4 instead (on interface wlp2s0)
19/10/11 13:01:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Parsed arguments:
  master                  k8s://https://10.96.0.1:443
  deployMode              cluster
  executorMemory          1G
  executorCores           null
  totalExecutorCores      null
  propertiesFile          null
  driverMemory            1G
  driverCores             null
  driverExtraClassPath    null
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            2
  files                   null
  pyFiles                 null
  archives                null
  mainClass               org.apache.spark.examples.SparkPi
  primaryResource         local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
  name                    spark-pi
  childArgs               [100]
  jars                    null
  packages                null
  packagesExclusions      null
  repositories            null
  verbose                 true

Spark properties used, including those specified through --conf and those from the properties file null:
  (spark.kubernetes.driver.pod.name,spark-pi-driver)
  (spark.executor.instances,2)
  (spark.driver.memory,1G)
  (spark.executor.memory,1G)
  (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
  (spark.kubernetes.namespace,spark)
  (spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
  (spark.kubernetes.container.image.pullPolicy,Always)

19/10/11 13:01:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Main class:
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
Arguments:
--primary-java-resource local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
--main-class org.apache.spark.examples.SparkPi
--arg 100
Spark config:
(spark.kubernetes.namespace,spark)
(spark.jars,local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar)
(spark.app.name,spark-pi)
(spark.driver.memory,1G)
(spark.executor.instances,2)
(spark.submit.pyFiles,)
(spark.kubernetes.container.image.pullPolicy,Always)
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
(spark.submit.deployMode,cluster)
(spark.master,k8s://https://10.96.0.1:443)
(spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
(spark.executor.memory,1G)
(spark.kubernetes.driver.pod.name,spark-pi-driver)
Classpath elements:
```

So since the PR [here](https://github.com/apache/spark/pull/23546) uploads whatever is needed from `spark.jars`, we don't need to do double work. For Python it's different, as technically the main resource is not a jar.
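The behaviour described in the comment above can be modelled with a small toy function. This is only a sketch under stated assumptions: `PrimaryResourceSketch`, `mergedJars`, and the `.jar` suffix check are hypothetical names and logic invented for illustration, not spark-submit's actual code.

```scala
// Toy model (hypothetical, not SparkSubmit's implementation) of how a
// primary resource could end up in a `spark.jars`-like property.
object PrimaryResourceSketch {
  // Returns the comma-separated value the jars property would hold.
  // Assumption: a resource counts as a jar when its name ends in ".jar".
  def mergedJars(primaryResource: String, jars: Option[String]): String = {
    val existing = jars.toList.flatMap(_.split(",")).map(_.trim).filter(_.nonEmpty)
    val all =
      if (primaryResource.endsWith(".jar")) existing :+ primaryResource
      else existing // e.g. a Python main file is not a jar, so it is not added
    all.distinct.mkString(",")
  }

  def main(args: Array[String]): Unit = {
    val jar = "local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar"
    println(mergedJars(jar, None))                    // the jar path itself
    println(mergedJars("local:///app/main.py", None)) // empty string
  }
}
```

With `jars` unset and a jar as the primary resource, the model yields the same single-entry value that the log shows for `spark.jars`; with a Python file it yields nothing, which is the case the comment calls out as different.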
[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545

Oh, it seems that this issue is related to https://github.com/apache/spark/pull/24142. I just read the comments there, and it seems there is some risk of producing duplicate results. @ajithme @vanzin @LantaoJin How about using this method to name a task file in the cases below?

1. `newTaskTempFile` when `dynamicPartitionOverwrite` is true
2. `newTaskTempFileAbsPath`

In these cases a `filesToMove` set is kept, which would not cause duplicate results.

P.S. I am not sure whether it would cause duplicate results in the non-dynamic partition overwrite case. Should `outputCommitCoordinator` decide which task can commit?
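To make the naming idea concrete, here is a minimal sketch of how two attempts of the same task could be given distinct file names. The format strings and the names `TaskFileNames`, `baseName`, and `uniqueName` are assumptions for illustration; the exact scheme used by the PR is not shown in this thread.

```scala
// Hypothetical naming helpers; not the commit protocol's real API.
object TaskFileNames {
  // Name that depends only on the partition: two attempts of the
  // same task would collide on this name.
  def baseName(partitionId: Int, jobId: String, ext: String): String =
    f"part-$partitionId%05d-$jobId$ext"

  // Name that also encodes the attempt number, so a speculative
  // attempt writes to a different file than the original attempt.
  def uniqueName(partitionId: Int, attemptNumber: Int, jobId: String, ext: String): String =
    f"part-$partitionId%05d-$attemptNumber%03d-$jobId$ext"

  def main(args: Array[String]): Unit = {
    println(baseName(3, "job1", ".parquet"))      // part-00003-job1.parquet
    println(uniqueName(3, 0, "job1", ".parquet")) // part-00003-000-job1.parquet
    println(uniqueName(3, 1, "job1", ".parquet")) // part-00003-001-job1.parquet
  }
}
```

Unique names only avoid the write collision; which attempt's file is finally kept still has to be decided at commit time, which is the open question in the comment.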
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333926303

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+      (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+      (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+    plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+    }
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizedLocalShuffleReaderEnabled) {
+      return plan
+    }
+
+    val optimizedPlan = plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+        join.copy(right = localReader)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+        join.copy(left = localReader)
+    }
+
+    val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+    val numExchanges = afterEnsureRequirements.collect {
+      case e: ShuffleExchangeExec => e
+    }.length
+
+    if (numExchanges > 0) {
+      logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+        " to additional shuffles will be introduced.")
+      revertLocalShuffleReader(optimizedPlan)
+    } else {
+      optimizedPlan
+    }
+  }
+}
+
+case class LocalShuffleReaderExec(
+    child: QueryStageExec) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
+
+    def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = {
+      val initialPartitioning = stage.plan.asInstanceOf[ShuffleExchangeExec]

Review comment: `.asInstanceOf[ShuffleExchangeExec]` is not necessary.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333926473

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+      (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+      (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+    plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+    }
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizedLocalShuffleReaderEnabled) {
+      return plan
+    }
+
+    val optimizedPlan = plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+        join.copy(right = localReader)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+        join.copy(left = localReader)
+    }
+
+    val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+    val numExchanges = afterEnsureRequirements.collect {
+      case e: ShuffleExchangeExec => e
+    }.length
+
+    if (numExchanges > 0) {
+      logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+        " to additional shuffles will be introduced.")
+      revertLocalShuffleReader(optimizedPlan)
+    } else {
+      optimizedPlan
+    }
+  }
+}
+
+case class LocalShuffleReaderExec(
+    child: QueryStageExec) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
+
+    def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = {

Review comment: The method name sounds like it returns a boolean. How about `tryReserveChildPartitioning`?
[GitHub] [spark] sandeep-katta commented on a change in pull request #25977: [SPARK-29268][SQL]isolationOn value is wrong in case of spark.sql.hive.metastore.jars != builtin
sandeep-katta commented on a change in pull request #25977: [SPARK-29268][SQL]isolationOn value is wrong in case of spark.sql.hive.metastore.jars != builtin URL: https://github.com/apache/spark/pull/25977#discussion_r333926916

## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala ##
@@ -414,7 +415,7 @@ private[spark] object HiveUtils extends Logging {
     hadoopConf = hadoopConf,
     execJars = jars.toSeq,
     config = configurations,
-    isolationOn = true,
+    isolationOn = !isCliSessionState(),

Review comment: There are 2 approaches:

1. Load the Hive classes based on `isCliSessionState` (the fix in this PR).
2. Throw an exception at the start of spark-sql if the user sets `spark.sql.hive.metastore.jars` to something other than `builtin`. This might be a problem: the issue currently exists for Derby, so a user of some other database would be impacted.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333927414

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala ##
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+      (join.buildSide == BuildRight && join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
+    (join.buildSide == BuildLeft && ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+      (join.buildSide == BuildLeft && join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+    plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        join.copy(right = join.right.asInstanceOf[LocalShuffleReaderExec].child)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+    }
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.optimizedLocalShuffleReaderEnabled) {
+      return plan
+    }
+
+    val optimizedPlan = plan.transformDown {
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderRight(join) =>
+        val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+        join.copy(right = localReader)
+      case join: BroadcastHashJoinExec if canUseOrRevertLocalShuffleReaderLeft(join) =>
+        val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+        join.copy(left = localReader)
+    }
+
+    val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+    val numExchanges = afterEnsureRequirements.collect {
+      case e: ShuffleExchangeExec => e
+    }.length
+
+    if (numExchanges > 0) {
+      logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+        " to additional shuffles will be introduced.")
+      revertLocalShuffleReader(optimizedPlan)
+    } else {
+      optimizedPlan
+    }
+  }
+}
+
+case class LocalShuffleReaderExec(
+    child: QueryStageExec) extends LeafExecNode {

Review comment: nit: we can put it in the previous line
[GitHub] [spark] wangyum opened a new pull request #26089: [SPARK-29423][SQL] lazily initialize StreamingQueryManager in SessionState
wangyum opened a new pull request #26089: [SPARK-29423][SQL] lazily initialize StreamingQueryManager in SessionState URL: https://github.com/apache/spark/pull/26089

### What changes were proposed in this pull request?
This PR makes `SessionState` lazily initialize `StreamingQueryManager` to avoid constructing a `StreamingQueryManager` for each session when connecting to ThriftServer.

### Why are the changes needed?
Reduce memory usage.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Manual test.
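The lazy-initialization idea in the description above can be sketched with Scala's `lazy val`. The classes below (`ExpensiveManager`, `SessionStateSketch`) are stand-ins invented for this illustration; they are not Spark's `StreamingQueryManager` or `SessionState`, and the PR itself may wire things differently.

```scala
// Stand-in for an expensive per-session component (hypothetical).
final class ExpensiveManager {
  ExpensiveManager.constructed += 1
}

object ExpensiveManager {
  // Counts constructions so the effect of laziness is observable.
  var constructed: Int = 0
}

// Stand-in for a session state holder (hypothetical).
final class SessionStateSketch {
  // `lazy val` defers construction until first access, then caches the instance.
  lazy val manager: ExpensiveManager = new ExpensiveManager
}

object LazyInitDemo {
  def main(args: Array[String]): Unit = {
    val sessions = List.fill(3)(new SessionStateSketch)
    println(ExpensiveManager.constructed) // 0: no session has touched its manager
    sessions.head.manager
    sessions.head.manager                 // second access reuses the cached instance
    println(ExpensiveManager.constructed) // 1
  }
}
```

Sessions that never use the manager never pay for it, which is the memory saving the PR is after for ThriftServer connections.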
[GitHub] [spark] Clarkkkkk opened a new pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution
Clark opened a new pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution URL: https://github.com/apache/spark/pull/26090

### What changes were proposed in this pull request?
When inserting into a partitioned DataSource table with dynamic partition overwrite and speculative execution, attempts of the same task will try to write the same files (this does not reproduce with a Hive table). This PR reuses `FileOutputCommitter` to avoid the write collision, and renames files in the staging directory to the final output directory using the original logic in `HadoopMapReduceCommitProtocol#commitJob`.

### Why are the changes needed?
Tasks fail in this circumstance.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
This patch is tested by existing tests in `org.apache.spark.sql.sources.InsertSuite`.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333932293

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala ##
@@ -125,6 +130,10 @@ class AdaptiveQueryExecSuite
       assert(smj.size == 1)
       val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
       assert(bhj.size == 1)
+      val localReaders = adaptivePlan.collect {
+        case reader: LocalShuffleReaderExec => reader
+      }
+      assert(localReaders.length === 1)

Review comment: we can add a method

```
def checkNumLocalShuffleReaders(plan: SparkPlan, expected: Int): Unit = ...
```
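One possible shape for the helper suggested above is sketched here. To keep it self-contained, it uses a tiny stand-in plan tree (`PlanNode`, `Join`, `LocalReader`, `Scan`, all hypothetical) instead of Spark's `SparkPlan` and `LocalShuffleReaderExec`, and plain `assert` instead of ScalaTest's `===`; the body the reviewer left as `...` is an assumption.

```scala
// Minimal stand-ins for physical plan nodes (hypothetical, not Spark classes).
sealed trait PlanNode {
  def children: Seq[PlanNode]
  // Pre-order traversal applying a partial function to every node.
  def collect[B](pf: PartialFunction[PlanNode, B]): Seq[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}
final case class Join(left: PlanNode, right: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(left, right)
}
final case class LocalReader(name: String) extends PlanNode {
  def children: Seq[PlanNode] = Nil
}
final case class Scan(name: String) extends PlanNode {
  def children: Seq[PlanNode] = Nil
}

object PlanChecks {
  // Collects the local readers once and asserts on their count,
  // replacing the repeated collect-then-assert blocks in each test.
  def checkNumLocalShuffleReaders(plan: PlanNode, expected: Int): Unit = {
    val localReaders = plan.collect { case reader: LocalReader => reader }
    assert(localReaders.length == expected,
      s"expected $expected local shuffle readers, found ${localReaders.length}")
  }
}

object PlanChecksDemo {
  def main(args: Array[String]): Unit = {
    val plan = Join(LocalReader("left-stage"), Scan("broadcast-side"))
    PlanChecks.checkNumLocalShuffleReaders(plan, expected = 1)
  }
}
```

Each test case then reduces to a single call such as `checkNumLocalShuffleReaders(adaptivePlan, 1)`.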
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333934690 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + + +/** + * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition + * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle partitions + * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, inclusive). 
+ */ +private final class LocalShuffleRowRDDPartition( +val preShufflePartitionIndex: Int, +val startPostShufflePartitionIndex: Int, +val endPostShufflePartitionIndex: Int) extends Partition { + override val index: Int = preShufflePartitionIndex +} + +/** + * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used + * in Spark SQL adaptive execution when a shuffle join is converted to broadcast join at runtime + * because the map output of one input table is small enough for broadcast. This RDD represents the + * data of another input table of the join that reads from shuffle. Each partition of the RDD reads + * the whole data from just one mapper output locally. So actually there is no data transferred + * from the network. + + * This RDD takes a [[ShuffleDependency]] (`dependency`). + * + * The `dependency` has the parent RDD of this RDD, which represents the dataset before shuffle + * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs. + * Partition ids should be in the range [0, numPartitions - 1]. + * `dependency.partitioner.numPartitions` is the number of pre-shuffle partitions. (i.e. the number + * of partitions of the map output). The post-shuffle partition number is the same to the parent + * RDD's partition number. 
+ */ +class LocalShuffledRowRDD( + var dependency: ShuffleDependency[Int, InternalRow, InternalRow], + metrics: Map[String, SQLMetric]) + extends RDD[InternalRow](dependency.rdd.context, Nil) { + + private[this] val numReducers = dependency.partitioner.numPartitions + private[this] val numMappers = dependency.rdd.partitions.length + + override def getDependencies: Seq[Dependency[_]] = List(dependency) + + override def getPartitions: Array[Partition] = { + +Array.tabulate[Partition](numMappers) { i => + new LocalShuffleRowRDDPartition(i, 0, numReducers) +} + } + + override def getPreferredLocations(partition: Partition): Seq[String] = { +val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] +val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]] Review comment: why not just call `dependency`?
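To make the partition layout in `getPartitions` concrete, here is a small Spark-free sketch. `LocalPartition` and `LocalPartitionLayout` are illustrative names, not Spark classes; the sketch assumes, as the diff above suggests, one partition per mapper, each covering the reducer range from 0 up to but excluding `numReducers`.

```scala
object LocalPartitionLayout {
  // Illustrative stand-in for LocalShuffleRowRDDPartition (not the Spark class).
  final case class LocalPartition(mapIndex: Int, startReducer: Int, endReducer: Int) {
    // The partition index is the index of the mapper whose output it reads.
    val index: Int = mapIndex
  }

  // One partition per mapper; each one covers reducers [0, numReducers).
  def partitions(numMappers: Int, numReducers: Int): Array[LocalPartition] =
    Array.tabulate(numMappers)(i => LocalPartition(i, 0, numReducers))
}
```

Because every partition is constructed with the same start and end, the sketch also shows why the reviewer questions carrying those two fields at this stage.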
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333934898 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + + +/** + * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition + * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle partitions + * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, inclusive). + */ +private final class LocalShuffleRowRDDPartition( Review comment: `ShuffleRow` -> `ShuffledRow`
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333935255 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + + +/** + * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition + * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle partitions + * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, inclusive). + */ +private final class LocalShuffleRowRDDPartition( +val preShufflePartitionIndex: Int, +val startPostShufflePartitionIndex: Int, Review comment: This is not used anywhere.
[GitHub] [spark] turboFei commented on a change in pull request #25797: [SPARK-29043][Core] Improve the concurrent performance of History Server
turboFei commented on a change in pull request #25797: [SPARK-29043][Core] Improve the concurrent performance of History Server URL: https://github.com/apache/spark/pull/25797#discussion_r333935746 ## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ## @@ -561,7 +555,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .last(newLastScanTime - 1) .asScala .toList - stale.foreach { log => + stale.filterNot(isProcessing(_)).foreach { log => Review comment: Sorry for the late reply; I was on my National Day holiday for the past several days. > Instead of cleaning that log, it will instead be skipped until the next time the cleaner task runs, which may be quite a long time later. > but we need to also retry cleaning up these logs sooner, at least sooner than the interval of cleaner. It sounds a little complex; should we clean these logs again after half of the cleaner interval?
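The trade-off discussed above (skip logs that are still being processed, then retry them earlier than the next full cleaner run) can be sketched without Spark. Everything here is hypothetical bookkeeping: `StaleLogCleaner`, `startProcessing`, `endProcessing`, and `splitStale` are illustrative names, and only `isProcessing` and the `filterNot` idea come from the diff.

```scala
object StaleLogCleaner {
  import scala.collection.mutable

  // Hypothetical bookkeeping: log paths currently being parsed by another task.
  private val processing = mutable.Set.empty[String]

  def startProcessing(path: String): Unit = processing.synchronized { processing += path }
  def endProcessing(path: String): Unit = processing.synchronized { processing -= path }
  def isProcessing(path: String): Boolean = processing.synchronized { processing.contains(path) }

  // Splits stale logs into (cleanable now, deferred).
  // Deferred entries are the ones a shorter retry interval would pick up again.
  def splitStale(stale: List[String]): (List[String], List[String]) =
    stale.partition(p => !isProcessing(p))
}
```

Returning the deferred list explicitly, rather than dropping it as `filterNot` does, gives the caller something to schedule for the earlier retry.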
[GitHub] [spark] hagerf commented on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
hagerf commented on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame URL: https://github.com/apache/spark/pull/26087#issuecomment-541016870 Thank you for creating an MR so quickly after I submitted the JIRA ticket! 👍
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number. URL: https://github.com/apache/spark/pull/25963#discussion_r333938629 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ## @@ -2267,6 +2267,215 @@ case class FormatNumber(x: Expression, d: Expression) override def prettyName: String = "format_number" } +/** + * A function that converts string to numeric. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ +_FUNC_(strExpr, patternExpr) - Convert a string to a number based on the pattern. +The pattern can consist of the following characters: + '9': digit position (can be dropped if insignificant) + '0': digit position (will not be dropped, even if insignificant) + '.': decimal point (only allowed once) + ',': group (thousands) separator + 'S': sign anchored to number (uses locale) + 'L': currency symbol (uses locale) + 'D': decimal point (uses locale) + 'G': group separator (uses locale) + """, + examples = """ +Examples: + > SELECT _FUNC_('4540', '999'); + 454 + > SELECT _FUNC_('454.00', '000D00'); + 454 + > SELECT _FUNC_('12,454.8-', '99G999D9S'); + -12454.8 + > SELECT _FUNC_('CNY234234.4350', 'L99.'); + 234234.435 + """) +// scalastyle:on line.size.limit +case class ToNumber(strExpr: Expression, patternExpr: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + // scalastyle:off caselocale + private lazy val patternStr = patternExpr.eval().asInstanceOf[UTF8String].toUpperCase.toString Review comment: OK. A good suggestion.
[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame URL: https://github.com/apache/spark/pull/26087#discussion_r333938918 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ## @@ -2203,4 +2203,20 @@ class DataFrameSuite extends QueryTest with SharedSparkSession { |*(1) Range (0, 10, step=1, splits=2)""".stripMargin)) } } + + test("groupByRelationKey") { Review comment: The tests should also be updated to cover typed Datasets if my suggested change to `groupByRelationKey` is implemented.
[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame URL: https://github.com/apache/spark/pull/26087#discussion_r333936540 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ## @@ -1725,6 +1725,30 @@ class Dataset[T] private[sql]( withGroupingKey.newColumns) } + /** + * Returns a [[KeyValueGroupedDataset]] where the data is grouped by a row of given + * columns. + * + * @group typedrel + * @since 3.0.0 + */ + @scala.annotation.varargs + def groupByRelationKey( + col1: String, + cols: String*): KeyValueGroupedDataset[Row, Row] = { +val colNames: Seq[String] = col1 +: cols +val keyAttrs = colNames.map(colName => resolve(colName).toAttribute) +val keySchema = StructType.fromAttributes(keyAttrs) +val keyEncoder = RowEncoder(keySchema) +val valEncoder = RowEncoder(schema) Review comment: I don't see why we couldn't keep the encoder that is already present in the Dataset. With the current approach we lose that type information. I'd prefer doing this: ``` def groupByRelationKey( col1: String, cols: String*): KeyValueGroupedDataset[Row, T] = { val colNames: Seq[String] = col1 +: cols val keyAttrs = colNames.map(colName => resolve(colName).toAttribute) val keySchema = StructType.fromAttributes(keyAttrs) val keyEncoder = RowEncoder(keySchema) new KeyValueGroupedDataset( keyEncoder, encoder, queryExecution, logicalPlan.output, keyAttrs) } ```
[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame URL: https://github.com/apache/spark/pull/26087#discussion_r333938397 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ## @@ -1725,6 +1725,30 @@ class Dataset[T] private[sql]( withGroupingKey.newColumns) } + /** + * Returns a [[KeyValueGroupedDataset]] where the data is grouped by a row of given + * columns. + * + * @group typedrel + * @since 3.0.0 + */ + @scala.annotation.varargs + def groupByRelationKey( + col1: String, + cols: String*): KeyValueGroupedDataset[Row, Row] = { +val colNames: Seq[String] = col1 +: cols +val keyAttrs = colNames.map(colName => resolve(colName).toAttribute) +val keySchema = StructType.fromAttributes(keyAttrs) +val keyEncoder = RowEncoder(keySchema) +val valEncoder = RowEncoder(schema) Review comment: We can then leverage the `keyAs[L : Encoder]` method on `KeyValueGroupedDataset` to get full type information, so that an entirely typed `cogroup` would be possible (which is arguably the more user-friendly approach).
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333939935 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -388,6 +388,14 @@ object SQLConf { "must be a positive integer.") .createOptional + val OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED = Review comment: `OPTIMIZED` -> `OPTIMIZE`
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333940060 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2137,6 +2145,8 @@ class SQLConf extends Serializable with Logging { def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions) + def optimizedLocalShuffleReaderEnabled: Boolean = getConf(OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED) Review comment: It's only called once, so we don't need to create a method.
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number. URL: https://github.com/apache/spark/pull/25963#discussion_r333940347 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ## @@ -2267,6 +2267,215 @@ case class FormatNumber(x: Expression, d: Expression) override def prettyName: String = "format_number" } +/** + * A function that converts string to numeric. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ +_FUNC_(strExpr, patternExpr) - Convert a string to a number based on the pattern. +The pattern can consist of the following characters: + '9': digit position (can be dropped if insignificant) + '0': digit position (will not be dropped, even if insignificant) + '.': decimal point (only allowed once) + ',': group (thousands) separator + 'S': sign anchored to number (uses locale) + 'L': currency symbol (uses locale) + 'D': decimal point (uses locale) + 'G': group separator (uses locale) + """, + examples = """ +Examples: + > SELECT _FUNC_('4540', '999'); + 454 + > SELECT _FUNC_('454.00', '000D00'); + 454 + > SELECT _FUNC_('12,454.8-', '99G999D9S'); + -12454.8 + > SELECT _FUNC_('CNY234234.4350', 'L99.'); + 234234.435 + """) +// scalastyle:on line.size.limit +case class ToNumber(strExpr: Expression, patternExpr: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + // scalastyle:off caselocale + private lazy val patternStr = patternExpr.eval().asInstanceOf[UTF8String].toUpperCase.toString + // scalastyle:on caselocale + + override def left: Expression = strExpr + override def right: Expression = patternExpr + override def dataType: DataType = StringType + override def inputTypes: Seq[DataType] = Seq(StringType, StringType) + + override def checkInputDataTypes(): TypeCheckResult = { +def checkDecimalPointNum(c: Char): Boolean = { + c == '.' 
|| c.toUpper == 'D' +} + +val inputTypeCheck = super.checkInputDataTypes() +if(inputTypeCheck.isSuccess) { + if (patternStr.count(checkDecimalPointNum(_)) > 1) { Review comment: OK. I learned it.
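The decimal-point check in the diff above can be exercised in isolation. This is a sketch only: `ToNumberPatternCheck`, `isDecimalPoint`, `validate`, and the error message are assumptions made for illustration, while the rule itself (both `.` and `D`, case-insensitively, count as a decimal point, and at most one is allowed) is taken from the quoted code and its usage text.

```scala
object ToNumberPatternCheck {
  // Both '.' and 'D' (case-insensitive) mark a decimal point in the pattern.
  def isDecimalPoint(c: Char): Boolean = c == '.' || c.toUpper == 'D'

  // Left(message) when the pattern has more than one decimal point, Right(pattern) otherwise.
  // The message text is illustrative, not the one used in the pull request.
  def validate(pattern: String): Either[String, String] =
    if (pattern.count(isDecimalPoint) > 1) Left(s"Multiple decimal points in pattern '$pattern'")
    else Right(pattern)
}
```

Passing the method name directly to `count` avoids the placeholder lambda `checkDecimalPointNum(_)` seen in the diff; both forms behave the same.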
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333940553 ## File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala ## @@ -54,6 +54,19 @@ private[spark] trait ShuffleManager { context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from mapId. + * Called on executors by reduce tasks. + */ + def getMapReader[K, C]( + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, Review comment: Now the local shuffle reader always reads all the shuffle blocks of one mapper. Let's add the above 2 parameters later, when we are going to use them.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333941162 ## File path: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ## @@ -36,19 +36,35 @@ private[spark] class BlockStoreShuffleReader[K, C]( readMetrics: ShuffleReadMetricsReporter, serializerManager: SerializerManager = SparkEnv.get.serializerManager, blockManager: BlockManager = SparkEnv.get.blockManager, -mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) +mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker, +mapId: Option[Int] = None) extends ShuffleReader[K, C] with Logging { private val dep = handle.dependency /** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { +val blocksByAddress = (mapId) match { Review comment: `(mapId)` -> `mapId`
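As a rough illustration of the `mapId match` branch discussed above, the sketch below selects blocks from a plain map. `BlockSelection`, `BlocksByMap`, and `blocksToRead` are hypothetical names, and the string block ids are made up; the real reader obtains block locations from the `MapOutputTracker`, which is not modelled here.

```scala
object BlockSelection {
  // Hypothetical model: shuffle block ids grouped by the map task that wrote them.
  type BlocksByMap = Map[Int, Seq[String]]

  // No parentheses are needed around the scrutinee of a match expression.
  def blocksToRead(mapId: Option[Int], blocks: BlocksByMap): Seq[String] = mapId match {
    // Local read: only the output of one mapper.
    case Some(id) => blocks.getOrElse(id, Seq.empty)
    // Regular read: the output of every mapper, in map-id order.
    case None => blocks.toSeq.sortBy(_._1).flatMap(_._2)
  }
}
```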
[GitHub] [spark] turboFei commented on a change in pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution
turboFei commented on a change in pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution URL: https://github.com/apache/spark/pull/26090#discussion_r333941272 ## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ## @@ -89,7 +92,7 @@ class HadoopMapReduceCommitProtocol( * The staging directory of this write job. Spark uses it to deal with files with absolute output * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. */ - private def stagingDir = new Path(path, ".spark-staging-" + jobId) + private def stagingDir = new Path(stagingPath.getOrElse(path)) Review comment: AFAIK, this staging dir is used to deal with files with an absolute output path, or with writing data into a partitioned directory with dynamicPartitionOverwrite=true. But this PR changes the behavior for absolute output paths.
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number. URL: https://github.com/apache/spark/pull/25963#discussion_r333942317 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ## @@ -2267,6 +2267,215 @@ case class FormatNumber(x: Expression, d: Expression) override def prettyName: String = "format_number" } +/** + * A function that converts string to numeric. Review comment: I will preserve this line and change the `usage` part to `Convert `strExpr` to a number based on the `patternExpr`.`
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number. URL: https://github.com/apache/spark/pull/25963#discussion_r333942529 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ## @@ -2267,6 +2267,215 @@ case class FormatNumber(x: Expression, d: Expression) override def prettyName: String = "format_number" } +/** + * A function that converts string to numeric. + */ +// scalastyle:off line.size.limit Review comment: Thanks, I will follow your suggestion.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333943188 ## File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala ## @@ -54,6 +54,19 @@ private[spark] trait ShuffleManager { context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from mapId. + * Called on executors by reduce tasks. + */ + def getMapReader[K, C]( Review comment: instead of creating a new API, how about we set the map id in `TaskContext.getLocalProperties`, and still call the existing `getReader` method?
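The idea in the comment above, passing the map id through task-local properties instead of adding a new method, might look roughly like the sketch below. It is only an illustration under stated assumptions: `MiniTaskContext`, the key name `spark.shuffle.localReader.mapId`, and this `getReader` signature are hypothetical stand-ins, not Spark's actual API.

```scala
import java.util.Properties

// Hypothetical stand-in for a task context that carries string properties.
final class MiniTaskContext(props: Properties) {
  // Returns null when the key is absent, like java.util.Properties.getProperty.
  def getLocalProperty(key: String): String = props.getProperty(key)
}

object LocalReaderSketch {
  // Assumed key name, chosen only for this illustration.
  val MapIdKey = "spark.shuffle.localReader.mapId"

  // The existing entry point keeps its shape; the optional map id is read from the context.
  def getReader(startPartition: Int, endPartition: Int, context: MiniTaskContext): String = {
    val mapId = Option(context.getLocalProperty(MapIdKey)).map(_.toInt)
    mapId match {
      case Some(id) => s"local reader for map $id, partitions [$startPartition, $endPartition)"
      case None => s"regular reader for partitions [$startPartition, $endPartition)"
    }
  }
}
```

The trade-off in this sketch is that the method signature stays stable while the extra input becomes implicit, so callers have to know about the property key.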
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333943901 ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -669,6 +685,34 @@ private[spark] class MapOutputTrackerMaster( None } + /** + * Return the locations where the Mapper(s) ran. The locations each includes both a host and an + * executor id on that host. + * + * @param dep shuffle dependency object + * @param startMapId the start map id + * @param endMapId the end map id Review comment: We shouldn't add code that is only for future use. For now we only need a single `mapId`, IIUC.
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number. URL: https://github.com/apache/spark/pull/25963#discussion_r333944281 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala ## @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Literal.create(null, IntegerType), Literal.create(null, IntegerType)), null) } + test("ToNumber") { +// Test '0' and '9' +checkEvaluation(ToNumber(Literal("454"), Literal("")), "") +checkEvaluation(ToNumber(Literal("454"), Literal("9")), "4") +checkEvaluation(ToNumber(Literal("454"), Literal("99")), "45") +checkEvaluation(ToNumber(Literal("454"), Literal("999")), "454") +checkEvaluation(ToNumber(Literal("054"), Literal("999")), "54") +checkEvaluation(ToNumber(Literal("404"), Literal("999")), "404") +checkEvaluation(ToNumber(Literal("450"), Literal("999")), "450") +checkEvaluation(ToNumber(Literal("0454"), Literal("999")), "45") +checkEvaluation(ToNumber(Literal("4540"), Literal("999")), "454") +checkEvaluation(ToNumber(Literal("454"), Literal("0")), "4") +checkEvaluation(ToNumber(Literal("454"), Literal("00")), "45") +checkEvaluation(ToNumber(Literal("454"), Literal("000")), "454") +checkEvaluation(ToNumber(Literal("054"), Literal("000")), "54") +checkEvaluation(ToNumber(Literal("404"), Literal("000")), "404") +checkEvaluation(ToNumber(Literal("450"), Literal("000")), "450") +checkEvaluation(ToNumber(Literal("0454"), Literal("000")), "45") +checkEvaluation(ToNumber(Literal("4540"), Literal("000")), "454") +// Test '.' 
and 'D' +checkEvaluation(ToNumber(Literal("454.0"), Literal("999.9")), "454") +checkEvaluation(ToNumber(Literal("454.0"), Literal("000.0")), "454") +checkEvaluation(ToNumber(Literal("454.0"), Literal("999D9")), "454") +checkEvaluation(ToNumber(Literal("454.0"), Literal("000D0")), "454") +checkEvaluation(ToNumber(Literal("454.00"), Literal("000D00")), "454") +checkEvaluation(ToNumber(Literal("454.3"), Literal("999.9")), "454.3") +checkEvaluation(ToNumber(Literal("454.3"), Literal("000.0")), "454.3") +checkEvaluation(ToNumber(Literal("454.3"), Literal("999D9")), "454.3") +checkEvaluation(ToNumber(Literal("454.3"), Literal("000D0")), "454.3") +// Test ',' and 'G' +checkEvaluation(ToNumber(Literal("12,454"), Literal("99,999")), "12454") +checkEvaluation(ToNumber(Literal("12,454"), Literal("00,000")), "12454") +checkEvaluation(ToNumber(Literal("12,454"), Literal("99G999")), "12454") +checkEvaluation(ToNumber(Literal("12,454"), Literal("00G000")), "12454") +checkEvaluation(ToNumber(Literal("12,454,367"), Literal("99,999,999")), "12454367") +checkEvaluation(ToNumber(Literal("12,454,367"), Literal("00,000,000")), "12454367") +checkEvaluation(ToNumber(Literal("12,454,367"), Literal("99G999G999")), "12454367") +checkEvaluation(ToNumber(Literal("12,454,367"), Literal("00G000G000")), "12454367") +// Test 'S' +checkEvaluation(ToNumber(Literal("454-"), Literal("999S")), "-454") +checkEvaluation(ToNumber(Literal("454-"), Literal("000S")), "-454") +checkEvaluation(ToNumber(Literal("12,454.8-"), Literal("99G999D9S")), "-12454.8") +checkEvaluation(ToNumber(Literal("1-2,454.8"), Literal("9S9,999D9")), "-12454.8") +checkEvaluation(ToNumber(Literal("00,454.8-"), Literal("99G999.9S")), "-454.8") +// Test 'L' +checkEvaluation(ToNumber(Literal("CNY234234.4350"), Literal("L99.")), "234234.435") +checkEvaluation(ToNumber(Literal("RMB34234.4350"), Literal("L9.")), "34234.435") +checkEvaluation(ToNumber(Literal("RY34234.4350"), Literal("L9.")), "34234.435") 
+checkEvaluation(ToNumber(Literal("R34234.4350"), Literal("L9.")), "34234.435") + +ToNumber(Literal("454.3.2"), Literal("999D9D9")).checkInputDataTypes() match { Review comment: > Here you check only one negative tests. Could you add a little bit more negative tests. OK. I will add more.
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number. URL: https://github.com/apache/spark/pull/25963#discussion_r333916849 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala ## @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Literal.create(null, IntegerType), Literal.create(null, IntegerType)), null) } + test("ToNumber") { Review comment: @MaxGekk Thanks for this review. I tested the locale on `Postgresql`, but 'L' does not seem to work.
```
select to_number('USD34234.4350', 'L9.'); // 34234.435
select to_number('EUR34234.4350', 'L9.'); // 34234.435
select to_number('RY34234.4350', 'L9.'); // 34234.435
```
Although 'RY' is not a valid locale, the result is the same as the others.
[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number. URL: https://github.com/apache/spark/pull/25963#discussion_r333917844 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala ## @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Literal.create(null, IntegerType), Literal.create(null, IntegerType)), null) } + test("ToNumber") { Review comment: I think the description of locales is not consistent with the actual behavior. Maybe I should remove the comment about locale, as it is a bug in `Postgresql`.
[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution URL: https://github.com/apache/spark/pull/25295#discussion_r333946016 ## File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ## @@ -749,6 +818,26 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } + override def getMapSizesByExecutorId( + shuffleId: Int, + startPartition: Int, + endPartition: Int, + mapId: Int, + useOldFetchProtocol: Boolean) : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { +logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" + + s"partitions $startPartition-$endPartition") +val statuses = getStatuses(shuffleId) Review comment: Since we need to get all the shuffle status anyway, we can call the existing `convertMapStatuses`, and do an extra filter to only collect the blocks whose mapIndex is what we want.
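The extra filter proposed above can be sketched in plain Scala as follows. This is a minimal illustration, not the Spark implementation: `ExecutorLoc` and `Block` are hypothetical, simplified stand-ins for `BlockManagerId` and `BlockId`, and a `Seq` is used where the quoted signature returns an `Iterator`.

```scala
// Hypothetical, simplified stand-ins for Spark's BlockManagerId and BlockId.
final case class ExecutorLoc(executorId: String, host: String)
final case class Block(shuffleId: Int, mapId: Long, reduceId: Int)

object MapIndexFilterSketch {
  // Shape mirrors the quoted return type: (location, Seq[(block, size, mapIndex)]).
  type BlocksByAddress = Seq[(ExecutorLoc, Seq[(Block, Long, Int)])]

  // Keep only the blocks written by the wanted mapper and drop locations left empty.
  def filterByMapIndex(all: BlocksByAddress, wantedMapIndex: Int): BlocksByAddress =
    all
      .map { case (loc, blocks) =>
        (loc, blocks.filter { case (_, _, mapIndex) => mapIndex == wantedMapIndex })
      }
      .filter { case (_, blocks) => blocks.nonEmpty }
}
```

Filtering after conversion reuses the existing code path, at the cost of materializing entries for mappers that are then discarded.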
[GitHub] [spark] Clarkkkkk commented on a change in pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution
Clark commented on a change in pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution URL: https://github.com/apache/spark/pull/26090#discussion_r333954828 ## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ## @@ -89,7 +92,7 @@ class HadoopMapReduceCommitProtocol( * The staging directory of this write job. Spark uses it to deal with files with absolute output * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. */ - private def stagingDir = new Path(path, ".spark-staging-" + jobId) + private def stagingDir = new Path(stagingPath.getOrElse(path)) Review comment: You are right, stagingDir cannot be used here, as it might get deleted during abortJob.
[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame URL: https://github.com/apache/spark/pull/26087#discussion_r333938397 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ## @@ -1725,6 +1725,30 @@ class Dataset[T] private[sql]( withGroupingKey.newColumns) } + /** + * Returns a [[KeyValueGroupedDataset]] where the data is grouped by a row of given + * columns. + * + * @group typedrel + * @since 3.0.0 + */ + @scala.annotation.varargs + def groupByRelationKey( + col1: String, + cols: String*): KeyValueGroupedDataset[Row, Row] = { +val colNames: Seq[String] = col1 +: cols +val keyAttrs = colNames.map(colName => resolve(colName).toAttribute) +val keySchema = StructType.fromAttributes(keyAttrs) +val keyEncoder = RowEncoder(keySchema) +val valEncoder = RowEncoder(schema) Review comment: Then the user can leverage the `keyAs[L : Encoder]` method on `KeyValueGroupedDataset` to get full type information, so that an entirely typed `cogroup` would be possible (which is arguably the more user-friendly approach).
[GitHub] [spark] cloud-fan commented on a change in pull request #25955: [SPARK-29277][SQL] Add early DSv2 filter and projection pushdown
cloud-fan commented on a change in pull request #25955: [SPARK-29277][SQL] Add early DSv2 filter and projection pushdown URL: https://github.com/apache/spark/pull/25955#discussion_r333962848 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ## @@ -55,7 +56,52 @@ case class DataSourceV2Relation( } override def computeStats(): Statistics = { -val scan = newScanBuilder().build() +if (Utils.isTesting) { + // when testing, throw an exception if this computeStats method is called because stats should + // not be accessed before pushing the projection and filters to create a scan. otherwise, the + // stats are not accurate because they are based on a full table scan of all columns. + throw new UnsupportedOperationException( +s"BUG: computeStats called before pushdown on DSv2 relation: $name") +} else { + // when not testing, return stats because bad stats are better than failing a query + newScanBuilder() match { +case r: SupportsReportStatistics => + val statistics = r.estimateStatistics() + DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes) +case _ => + Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} + } + + override def newInstance(): DataSourceV2Relation = { +copy(output = output.map(_.newInstance())) + } +} + +/** + * A logical plan for a DSv2 table with a scan already created. + * + * This is used in the optimizer to push filters and projection down before conversion to physical + * plan. This ensures that the stats that are used by the optimizer account for the filters and + * projection that will be pushed down. + * + * @param table a DSv2 [[Table]] + * @param scan a DSv2 [[Scan]] + * @param output the output attributes of this relation + */ +case class DataSourceV2ScanRelation( Review comment: Makes sense. But it's still better to reduce the maintenance cost a bit. 
How about we clearly define the life cycle of a v2 relation as: `UnresolvedRelation` (before analysis) -> `DataSourceV2Relation` (after analysis) -> `DataSourceV2ScanRelation` (after optimization) -> the physical scan node. Then we only need to match one node:
1. if dealing with `analyzedPlan`, match `DataSourceV2Relation`
2. if dealing with `optimizedPlan`, match `DataSourceV2ScanRelation`
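The life cycle described above can be sketched with a small sealed hierarchy. The node names follow the comment, but everything else is a simplified assumption for illustration: the fields, the `analyze` and `optimize` helpers, and the `V2LifecycleSketch` object are hypothetical and do not reflect Catalyst's real classes.

```scala
// Hypothetical, heavily simplified plan nodes; names follow the comment above.
sealed trait PlanNode
final case class UnresolvedRelation(name: String) extends PlanNode
final case class DataSourceV2Relation(table: String) extends PlanNode
final case class DataSourceV2ScanRelation(table: String, pushedFilters: Seq[String]) extends PlanNode

object V2LifecycleSketch {
  // Analysis: resolve a name to a table relation.
  def analyze(plan: PlanNode): PlanNode = plan match {
    case UnresolvedRelation(name) => DataSourceV2Relation(name)
    case other => other
  }

  // Optimization: build the scan, recording what was pushed down.
  def optimize(plan: PlanNode, filters: Seq[String]): PlanNode = plan match {
    case DataSourceV2Relation(table) => DataSourceV2ScanRelation(table, filters)
    case other => other
  }

  // Code inspecting an analyzed plan matches exactly one node type.
  def tablesInAnalyzed(plan: PlanNode): Seq[String] = plan match {
    case DataSourceV2Relation(table) => Seq(table)
    case _ => Nil
  }

  // Code inspecting an optimized plan matches exactly one other node type.
  def tablesInOptimized(plan: PlanNode): Seq[String] = plan match {
    case DataSourceV2ScanRelation(table, _) => Seq(table)
    case _ => Nil
  }
}
```

With one node type per phase, each consumer needs a single `case`, which is the maintenance saving the comment is after.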
[GitHub] [spark] s1ck commented on issue #24851: [SPARK-27303][GRAPH] Add Spark Graph API
s1ck commented on issue #24851: [SPARK-27303][GRAPH] Add Spark Graph API URL: https://github.com/apache/spark/pull/24851#issuecomment-541041551 @dongjoon-hyun Thanks for the extra rounds. We made the `DataFrame` to `Dataset[Row]` changes and added some clarifying docs.
[GitHub] [spark] hagerf edited a comment on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
hagerf edited a comment on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame URL: https://github.com/apache/spark/pull/26087#issuecomment-541016870 Thank you for creating a PR so fast after my submitting the jira ticket! 👍
[GitHub] [spark] hagerf commented on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame
hagerf commented on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame URL: https://github.com/apache/spark/pull/26087#issuecomment-541051711 I added a PR with these changes and a new test for the typed datasets. 👍
[GitHub] [spark] skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time
skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time URL: https://github.com/apache/spark/pull/25609#discussion_r333985107 ## File path: docs/security.md ## @@ -845,8 +845,13 @@ When talking to Hadoop-based services behind Kerberos, it was noted that Spark n so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job: -In all cases you must define the environment variable: `HADOOP_CONF_DIR` or -`spark.kubernetes.hadoop.configMapName.` +In all cases you must define the environment variable: `HADOOP_CONF_DIR` or/and +`spark.kubernetes.hadoop.configMapName`. When only `HADOOP_CONF_DIR` is configured, the hadoop configurations in there +will be used by the client process, the Driver and its Executors. When only `spark.kubernetes.hadoop.configMapName` +is configured, the hadoop configurations will only be used by the Driver and its Executors. If your client process has +extra dependencies to upload to `spark.kubernetes.file.upload.path`, you may need to configure `HADOOP_CONF_DIR` too. +When these two variables are both set, Spark will prefer `spark.kubernetes.hadoop.configMapName` to be mounted on the +Driver/Executor pods. Review comment: The concept looks good to me, @ifilonenko any corner cases?
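The precedence rules in the proposed documentation text can be summarized as a small decision function. This is only a sketch of the documented behavior as quoted above; the `ConfSource` types and the function names are hypothetical and are not taken from Spark's Kubernetes module.

```scala
object HadoopConfSourceSketch {
  // Hypothetical model of where the pods' Hadoop configuration comes from.
  sealed trait ConfSource
  final case class ExistingConfigMap(name: String) extends ConfSource
  final case class ConfDirUpload(dir: String) extends ConfSource
  case object NoHadoopConf extends ConfSource

  // What gets mounted on Driver/Executor pods: the named config map wins when both are set.
  def podConfSource(configMapName: Option[String], hadoopConfDir: Option[String]): ConfSource =
    (configMapName, hadoopConfDir) match {
      case (Some(name), _) => ExistingConfigMap(name)
      case (None, Some(dir)) => ConfDirUpload(dir)
      case (None, None) => NoHadoopConf
    }

  // The submitting client process only sees HADOOP_CONF_DIR, never the config map.
  def clientConfDir(hadoopConfDir: Option[String]): Option[String] = hadoopConfDir
}
```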
[GitHub] [spark] cloud-fan opened a new pull request #26091: [SPARK-29439][SQL] DDL commands should not use DataSourceV2Relation
cloud-fan opened a new pull request #26091: [SPARK-29439][SQL] DDL commands should not use DataSourceV2Relation URL: https://github.com/apache/spark/pull/26091
### What changes were proposed in this pull request?
Create a new node `ResolvedV2Table`, and resolve `UnresolvedV2Relation` to this new node instead of `DataSourceV2Relation`.
### Why are the changes needed?
`DataSourceV2Relation` is a scan node. It should be used when we want to scan a v2 table. However, the DDL commands do not need to scan a v2 table; they just need a node to hold the v2 table. It's possible that there are rules trying to match `DataSourceV2Relation` and convert it to something else (e.g. a new `DataSourceV2ScanRelation`) for better data scan. Unfortunately, doing this will break these DDL commands. It's better to have a separate node to hold the v2 table for DDL commands.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
existing tests
[GitHub] [spark] skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time
skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time URL: https://github.com/apache/spark/pull/25609#discussion_r333990738 ## File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala ## @@ -62,6 +65,17 @@ class HadoopConfDriverFeatureStepSuite extends SparkFunSuite { assert(hadoopConfMap.getData().keySet().asScala === confFiles) } + test("using hadoop config map ahead of conf dir if both defined") { +val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath())) + .set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap") + +val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) Review comment: nit: remove space
[GitHub] [spark] cloud-fan commented on issue #26091: [SPARK-29439][SQL] DDL commands should not use DataSourceV2Relation
cloud-fan commented on issue #26091: [SPARK-29439][SQL] DDL commands should not use DataSourceV2Relation URL: https://github.com/apache/spark/pull/26091#issuecomment-541064323 @brkyvz @rdblue
[GitHub] [spark] skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time
skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time URL: https://github.com/apache/spark/pull/25609#discussion_r333990738 ## File path: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala ## @@ -62,6 +65,17 @@ class HadoopConfDriverFeatureStepSuite extends SparkFunSuite { assert(hadoopConfMap.getData().keySet().asScala === confFiles) } + test("using hadoop config map ahead of conf dir if both defined") { +val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath())) + .set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap") + +val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) Review comment: minor: remove space
[GitHub] [spark] HeartSaVioR commented on issue #26089: [SPARK-29423][SQL] lazily initialize StreamingQueryManager in SessionState
HeartSaVioR commented on issue #26089: [SPARK-29423][SQL] lazily initialize StreamingQueryManager in SessionState URL: https://github.com/apache/spark/pull/26089#issuecomment-541065149 I expect the code change would work, but it would be even better if you could attach some results of manual tests to the PR description, such as snapshots of the sampler page in jvisualvm that locate StreamingQueryListenerBus, taken on the current latest master and after applying this patch.
[GitHub] [spark] skonto commented on issue #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time
skonto commented on issue #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time URL: https://github.com/apache/spark/pull/25609#issuecomment-541065986 @erikerlandson LGTM, we need an integration test for this @ifilonenko gentle ping.
[GitHub] [spark] tgravescs commented on issue #26085: [SPARK-29434] [Core] Improve the MapStatuses Serialization Performance
tgravescs commented on issue #26085: [SPARK-29434] [Core] Improve the MapStatuses Serialization Performance URL: https://github.com/apache/spark/pull/26085#issuecomment-541067929 I haven't looked at the code yet; can you clarify what ops/ms is measuring here? >> For smaller MapStatuses, we find it's 2x faster. Just to verify, does this hold true for small jobs, where you have 10 outputs or 100 outputs? Did you try with, say, 2000 or 5000 blocks? The zstd ops/ms numbers for the 2 step dropped a lot between 5 and 500, whereas the 1 step didn't drop nearly as much, so I'm wondering whether the 1 step becomes faster at some point.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #26018: [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState
HeartSaVioR commented on a change in pull request #26018: [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState URL: https://github.com/apache/spark/pull/26018#discussion_r333997947 ## File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ## @@ -355,7 +355,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo // Make sure no other query with same id is active across all sessions val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this)) - if (activeOption.isDefined) { + if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) { Review comment: Just curious: have you seen an actual case where activeOption is None but activeQueries contains such a query? I haven't seen one, though I don't think adding this check would hurt.
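For readers following the review comment above, here is a minimal sketch of the `putIfAbsent` idiom the diff relies on. It is not the Spark code: `ActiveQueryRegistry`, `tryRegister`, and `unregister` are hypothetical names, and a plain `String` owner stands in for the `StreamingQueryManager` instance that the real map stores.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a registry of active queries shared across sessions.
final class ActiveQueryRegistry {
    // Maps query id -> owner. The owner type is a String here purely for illustration.
    private final ConcurrentHashMap<UUID, String> active = new ConcurrentHashMap<>();

    // Returns true if this caller registered the id, false if some owner already holds it.
    public boolean tryRegister(UUID queryId, String owner) {
        // putIfAbsent is atomic: it returns the previous value, or null if the key was absent.
        return active.putIfAbsent(queryId, owner) == null;
    }

    // Releases the id so that a later query with the same id can start.
    public void unregister(UUID queryId) {
        active.remove(queryId);
    }
}
```

Because `putIfAbsent` performs the check and the insert as one atomic step, two sessions racing to start a query with the same id cannot both succeed, which is why wrapping its result in an `Option` is enough to detect the conflict.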
[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite and specified abs path be unique
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite and specified abs path be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545 Oh, it seems that this issue is related to https://github.com/apache/spark/pull/24142. I just read the comments there, and it seems there is some risk of duplicate results. @ajithme @vanzin @LantaoJin How about using this method to name a task file for dynamic partition overwrite only? In that case it keeps a filesToMove set, which would not cause duplicate results. P.S.: I'm not sure whether it would cause duplicate results in the non-dynamic partition overwrite case. Should outputCommitCoordinator decide which task can commit?
[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545 Oh, it seems that this issue is related to https://github.com/apache/spark/pull/24142. I just read the comments there, and it seems there is some risk of duplicate results. @ajithme @vanzin @LantaoJin How about using this method to name a task file for dynamic partition overwrite only? In that case it keeps a filesToMove set, which would not cause duplicate results. P.S.: I'm not sure whether it would cause duplicate results in the non-dynamic partition overwrite case. Should outputCommitCoordinator decide which task can commit?
[GitHub] [spark] nonsleepr commented on a change in pull request #26000: [SPARK-29330][CORE][YARN] Allow users to chose the name of Spark Shuffle service
nonsleepr commented on a change in pull request #26000: [SPARK-29330][CORE][YARN] Allow users to chose the name of Spark Shuffle service URL: https://github.com/apache/spark/pull/26000#discussion_r334009122 ## File path: common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java ## @@ -136,7 +136,11 @@ private DB db; public YarnShuffleService() { -super("spark_shuffle"); +this("spark_shuffle"); + } + + protected YarnShuffleService(String serviceName) { Review comment: The name specified here is actually useful only in tests. YARN's service instantiation logic doesn't even pass the service name used in the config to the instantiated service. I guess that's the main reason the names and ports are hardcoded or bound to non-namespaced configuration keys. HDP works around this by providing different classpaths with different implementations for different versions of the service (`spark_shuffle` for Spark 1.6.x and `spark2_shuffle` for Spark 2+). The only way I see to pass different parameters to the same implementation of the service is to provide different configs on the classpath. I will add a comment here stating that the name is only used in tests and would otherwise always be hardcoded to `spark_shuffle`.
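The constructor change discussed in the comment above can be sketched in isolation. This is an illustration only, assuming the new protected constructor simply forwards the name to the superclass: `NamedService`, `ShuffleServiceSketch`, and `TestShuffleService` are hypothetical classes, not the Hadoop or Spark types.

```java
// Hypothetical base class that, like a YARN auxiliary service, takes its name in the constructor.
abstract class NamedService {
    private final String name;

    protected NamedService(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

// Sketch of the pattern in the diff: the public no-arg constructor keeps the
// hardcoded default, while a protected overload lets subclasses choose a name.
class ShuffleServiceSketch extends NamedService {
    public ShuffleServiceSketch() {
        this("spark_shuffle");
    }

    protected ShuffleServiceSketch(String serviceName) {
        super(serviceName);
    }
}

// A test-only subclass can use the protected constructor to pick another name.
class TestShuffleService extends ShuffleServiceSketch {
    TestShuffleService(String serviceName) {
        super(serviceName);
    }
}
```

Since a framework that instantiates the service reflectively would call only the no-arg constructor, the default name is the one seen in production, which matches the point that the overload is useful only in tests.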
[GitHub] [spark] Ngone51 commented on issue #25943: [WIP][SPARK-29261][SQL][CORE] Support recover live entities from KVStore for (SQL)AppStatusListener
Ngone51 commented on issue #25943: [WIP][SPARK-29261][SQL][CORE] Support recover live entities from KVStore for (SQL)AppStatusListener URL: https://github.com/apache/spark/pull/25943#issuecomment-541080685 > So you're saying the KVStore already has enough info, this PR just repopulates the in-memory version of all the LiveEntities? Yeah. The snapshot is used to restore a KVStore. And then, we recover live entities for AppStatusListener/SQLAppStatusListener from that restored KVStore.
[GitHub] [spark] MaxGekk opened a new pull request #26092: [SPARK-29440][SQL] Support java.time.Duration as an external type of CalendarIntervalType
MaxGekk opened a new pull request #26092: [SPARK-29440][SQL] Support java.time.Duration as an external type of CalendarIntervalType URL: https://github.com/apache/spark/pull/26092 ### What changes were proposed in this pull request? In the PR, I propose to convert values of Catalyst's `CalendarIntervalType` to `java.time.Duration` values when such values are needed outside of Spark, for example in UDFs. If an `INTERVAL` value has a non-zero `months` field, it is converted to a number of seconds assuming `2629746` seconds per month. This average number of seconds per month is derived by assuming that the average year of the Gregorian calendar is `365.2425` days long (see https://en.wikipedia.org/wiki/Gregorian_calendar): `60 * 60 * 24 * 365.2425` = `31556952.0` = `12 * 2629746`. For example: ```scala scala> val plusDay = udf((i: java.time.Duration) => i.plusDays(1)) plusDay: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$1855/165450258@485996f7,CalendarIntervalType,List(Some(Schema(CalendarIntervalType,true))),None,true,true) scala> val df = spark.sql("SELECT interval 40 minutes as i") df: org.apache.spark.sql.DataFrame = [i: interval] scala> df.show +---+ | i| +---+ |interval 40 minutes| +---+ scala> df.select(plusDay('i)).show(false) +--+ |UDF(i)| +--+ |interval 1 days 40 minutes| +--+ ``` I added an implicit encoder for `java.time.Duration`, which allows creating Spark DataFrames from external collections: ```scala scala> Seq(Duration.ofDays(10), Duration.ofHours(10)).toDS.show(false) +---+ |value | +---+ |interval 1 weeks 3 days| |interval 10 hours | +---+ ``` ### Why are the changes needed? This should allow users to: - Write UDFs over interval inputs - Use Java 8 libraries for `java.time.Duration` in manipulations on collected values or in UDFs - Create DataFrames from a collection of `java.time.Duration` values. ### Does this PR introduce any user-facing change? 
Yes, currently `collect()` returns the non-public class `CalendarInterval`: ``` scala> spark.sql("select interval 1 week").collect().apply(0).get(0).isInstanceOf[org.apache.spark.unsafe.types.CalendarInterval] res2: Boolean = true ``` After the changes: ``` scala> spark.sql("select interval 1 week").collect().apply(0).get(0).isInstanceOf[Duration] res8: Boolean = true ``` ### How was this patch tested? - Added new tests to `CatalystTypeConvertersSuite` to check conversion of `CalendarIntervalType` to/from `java.time.Duration` - By `JavaUDFSuite`/`UDFSuite`, to test usage of the `Duration` type in Scala/Java UDFs.
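The month-to-seconds arithmetic in the PR description above can be checked with a small standalone sketch. It is not the code from the PR: `IntervalToDuration` and `toDuration` are hypothetical names, and the sketch assumes an interval is represented by an `int` month count plus a `long` microsecond count. The constant `2629746` and the `365.2425` days-per-year figure come from the description.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;

// Hypothetical helper illustrating the conversion described in the PR text.
final class IntervalToDuration {
    static final long SECONDS_PER_DAY = 60L * 60L * 24L;

    // 365.2425 days per year, written as the exact integer ratio 3652425 / 10000
    // so the arithmetic stays in whole numbers.
    static final long SECONDS_PER_YEAR = SECONDS_PER_DAY * 3652425L / 10000L;

    // Average Gregorian month: one twelfth of the average year.
    static final long SECONDS_PER_MONTH = SECONDS_PER_YEAR / 12L;

    // Assumption: the interval is given as (months, microseconds).
    static Duration toDuration(int months, long microseconds) {
        return Duration.ofSeconds(months * SECONDS_PER_MONTH)
                .plus(microseconds, ChronoUnit.MICROS);
    }
}
```

Under these assumptions a one-month interval maps to a fixed 2629746 seconds, so the resulting `Duration` is an average-length approximation and does not track actual calendar months.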