[GitHub] [spark] HyukjinKwon commented on issue #26053: [SPARK-29379][SQL]SHOW FUNCTIONS show '!=', '<>' , 'between', 'case'

2019-10-11 Thread GitBox
HyukjinKwon commented on issue #26053: [SPARK-29379][SQL]SHOW FUNCTIONS  show 
'!=', '<>' , 'between', 'case'
URL: https://github.com/apache/spark/pull/26053#issuecomment-540942383
 
 
   Alright, let's deal with it later separately.





[GitHub] [spark] yaooqinn commented on issue #26080: [SPARK-29425][SQL] The ownership of a database should be respected

2019-10-11 Thread GitBox
yaooqinn commented on issue #26080: [SPARK-29425][SQL] The ownership of a 
database should be respected
URL: https://github.com/apache/spark/pull/26080#issuecomment-540945353
 
 
   cc @cloud-fan @gatorsmile @wangyum 





[GitHub] [spark] MaxGekk commented on a change in pull request #26055: [SPARK-29368][SQL][TEST] Port interval.sql

2019-10-11 Thread GitBox
MaxGekk commented on a change in pull request #26055: [SPARK-29368][SQL][TEST] 
Port interval.sql
URL: https://github.com/apache/spark/pull/26055#discussion_r333855323
 
 

 ##
 File path: sql/core/src/test/resources/sql-tests/inputs/postgreSQL/interval.sql
 ##
 @@ -0,0 +1,330 @@
+--
+-- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+--
+--
+-- INTERVAL
+-- 
https://github.com/postgres/postgres/blob/REL_12_STABLE/src/test/regress/sql/interval.sql
+
+-- SET DATESTYLE = 'ISO';
+-- SET IntervalStyle to postgres;
+
+-- check acceptance of "time zone style"
+-- [SPARK-29369] Accept strings without `interval` prefix in casting to 
intervals
+-- [SPARK-29370] Interval strings without explicit unit markings
+-- SELECT INTERVAL '01:00' AS `One hour`;
+-- SELECT INTERVAL '+02:00' AS `Two hours`;
+-- SELECT INTERVAL '-08:00' AS `Eight hours`;
+-- SELECT INTERVAL '-1 +02:03' AS `22 hours ago...`;
+-- SELECT INTERVAL '-1 days +02:03' AS `22 hours ago...`;
+-- [SPARK-29371] Support interval field values with fractional parts
+-- SELECT INTERVAL '1.5 weeks' AS `Ten days twelve hours`;
+-- SELECT INTERVAL '1.5 months' AS `One month 15 days`;
+-- SELECT INTERVAL '10 years -11 month -12 days +13:14' AS `9 years...`;
+
+-- [SPARK-29382] Support the `INTERVAL` type by Parquet datasource
 
 Review comment:
   This is too generic, from my point of view. If we use parquet everywhere in the ported tests, `interval.sql` shouldn't be an exception. The concrete problem here is that the **parquet** datasource doesn't support writing values of the interval type.
   
   > -- [SPARK-29382] Support writing `INTERVAL` type to datasource table
   
   Does it mean that `INTERVAL` should be supported by all builtin datasources?
   
   I will change the title of the JIRA ticket to unblock this PR.
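   
   For reference, a minimal reproduction of the concrete problem (a sketch, assuming a spark-shell / SparkSession named `spark`; the exact error message may differ by version):
   
   ```scala
   // The parquet datasource rejects interval values with an AnalysisException
   // along the lines of "Cannot save interval data type into external storage."
   spark.sql("SELECT interval 1 day AS i").write.parquet("/tmp/interval_out")
   ```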





[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs

2019-10-11 Thread GitBox
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] 
Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333857916
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
 ##
 @@ -65,12 +65,30 @@ class NettyBlockRpcServer(
 
   case fetchShuffleBlocks: FetchShuffleBlocks =>
 val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case 
(mapId, index) =>
-  fetchShuffleBlocks.reduceIds.apply(index).map { reduceId =>
-blockManager.getBlockData(
-  ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))
+  if (!fetchShuffleBlocks.batchFetchEnabled) {
+fetchShuffleBlocks.reduceIds(index).map { reduceId =>
+  blockManager.getBlockData(
+ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))
+}
+  } else {
+val startAndEndId = fetchShuffleBlocks.reduceIds(index)
+if (startAndEndId.length != 2) {
+  throw new IllegalStateException(s"Invalid shuffle fetch request: 
" +
 
 Review comment:
   To be more specific, how about:
   
   `"Invalid shuffle fetch request when batch mode is enabled: " `





[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs

2019-10-11 Thread GitBox
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] 
Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333843027
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
 ##
 @@ -56,6 +56,18 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Long, 
reduceId: Int) extends Bl
   override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + 
reduceId
 }
 
+// The batch id of continuous shuffle blocks of same mapId in range 
[startReduceId, endReduceId).
+@DeveloperApi
+case class ShuffleBlockBatchId(
+shuffleId: Int,
+mapId: Long,
+startReduceId: Int,
+endReduceId: Int) extends BlockId {
+  override def name: String = {
+"shuffle_" + shuffleId + "_" + mapId + "_" + startReduceId + "_" + 
endReduceId
 
 Review comment:
   Shall we use `"_" + startReduceId + "-" + endReduceId` instead? `startReduceId` and `endReduceId` belong to the same semantic group, and this also leaves more room to extend ShuffleBlockBatchId in the future.
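   
   An illustrative sketch of the suggested format (not the PR's final code):
   
   ```scala
   // Joining startReduceId and endReduceId with '-' makes them read as one
   // semantic group and keeps '_' free for future fields of the id format.
   case class ShuffleBlockBatchId(shuffleId: Int, mapId: Long, startReduceId: Int, endReduceId: Int) {
     def name: String =
       "shuffle_" + shuffleId + "_" + mapId + "_" + startReduceId + "-" + endReduceId
   }
   
   ShuffleBlockBatchId(0, 5L, 2, 7).name  // "shuffle_0_5_2-7"
   ```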





[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs

2019-10-11 Thread GitBox
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] 
Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333848114
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -335,6 +341,51 @@ final class ShuffleBlockFetcherIterator(
 remoteRequests
   }
 
+  private[this] def mergeContinuousShuffleBlockIdsIfNeeded(
+  blocks: ArrayBuffer[FetchBlockInfo]): ArrayBuffer[FetchBlockInfo] = {
+
+def mergeFetchBlockInfo(toBeMerged: ArrayBuffer[FetchBlockInfo]): 
FetchBlockInfo = {
+  val startBlockId = toBeMerged.head.blockId.asInstanceOf[ShuffleBlockId]
+  FetchBlockInfo(
+ShuffleBlockBatchId(
+  startBlockId.shuffleId,
+  startBlockId.mapId,
+  startBlockId.reduceId,
+  toBeMerged.last.blockId.asInstanceOf[ShuffleBlockId].reduceId + 1),
+toBeMerged.map(_.size).sum,
+toBeMerged.head.mapIndex)
+}
+
+val result = if (fetchContinuousShuffleBlocksInBatch) {
+  var curBlocks = new ArrayBuffer[FetchBlockInfo]
+  val mergedBlockInfo = new ArrayBuffer[FetchBlockInfo]
+  val iter = blocks.iterator
+
+  while (iter.hasNext) {
+val info = iter.next()
+val curBlockId = info.blockId.asInstanceOf[ShuffleBlockId]
+if (curBlocks.isEmpty) {
+  curBlocks += info
+} else {
+  if (curBlockId.mapId != 
curBlocks.head.blockId.asInstanceOf[ShuffleBlockId].mapId) {
 
 Review comment:
   How about we keep track of `preMapId`, `startReduceId`, `endReduceId`, and `mergedBlockSize`, and avoid using `curBlocks: ArrayBuffer[FetchBlockInfo]`, since we don't need all the info in `curBlocks`?
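   
   Roughly like the following sketch (variable names are illustrative; it assumes the `FetchBlockInfo`, `ShuffleBlockId` and `ShuffleBlockBatchId` definitions quoted in this PR, plus `import scala.collection.mutable.ArrayBuffer`):
   
   ```scala
   // Track only the running batch boundaries instead of buffering FetchBlockInfo objects.
   var prevShuffleId = -1
   var prevMapId = -1L
   var prevMapIndex = -1
   var startReduceId = -1
   var endReduceId = -1
   var mergedSize = 0L
   val mergedBlockInfo = new ArrayBuffer[FetchBlockInfo]
   
   def flushBatch(): Unit = if (prevMapId != -1L) {
     mergedBlockInfo += FetchBlockInfo(
       ShuffleBlockBatchId(prevShuffleId, prevMapId, startReduceId, endReduceId + 1),
       mergedSize,
       prevMapIndex)
   }
   
   blocks.foreach { info =>
     val id = info.blockId.asInstanceOf[ShuffleBlockId]
     if (id.mapId != prevMapId) {
       flushBatch()                  // close the previous mapper's batch
       prevShuffleId = id.shuffleId
       prevMapId = id.mapId
       prevMapIndex = info.mapIndex
       startReduceId = id.reduceId
       mergedSize = 0L
     }
     endReduceId = id.reduceId       // reduce ids arrive sorted and contiguous per mapper
     mergedSize += info.size
   }
   flushBatch()                      // close the last batch
   ```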





[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs

2019-10-11 Thread GitBox
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] 
Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333855219
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
 ##
 @@ -41,6 +43,20 @@ private[spark] class BlockStoreShuffleReader[K, C](
 
   private val dep = handle.dependency
 
+  private def fetchContinuousBlocksInBatch: Boolean = {
+val conf = SparkEnv.get.conf
+val compressed = conf.get(config.SHUFFLE_COMPRESS)
+val featureEnabled = 
conf.get(config.SHUFFLE_FETCH_CONTINUOUS_BLOCKS_IN_BATCH)
+val serializerRelocatable = 
dep.serializer.supportsRelocationOfSerializedObjects
+// The batch fetching feature only works for reading consolidate file 
written by
+// SortShuffleWriter or UnsafeShuffleWriter.
+val readConsolidateFile = 
!handle.isInstanceOf[BypassMergeSortShuffleHandle[_, _]]
+
+readConsolidateFile && featureEnabled && endPartition - startPartition > 1 
&&
 
 Review comment:
   How about we add a log here for the case where `featureEnabled` is true but the final decision is false, to indicate to users that batch fetching is not actually used because of an incompatible serializer or compression codec configuration?
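   
   For example, roughly (a sketch against the vals quoted above, assuming the class mixes in `Logging`; the remaining compression-codec check is elided here):
   
   ```scala
   val doBatchFetch = featureEnabled && readConsolidateFile &&
     endPartition - startPartition > 1 && serializerRelocatable /* && codec check */
   if (featureEnabled && !doBatchFetch) {
     logWarning(s"${config.SHUFFLE_FETCH_CONTINUOUS_BLOCKS_IN_BATCH.key} is enabled, " +
       "but batch fetch cannot be used because the serializer, compression codec, " +
       "or shuffle writer used does not support it.")
   }
   doBatchFetch
   ```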





[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs

2019-10-11 Thread GitBox
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] 
Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333857410
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FetchShuffleBlocks.java
 ##
 @@ -35,20 +35,28 @@
   // The length of mapIds must equal to reduceIds.size(), for the i-th mapId 
in mapIds,
   // it corresponds to the i-th int[] in reduceIds, which contains all reduce 
id for this map id.
   public final long[] mapIds;
+  // When batchFetchEnabled=true, reduceIds[i] contains 2 elements: 
startReduceId (inclusive) and
+  // endReduceId (exclusive) for the mapper mapIds[i].
+  // When batchFetchEnabled=false, reduceIds[i] contains all the reduce IDs 
that mapper mapIds[i]
+  // needs to fetch.
   public final int[][] reduceIds;
+  public final boolean batchFetchEnabled;
 
 Review comment:
   Since we are adding this field, I believe the `toString` method should also 
be updated correspondingly.





[GitHub] [spark] advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs

2019-10-11 Thread GitBox
advancedxy commented on a change in pull request #26040: [SPARK-9853][Core] 
Optimize shuffle fetch of continuous partition IDs
URL: https://github.com/apache/spark/pull/26040#discussion_r333860050
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
 ##
 @@ -53,9 +53,9 @@ public int getSize() {
   /**
* Get index offset for a particular reducer.
*/
-  public ShuffleIndexRecord getIndex(int reduceId) {
-long offset = offsets.get(reduceId);
-long nextOffset = offsets.get(reduceId + 1);
+  public ShuffleIndexRecord getIndex(int startReduceId, int endReduceId) {
 
 Review comment:
   This is a public method; do we need to add a new method instead, to preserve binary compatibility?





[GitHub] [spark] turboFei opened a new pull request #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei opened a new pull request #26086: [SPARK-29302] Make the file name of 
a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086
 
 
   ### What changes were proposed in this pull request?
   Currently, for a dynamic partition overwrite operation, the file name of a task's output is deterministic.
   
   So, if speculation is enabled, a task can conflict with its speculative attempt.
   In this PR, I make the file name of a task for dynamic partition overwrite unique.
   ### Why are the changes needed?
   
   Data may be corrupted without this PR.
   
   ### Does this PR introduce any user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing UT.
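   
   A hypothetical sketch of the idea (names and format are illustrative, not the PR's actual change; it assumes the Hadoop `TaskAttemptContext` and a `jobId` string available in the commit protocol):
   
   ```scala
   import org.apache.hadoop.mapreduce.TaskAttemptContext
   
   // Including the attempt number makes the name unique per (speculative) attempt,
   // so two attempts of the same task no longer race on the same output file.
   def uniqueTaskFilename(taskContext: TaskAttemptContext, jobId: String, ext: String): String = {
     val attemptId = taskContext.getTaskAttemptID
     val split = attemptId.getTaskID.getId
     f"part-$split%05d-${attemptId.getId}-$jobId$ext"
   }
   ```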





[GitHub] [spark] itsvikramagr commented on a change in pull request #24922: [SPARK-28120][SS] Rocksdb state storage implementation

2019-10-11 Thread GitBox
itsvikramagr commented on a change in pull request #24922: [SPARK-28120][SS]  
Rocksdb state storage implementation
URL: https://github.com/apache/spark/pull/24922#discussion_r333870578
 
 

 ##
 File path: sql/core/pom.xml
 ##
 @@ -147,6 +147,12 @@
   mockito-core
   test
 
+
+
+  org.rocksdb
+  rocksdbjni
 
 Review comment:
   @gatorsmile - what are the alternatives if rocksdb is not the best backend? Other streaming technologies such as flink and kstreams use rocksdb as their primary storage engine.
   
   With integration in the spark codebase, we can probably change the code in any way later, but if we take the separate-jar route, the kind of extensions you can make are limited by the current contract. For example, @skonto mentioned one way we could abstract the state storage implementation to get the best out of rocksdb. How could we support such improvements if we take the spark-package route?
   
   The current implementation, based on an in-memory hashmap, is not scalable beyond a point. How shall we go about solving that?
   





[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file 
name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138
 
 
   But we need to skip sending TaskCommitMessage if a task cannot commit.
   
   
https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247
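   
   I.e. roughly the following sketch (illustrative only; `committer`, `addedAbsPathFiles` and `partitionPaths` are the members referenced in the linked lines of `HadoopMapReduceCommitProtocol`):
   
   ```scala
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     // Only report this attempt's files/partitions when the committer actually
     // allows this attempt to commit; otherwise return an empty message.
     if (committer.needsTaskCommit(taskContext)) {
       committer.commitTask(taskContext)
       new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
     } else {
       new TaskCommitMessage(Map.empty -> Set.empty)
     }
   }
   ```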





[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file 
name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138
 
 
   But we need to skip sending TaskCommitMessage if a task cannot commit.
   
   
https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-247





[GitHub] [spark] turboFei commented on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei commented on issue #26086: [WIP][SPARK-29302] Make the file name of a 
task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138
 
 
   But we need to skip sending TaskCommitMessage if a task cannot commit.
   
   
https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L246





[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file 
name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138
 
 
   But we need to skip sending `new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)` if a task cannot commit.
   
   
https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247





[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file 
name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138
 
 
   But we need to skip sending the message constructed at 
`https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241`
 if a task cannot commit.
   
   
https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247





[GitHub] [spark] turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [WIP][SPARK-29302] Make the file 
name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138
 
 
   But we should skip sending `new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)` if a task cannot commit.
   
   
https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247





[GitHub] [spark] Udbhav30 commented on a change in pull request #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory

2019-10-11 Thread GitBox
Udbhav30 commented on a change in pull request #25398: [SPARK-28659][SQL] Use 
data source if convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#discussion_r333874882
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
 ##
 @@ -1396,6 +1396,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   compressed = false,
   properties = rowStorage.properties ++ fileStorage.properties)
 
-(ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER))
+val fileFormat = extractFileFormat(fileStorage.serde)
+(ctx.LOCAL != null, storage, Some(fileFormat))
+  }
+
+  private def extractFileFormat(serde: Option[String]): String = {
 
 Review comment:
   @HyukjinKwon, I could use `USING file_format` explicitly and that would serve the purpose, but I thought it is better to fix this and bring it in line with the `CTAS` behavior, which was fixed in this [PR](https://github.com/apache/spark/pull/22514).
   
   If you agree to go ahead, I can try making the changes in the analysis/optimizer layer instead of the parser, as you suggested.
   Sure, I will follow the template and update the PR description. Thanks :)
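   
   For context, a sketch of the two syntaxes being discussed (paths and the `src` table are placeholders):
   
   ```scala
   // Explicit datasource syntax: already goes through the Spark datasource writer.
   spark.sql("INSERT OVERWRITE DIRECTORY '/tmp/out1' USING parquet SELECT * FROM src")
   
   // Hive syntax: currently handled by the Hive SerDe writer; the PR proposes converting
   // it to the datasource path when the serde is convertible, mirroring the CTAS behavior.
   spark.sql("INSERT OVERWRITE DIRECTORY '/tmp/out2' STORED AS parquet SELECT * FROM src")
   ```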





[GitHub] [spark] turboFei removed a comment on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei removed a comment on issue #26086: [WIP][SPARK-29302] Make the file 
name of a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540960138
 
 
   But we should skip sending `new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)` if a task cannot commit.
   
   
https://github.com/apache/spark/blob/6390f02f9fba059ec5d089a68c8d758aca35c9cd/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L241-L247





[GitHub] [spark] turboFei commented on issue #26086: [WIP][SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei commented on issue #26086: [WIP][SPARK-29302] Make the file name of a 
task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540964132
 
 
   also cc @Clark





[GitHub] [spark] Udbhav30 commented on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory

2019-10-11 Thread GitBox
Udbhav30 commented on issue #25398: [SPARK-28659][SQL] Use data source if 
convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779
 
 
   > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly?
   
   Yes, compression can be achieved by setting the Hive ser/de or `USING file_format`, but as I mentioned, this PR is more about making the behavior consistent with `CTAS` and using the `datasource` path if it is convertible. Let me know if you have any suggestions :)
   





[GitHub] [spark] Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory

2019-10-11 Thread GitBox
Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source 
if convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779
 
 
   > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly?
   
   Yes, compression can be achieved by setting the Hive ser/de or `USING file_format`, but as I mentioned, this PR is more about making the behavior consistent with `CTAS` and using the `datasource` path if it is convertible. Let me know if you have any suggestions :)
   





[GitHub] [spark] Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory

2019-10-11 Thread GitBox
Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source 
if convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779
 
 
   > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly?
   
   Yes, compression can be achieved by setting the Hive ser/de or `USING file_format`, but as I mentioned, this PR is more about making the behavior consistent with `CTAS` and using the `datasource` path if it is convertible. Let me know if you have any suggestions :)
   





[GitHub] [spark] Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory

2019-10-11 Thread GitBox
Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source 
if convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779
 
 
   > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly?
   
   Yes, compression can be achieved by setting the Hive ser/de or `USING file_format`, but as I mentioned, this PR is more about making the behavior consistent with `CTAS` and using the `datasource` path if it is convertible. Let me know if you have any suggestions :)
   





[GitHub] [spark] Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory

2019-10-11 Thread GitBox
Udbhav30 edited a comment on issue #25398: [SPARK-28659][SQL] Use data source 
if convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#issuecomment-540964779
 
 
   > If you only target to fix Hive ser/de to respect compression, why don't you set Hive compression properly?
   
   Yes, compression can be achieved by setting the Hive ser/de or `USING file_format`, but as I mentioned, this PR is more about making the behavior consistent with `CTAS` and using the `datasource` path if it is convertible. Let me know if you have any suggestions :)
   





[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of 
a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540964132
 
 
   cc @viirya  @cloud-fan 
   also cc @Clark





[GitHub] [spark] Udbhav30 commented on a change in pull request #25398: [SPARK-28659][SQL] Use data source if convertible in insert overwrite directory

2019-10-11 Thread GitBox
Udbhav30 commented on a change in pull request #25398: [SPARK-28659][SQL] Use 
data source if convertible in insert overwrite directory
URL: https://github.com/apache/spark/pull/25398#discussion_r333874882
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
 ##
 @@ -1396,6 +1396,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   compressed = false,
   properties = rowStorage.properties ++ fileStorage.properties)
 
-(ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER))
+val fileFormat = extractFileFormat(fileStorage.serde)
+(ctx.LOCAL != null, storage, Some(fileFormat))
+  }
+
+  private def extractFileFormat(serde: Option[String]): String = {
 
 Review comment:
   @HyukjinKwon, I could use `USING file_format` explicitly and that would serve the purpose, but I thought it is better to fix this and bring it in line with the `CTAS` behavior, which was fixed in this [PR](https://github.com/apache/spark/pull/22514).
   
   If you agree to go ahead, I can try making the changes in the analysis/optimizer layer instead of the parser, as you suggested.
   Sure, I will follow the template; I have updated the PR description. Thanks :)





[GitHub] [spark] viirya opened a new pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
viirya opened a new pull request #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087
 
 
   
   
   ### What changes were proposed in this pull request?
   
   
   This PR proposes to add a groupByRelationKey API to Dataset. It creates a KeyValueGroupedDataset instance using existing relational columns, instead of a typed function as in the groupByKey API. Because it leverages existing columns, it can reuse existing data partitioning, if any, when doing operations like cogroup.
   
   ### Why are the changes needed?
   
   
   Currently, if users want to do cogroup on DataFrames, there is no good way to do it except through KeyValueGroupedDataset, and KeyValueGroupedDataset ignores any existing data partitioning. That is a problem.
   
   ```scala
   // df1 and df2 are certainly partitioned and sorted.
   val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
 .repartition($"a", $"b").sortWithinPartitions("a", "b")
   val df2 = Seq((1, 2, 4), (2, 3, 5)).toDF("a", "b", "c")
 .repartition($"a", $"b").sortWithinPartitions("a", "b")
   ```
   ```scala
   // This groupByRelationKey won't unnecessarily repartition the data 
   val df3 = df1.groupByRelationKey("a", "b")
 .cogroup(df2.groupByRelationKey("a", "b")) { case (key, data1, data2) =>
   data1.zip(data2).map { p =>
 p._1.getInt(2) + p._2.getInt(2)
   }
   }
   ```
   
   ```
   == Physical Plan ==
   *(5) SerializeFromObject [input[0, int, false] AS value#11206]
   +- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4888/206084072@4601674e, createexternalrow(a#11172, b#11173, StructField(a,IntegerType,false), StructField(b,IntegerType,false)), createexternalrow(a#11172, b#11173, c#11174, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11188, b#11189, c#11190, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [a#11172, b#11173], [a#11188, b#11189], [a#11172, b#11173, c#11174], [a#11188, b#11189, c#11190], obj#11205: int
      :- *(2) Sort [a#11172 ASC NULLS FIRST, b#11173 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(a#11172, b#11173, 5), false, [id=#10174]
      :     +- *(1) Project [_1#11165 AS a#11172, _2#11166 AS b#11173, _3#11167 AS c#11174]
      :        +- *(1) LocalTableScan [_1#11165, _2#11166, _3#11167]
      +- *(4) Sort [a#11188 ASC NULLS FIRST, b#11189 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(a#11188, b#11189, 5), false, [id=#10179]
            +- *(3) Project [_1#11181 AS a#11188, _2#11182 AS b#11189, _3#11183 AS c#11190]
               +- *(3) LocalTableScan [_1#11181, _2#11182, _3#11183]
   ```
   
   
   ```scala
   // Current approach creates additional AppendColumns and repartition data 
again
   df1.groupByKey(r => r.getInt(0)).cogroup(df2.groupByKey(r => r.getInt(0))) { 

 case (key, data1, data2) =>
   data1.zip(data2).map { p =>
 p._1.getInt(2) + p._2.getInt(2)
   }
   }
   ```
   
   ```
   == Physical Plan ==
   *(7) SerializeFromObject [input[0, int, false] AS value#11216]
   +- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4892/905560656@19f7e6c5, value#11211: int, createexternalrow(a#11172, b#11173, c#11174, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11188, b#11189, c#11190, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [value#11211], [value#11213], [a#11172, b#11173, c#11174], [a#11188, b#11189, c#11190], obj#11215: int
      :- *(3) Sort [value#11211 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(value#11211, 5), true, [id=#10442]
      :     +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4889/2021090091@6396e053, createexternalrow(a#11172, b#11173, c#11174, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11211]
      :        +- *(2) Sort [a#11172 ASC NULLS FIRST, b#11173 ASC NULLS FIRST], false, 0
      :           +- Exchange hashpartitioning(a#11172, b#11173, 5), false, [id=#10437]
      :              +- *(1) Project [_1#11165 AS a#11172, _2#11166 AS b#11173, _3#11167 AS c#11174]
      :                 +- *(1) LocalTableScan [_1#11165, _2#11166, _3#11167]
      +- *(6) Sort [value#11213 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(value#11213, 5), true, [id=#10452]
            +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4891/1736834504@798dbf14, createexternalrow(a#11
[GitHub] [spark] Mats-SX commented on a change in pull request #24851: [SPARK-27303][GRAPH] Add Spark Graph API

2019-10-11 Thread GitBox
Mats-SX commented on a change in pull request #24851: [SPARK-27303][GRAPH] Add 
Spark Graph API
URL: https://github.com/apache/spark/pull/24851#discussion_r333893149
 
 

 ##
 File path: 
graph/api/src/main/scala/org/apache/spark/graph/api/RelationshipFrameBuilder.scala
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graph.api
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.DataFrame
+
+/**
+ * Interface used to build a [[RelationshipFrame]].
+ *
+ * @param df DataFrame containing a single relationship in each row
+ * @since 3.0.0
+ */
+final class RelationshipFrameBuilder(val df: DataFrame) {
 
 Review comment:
   Well, almost ;)





[GitHub] [spark] Mats-SX commented on a change in pull request #24851: [SPARK-27303][GRAPH] Add Spark Graph API

2019-10-11 Thread GitBox
Mats-SX commented on a change in pull request #24851: [SPARK-27303][GRAPH] Add 
Spark Graph API
URL: https://github.com/apache/spark/pull/24851#discussion_r333893149
 
 

 ##
 File path: 
graph/api/src/main/scala/org/apache/spark/graph/api/RelationshipFrameBuilder.scala
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graph.api
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.DataFrame
+
+/**
+ * Interface used to build a [[RelationshipFrame]].
+ *
+ * @param df DataFrame containing a single relationship in each row
+ * @since 3.0.0
+ */
+final class RelationshipFrameBuilder(val df: DataFrame) {
 
 Review comment:
   Well, almost ;) but I get it





[GitHub] [spark] turboFei commented on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei commented on issue #26086: [SPARK-29302] Make the file name of a task 
for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545
 
 
   Oh, it seems that this issue is related to https://github.com/apache/spark/pull/24142.
   I just read the comments there; it seems there is some risk of producing duplicate results.
   
   @ajithme  @vanzin  @LantaoJin 
   
   How about using this method to name a task file only when dynamicPartitionOverwrite is true?
   Because for dynamic partition overwrite, it keeps a partition set, which would not cause duplicate results.





[GitHub] [spark] merrily01 opened a new pull request #26088: [SPARK-29436][K8S] Support executor for selecting scheduler through scheduler name in the case of k8s multi-scheduler scenario

2019-10-11 Thread GitBox
merrily01 opened a new pull request #26088: [SPARK-29436][K8S] Support executor 
for selecting scheduler through scheduler name in the case of k8s 
multi-scheduler scenario
URL: https://github.com/apache/spark/pull/26088
 
 
   ### What changes were proposed in this pull request?
   
   Support selecting the scheduler for executor pods by scheduler name, for the Kubernetes multi-scheduler scenario.
   
   ### Why are the changes needed?
   
   Without this, Spark cannot support the Kubernetes multi-scheduler scenario.
   
   ### Does this PR introduce any user-facing change?
   
   Yes, users can specify the scheduler name through configuration.
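   
   A hypothetical usage sketch (the exact configuration key is whatever this PR introduces; the key and the `volcano` scheduler name below are assumptions):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder()
     .config("spark.kubernetes.executor.scheduler.name", "volcano") // assumed key
     .getOrCreate()
   ```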
   





[GitHub] [spark] yaooqinn commented on a change in pull request #25977: [SPARK-29268][SQL]isolationOn value is wrong in case of spark.sql.hive.metastore.jars != builtin

2019-10-11 Thread GitBox
yaooqinn commented on a change in pull request #25977: 
[SPARK-29268][SQL]isolationOn value is wrong in case of 
spark.sql.hive.metastore.jars != builtin
URL: https://github.com/apache/spark/pull/25977#discussion_r333897470
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
 ##
 @@ -414,7 +415,7 @@ private[spark] object HiveUtils extends Logging {
 hadoopConf = hadoopConf,
 execJars = jars.toSeq,
 config = configurations,
-isolationOn = true,
+isolationOn = !isCliSessionState(),
 
 Review comment:
   IMO, SparkSQLCLIDriver already loads and initializes the Hive classes in the main classloader, so we shouldn't use non-`builtin` hive jars there. Maybe just throw an exception when using the spark-sql script with spark.sql.hive.metastore.jars set.
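   
   For illustration, the suggested guard could look roughly like this (a sketch; `hiveMetastoreJars` stands for the already-resolved value of `spark.sql.hive.metastore.jars`):
   
   ```scala
   // Sketch only: fail fast in the spark-sql CLI instead of silently dropping isolation.
   if (isCliSessionState() && hiveMetastoreJars != "builtin") {
     throw new IllegalArgumentException(
       "spark-sql CLI does not support spark.sql.hive.metastore.jars != builtin")
   }
   ```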





[GitHub] [spark] yaooqinn commented on a change in pull request #25977: [SPARK-29268][SQL]isolationOn value is wrong in case of spark.sql.hive.metastore.jars != builtin

2019-10-11 Thread GitBox
yaooqinn commented on a change in pull request #25977: 
[SPARK-29268][SQL]isolationOn value is wrong in case of 
spark.sql.hive.metastore.jars != builtin
URL: https://github.com/apache/spark/pull/25977#discussion_r333897470
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
 ##
 @@ -414,7 +415,7 @@ private[spark] object HiveUtils extends Logging {
 hadoopConf = hadoopConf,
 execJars = jars.toSeq,
 config = configurations,
-isolationOn = true,
+isolationOn = !isCliSessionState(),
 
 Review comment:
   IMO, SparkSQLCLIDriver already loads and initializes the Hive classes in the main classloader, so we shouldn't use non-`builtin` hive jars there. Maybe just throw an exception when using the spark-sql script with spark.sql.hive.metastore.jars set.





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333900101
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
+  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(optimizedPlan)
 
 Review comment:
   We don't need to revert; just return the original plan, `plan`.
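   
   I.e., roughly (a sketch of the suggestion against the quoted `apply` method):
   
   ```scala
   if (numExchanges > 0) {
     logWarning("OptimizeLocalShuffleReader rule is not applied because " +
       "additional shuffles would be introduced.")
     plan          // fall back to the original, untouched plan passed into apply()
   } else {
     optimizedPlan
   }
   ```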





[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of 
a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545
 
 
   Oh, It seems that this issue is related with 
https://github.com/apache/spark/pull/24142.
   I just read the comments of that, it seems that there are some risk that 
cause duplicate result.
   
   @ajithme  @vanzin  @LantaoJin 
   
   How about using this method to name a task file for the cases below?
   
   1. newTaskTempFile, when dynamicPartitionOverwrite is true
   2. newTaskTempFileAbsPath
   
   Because for these cases, it keeps a filesToMove set, which would not cause duplicate results.





[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333901952
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
 
 Review comment:
   It's very likely that we already have exchanges in the query plan; what we need to guarantee here is that we don't introduce extra exchanges. We should do:
   ```
   def numExchanges...
   val numExchangesBefore = numExchanges(EnsureRequirements(conf).apply(plan))
   val numExchangesAfter = numExchanges(EnsureRequirements(conf).apply(optimizedPlan))
   if (numExchangesAfter > numExchangesBefore) {
 plan
   } else {
 optimizedPlan
   }
   ```
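   
   For completeness, the elided `numExchanges` helper could be sketched as follows, reusing the collect over `ShuffleExchangeExec` that already appears in this rule:
   ```
   // Sketch only: count the shuffle exchanges in a physical plan.
   private def numExchanges(plan: SparkPlan): Int = {
     plan.collect {
       case e: ShuffleExchangeExec => e
     }.length
   }
   ```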


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333916849
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
 ##
 @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   Literal.create(null, IntegerType), Literal.create(null, IntegerType)), 
null)
   }
 
+  test("ToNumber") {
 
 Review comment:
   @MaxGekk Thanks for the review. I tested the locale handling on PostgreSQL, but 'L' does not seem to work:
   ```
   select to_number('USD34234.4350', 'L9.');  -- 34234.435
   select to_number('EUR34234.4350', 'L9.');  -- 34234.435
   select to_number('RY34234.4350', 'L9.');   -- 34234.435
   ```
   Although 'RY' is not a valid locale, the result is the same as for the others.
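   
   For what it is worth, a quick locale-sensitive parsing check on the JVM side could be sketched like this (illustrative only; it says nothing about how PostgreSQL itself implements 'L'):
   ```
   import java.text.NumberFormat
   import java.util.Locale
   
   // Parse a currency-prefixed string with an explicit locale.
   val fmt = NumberFormat.getCurrencyInstance(Locale.US)
   println(fmt.parse("$34,234.44"))  // prints 34234.44
   ```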


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333917844
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
 ##
 @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   Literal.create(null, IntegerType), Literal.create(null, IntegerType)), 
null)
   }
 
+  test("ToNumber") {
 
 Review comment:
   I think the description of the locales is not consistent with the actual behavior.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333917844
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
 ##
 @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   Literal.create(null, IntegerType), Literal.create(null, IntegerType)), 
null)
   }
 
+  test("ToNumber") {
 
 Review comment:
   I think the description of the locales is not consistent with the actual behavior. Maybe I should remove the comment about locales.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skonto commented on issue #25870: [SPARK-27936][K8S] support python deps

2019-10-11 Thread GitBox
skonto commented on issue #25870: [SPARK-27936][K8S] support python deps
URL: https://github.com/apache/spark/pull/25870#issuecomment-541001902
 
 
   @holdenk this is because spark-submit adds the resource to spark.jars:
   ```
   19/10/11 13:01:30 WARN Utils: Your hostname, universe resolves to a loopback 
address: 127.0.1.1; using 192.168.2.4 instead (on interface wlp2s0)
   19/10/11 13:01:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
another address
   Parsed arguments:
 master  k8s://https://10.96.0.1:443
 deployMode  cluster
 executorMemory  1G
 executorCores   null
 totalExecutorCores  null
 propertiesFile  null
 driverMemory1G
 driverCores null
 driverExtraClassPathnull
 driverExtraLibraryPath  null
 driverExtraJavaOptions  null
 supervise   false
 queue   null
 numExecutors2
 files   null
 pyFiles null
 archivesnull
 mainClass   org.apache.spark.examples.SparkPi
 primaryResource 
local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
 namespark-pi
 childArgs   [100]
 jarsnull
 packagesnull
 packagesExclusions  null
 repositoriesnull
 verbose true
   
   Spark properties used, including those specified through
--conf and those from the properties file null:
 (spark.kubernetes.driver.pod.name,spark-pi-driver)
 (spark.executor.instances,2)
 (spark.driver.memory,1G)
 (spark.executor.memory,1G)
 (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
 (spark.kubernetes.namespace,spark)
 
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
 (spark.kubernetes.container.image.pullPolicy,Always)
   
   
   19/10/11 13:01:31 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   Main class:
   org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
   Arguments:
   --primary-java-resource
   
local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
   --main-class
   org.apache.spark.examples.SparkPi
   --arg
   100
   Spark config:
   (spark.kubernetes.namespace,spark)
   
(spark.jars,local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar)
   (spark.app.name,spark-pi)
   (spark.driver.memory,1G)
   (spark.executor.instances,2)
   (spark.submit.pyFiles,)
   (spark.kubernetes.container.image.pullPolicy,Always)
   
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
   (spark.submit.deployMode,cluster)
   (spark.master,k8s://https://10.96.0.1:443)
   (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
   (spark.executor.memory,1G)
   (spark.kubernetes.driver.pod.name,spark-pi-driver)
   Classpath elements:
   ```
   So since the PR [here](https://github.com/apache/spark/pull/23546) manages that property, we don't need to do double work. For Python it's different.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skonto edited a comment on issue #25870: [SPARK-27936][K8S] support python deps

2019-10-11 Thread GitBox
skonto edited a comment on issue #25870: [SPARK-27936][K8S] support python deps
URL: https://github.com/apache/spark/pull/25870#issuecomment-541001902
 
 
   @holdenk this is because spark-submit adds the resource to the `spark.jars` property by default;
   check below:
   ```
   19/10/11 13:01:30 WARN Utils: Your hostname, universe resolves to a loopback 
address: 127.0.1.1; using 192.168.2.4 instead (on interface wlp2s0)
   19/10/11 13:01:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
another address
   Parsed arguments:
 master  k8s://https://10.96.0.1:443
 deployMode  cluster
 executorMemory  1G
 executorCores   null
 totalExecutorCores  null
 propertiesFile  null
 driverMemory1G
 driverCores null
 driverExtraClassPathnull
 driverExtraLibraryPath  null
 driverExtraJavaOptions  null
 supervise   false
 queue   null
 numExecutors2
 files   null
 pyFiles null
 archivesnull
 mainClass   org.apache.spark.examples.SparkPi
 primaryResource 
local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
 namespark-pi
 childArgs   [100]
 jarsnull
 packagesnull
 packagesExclusions  null
 repositoriesnull
 verbose true
   
   Spark properties used, including those specified through
--conf and those from the properties file null:
 (spark.kubernetes.driver.pod.name,spark-pi-driver)
 (spark.executor.instances,2)
 (spark.driver.memory,1G)
 (spark.executor.memory,1G)
 (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
 (spark.kubernetes.namespace,spark)
 
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
 (spark.kubernetes.container.image.pullPolicy,Always)
   
   
   19/10/11 13:01:31 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   Main class:
   org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
   Arguments:
   --primary-java-resource
   
local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
   --main-class
   org.apache.spark.examples.SparkPi
   --arg
   100
   Spark config:
   (spark.kubernetes.namespace,spark)
   
(spark.jars,local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar)
   (spark.app.name,spark-pi)
   (spark.driver.memory,1G)
   (spark.executor.instances,2)
   (spark.submit.pyFiles,)
   (spark.kubernetes.container.image.pullPolicy,Always)
   
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
   (spark.submit.deployMode,cluster)
   (spark.master,k8s://https://10.96.0.1:443)
   (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
   (spark.executor.memory,1G)
   (spark.kubernetes.driver.pod.name,spark-pi-driver)
   Classpath elements:
   ```
   So since the PR [here](https://github.com/apache/spark/pull/23546) manages that property, we don't need to do double work. For Python it's different.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skonto edited a comment on issue #25870: [SPARK-27936][K8S] support python deps

2019-10-11 Thread GitBox
skonto edited a comment on issue #25870: [SPARK-27936][K8S] support python deps
URL: https://github.com/apache/spark/pull/25870#issuecomment-541001902
 
 
   @holdenk this is because spark-submit adds the resource to the `spark.jars` property by default;
   check below:
   ```
   19/10/11 13:01:30 WARN Utils: Your hostname, universe resolves to a loopback 
address: 127.0.1.1; using 192.168.2.4 instead (on interface wlp2s0)
   19/10/11 13:01:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
another address
   Parsed arguments:
 master  k8s://https://10.96.0.1:443
 deployMode  cluster
 executorMemory  1G
 executorCores   null
 totalExecutorCores  null
 propertiesFile  null
 driverMemory1G
 driverCores null
 driverExtraClassPathnull
 driverExtraLibraryPath  null
 driverExtraJavaOptions  null
 supervise   false
 queue   null
 numExecutors2
 files   null
 pyFiles null
 archivesnull
 mainClass   org.apache.spark.examples.SparkPi
 primaryResource 
local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
 namespark-pi
 childArgs   [100]
 jarsnull
 packagesnull
 packagesExclusions  null
 repositoriesnull
 verbose true
   
   Spark properties used, including those specified through
--conf and those from the properties file null:
 (spark.kubernetes.driver.pod.name,spark-pi-driver)
 (spark.executor.instances,2)
 (spark.driver.memory,1G)
 (spark.executor.memory,1G)
 (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
 (spark.kubernetes.namespace,spark)
 
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
 (spark.kubernetes.container.image.pullPolicy,Always)
   
   
   19/10/11 13:01:31 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   Main class:
   org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
   Arguments:
   --primary-java-resource
   
local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
   --main-class
   org.apache.spark.examples.SparkPi
   --arg
   100
   Spark config:
   (spark.kubernetes.namespace,spark)
   
(spark.jars,local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar)
   (spark.app.name,spark-pi)
   (spark.driver.memory,1G)
   (spark.executor.instances,2)
   (spark.submit.pyFiles,)
   (spark.kubernetes.container.image.pullPolicy,Always)
   
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
   (spark.submit.deployMode,cluster)
   (spark.master,k8s://https://10.96.0.1:443)
   (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
   (spark.executor.memory,1G)
   (spark.kubernetes.driver.pod.name,spark-pi-driver)
   Classpath elements:
   ```
   So since the PR [here](https://github.com/apache/spark/pull/23546) uploads whatever is needed from `spark.jars`, we don't need to do double work. For Python it's different.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skonto edited a comment on issue #25870: [SPARK-27936][K8S] support python deps

2019-10-11 Thread GitBox
skonto edited a comment on issue #25870: [SPARK-27936][K8S] support python deps
URL: https://github.com/apache/spark/pull/25870#issuecomment-541001902
 
 
   @holdenk this is because spark-submit adds the resource to the `spark.jars` property by default;
   check below:
   ```
   19/10/11 13:01:30 WARN Utils: Your hostname, universe resolves to a loopback 
address: 127.0.1.1; using 192.168.2.4 instead (on interface wlp2s0)
   19/10/11 13:01:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to 
another address
   Parsed arguments:
 master  k8s://https://10.96.0.1:443
 deployMode  cluster
 executorMemory  1G
 executorCores   null
 totalExecutorCores  null
 propertiesFile  null
 driverMemory1G
 driverCores null
 driverExtraClassPathnull
 driverExtraLibraryPath  null
 driverExtraJavaOptions  null
 supervise   false
 queue   null
 numExecutors2
 files   null
 pyFiles null
 archivesnull
 mainClass   org.apache.spark.examples.SparkPi
 primaryResource 
local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
 namespark-pi
 childArgs   [100]
 jarsnull
 packagesnull
 packagesExclusions  null
 repositoriesnull
 verbose true
   
   Spark properties used, including those specified through
--conf and those from the properties file null:
 (spark.kubernetes.driver.pod.name,spark-pi-driver)
 (spark.executor.instances,2)
 (spark.driver.memory,1G)
 (spark.executor.memory,1G)
 (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
 (spark.kubernetes.namespace,spark)
 
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
 (spark.kubernetes.container.image.pullPolicy,Always)
   
   
   19/10/11 13:01:31 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
   Main class:
   org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
   Arguments:
   --primary-java-resource
   
local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar
   --main-class
   org.apache.spark.examples.SparkPi
   --arg
   100
   Spark config:
   (spark.kubernetes.namespace,spark)
   
(spark.jars,local:///opt/spark/examples/jars/spark-examples_2.12-2.1.2-2.4.4-lightbend.jar)
   (spark.app.name,spark-pi)
   (spark.driver.memory,1G)
   (spark.executor.instances,2)
   (spark.submit.pyFiles,)
   (spark.kubernetes.container.image.pullPolicy,Always)
   
(spark.kubernetes.container.image,lightbend/spark:2.1.2-OpenShift-2.4.4-rh-2.12)
   (spark.submit.deployMode,cluster)
   (spark.master,k8s://https://10.96.0.1:443)
   (spark.kubernetes.authenticate.driver.serviceAccountName,spark-sa)
   (spark.executor.memory,1G)
   (spark.kubernetes.driver.pod.name,spark-pi-driver)
   Classpath elements:
   ```
   So since the PR [here](https://github.com/apache/spark/pull/23546) uploads whatever is needed from `spark.jars`, we don't need to do double work. For Python it's different, as technically the main resource is not a jar.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of 
a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545
 
 
   Oh, it seems that this issue is related to https://github.com/apache/spark/pull/24142.
   I just read the comments there, and it seems there is some risk of producing duplicate results.
   
   @ajithme  @vanzin  @LantaoJin 
   
   How about using this method to name a task file for the cases below?
   
   1. newTaskTempFile when dynamicPartitionOverwrite is true
   2. newTaskTempFileAbsPath
   
   Because for these cases, a filesToMove set is kept, which would not produce duplicate results.
   
   
   P.S.:
   I doubt whether it would cause duplicate results for the non dynamic partition overwrite case.
   
   Should outputCommitCoordinator decide which task can commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333926303
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
+  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(optimizedPlan)
+} else {
+  optimizedPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
+
+def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = {
+  val initialPartitioning = stage.plan.asInstanceOf[ShuffleExchangeExec]
 
 Review comment:
   `.asInstanceOf[ShuffleExchangeExec]` is not necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333926473
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
+  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(optimizedPlan)
+} else {
+  optimizedPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
+
+def canUseChildPartitioning(stage: ShuffleQueryStageExec): Partitioning = {
 
 Review comment:
   The method name sounds like it returns a boolean. How about 
`tryReserveChildPartitioning`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] [spark] sandeep-katta commented on a change in pull request #25977: [SPARK-29268][SQL]isolationOn value is wrong in case of spark.sql.hive.metastore.jars != builtin

2019-10-11 Thread GitBox
sandeep-katta commented on a change in pull request #25977: 
[SPARK-29268][SQL]isolationOn value is wrong in case of 
spark.sql.hive.metastore.jars != builtin
URL: https://github.com/apache/spark/pull/25977#discussion_r333926916
 
 

 ##
 File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
 ##
 @@ -414,7 +415,7 @@ private[spark] object HiveUtils extends Logging {
 hadoopConf = hadoopConf,
 execJars = jars.toSeq,
 config = configurations,
-isolationOn = true,
+isolationOn = !isCliSessionState(),
 
 Review comment:
   There are 2 approaches:
   
   1. Based on `isCliSessionState`, decide how the Hive classes should be loaded (this PR's fix).
   2. Throw an exception at the start of spark-sql if the user used !builtin (a rough sketch follows below). Doing this might be a problem, because currently this problem exists for Derby; if the user uses some other database, there will be an impact.
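   
   A rough sketch of approach 2, just to illustrate the idea in Scala (assuming a `SparkConf` named `sparkConf` is in scope; the check location and the message are assumptions, not code from this PR):
   ```
   // Hypothetical fail-fast check at spark-sql CLI startup.
   val metastoreJars = sparkConf.get("spark.sql.hive.metastore.jars", "builtin")
   if (metastoreJars != "builtin") {
     throw new IllegalArgumentException(
       "The spark-sql CLI currently only supports spark.sql.hive.metastore.jars=builtin")
   }
   ```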


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333927414
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 ##
 @@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight}
+import org.apache.spark.sql.internal.SQLConf
+
+case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
+
+  def canUseOrRevertLocalShuffleReaderLeft(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildRight &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) ||
+  (join.buildSide == BuildRight && 
join.left.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def canUseOrRevertLocalShuffleReaderRight(join: BroadcastHashJoinExec): 
Boolean = {
+(join.buildSide == BuildLeft &&  
ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) ||
+  (join.buildSide == BuildLeft && 
join.right.isInstanceOf[LocalShuffleReaderExec])
+  }
+
+  def revertLocalShuffleReader(plan: SparkPlan): SparkPlan = {
+plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+join.copy(right = 
join.right.asInstanceOf[LocalShuffleReaderExec].child)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+join.copy(left = join.left.asInstanceOf[LocalShuffleReaderExec].child)
+}
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.optimizedLocalShuffleReaderEnabled) {
+  return plan
+}
+
+val optimizedPlan = plan.transformDown {
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderRight(join) =>
+val localReader = 
LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
+join.copy(right = localReader)
+  case join: BroadcastHashJoinExec if 
canUseOrRevertLocalShuffleReaderLeft(join) =>
+val localReader = 
LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
+join.copy(left = localReader)
+}
+
+val afterEnsureRequirements = EnsureRequirements(conf).apply(optimizedPlan)
+
+val numExchanges = afterEnsureRequirements.collect {
+  case e: ShuffleExchangeExec => e
+}.length
+
+if (numExchanges > 0) {
+  logWarning("OptimizeLocalShuffleReader rule is not applied due" +
+" to additional shuffles will be introduced.")
+  revertLocalShuffleReader(optimizedPlan)
+} else {
+  optimizedPlan
+}
+  }
+}
+
+case class LocalShuffleReaderExec(
+child: QueryStageExec) extends LeafExecNode {
 
 Review comment:
   nit: we can put it on the previous line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum opened a new pull request #26089: [SPARK-29423][SQL] lazily initialize StreamingQueryManager in SessionState

2019-10-11 Thread GitBox
wangyum opened a new pull request #26089: [SPARK-29423][SQL] lazily initialize 
StreamingQueryManager in SessionState
URL: https://github.com/apache/spark/pull/26089
 
 
   ### What changes were proposed in this pull request?
   
   This PR makes `SessionState` lazily initialize `StreamingQueryManager` to avoid constructing a `StreamingQueryManager` for each session when connecting to the ThriftServer.
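   
   For illustration, the core of the change can be sketched like this (the field and builder names are assumptions, not necessarily the actual `SessionState` members):
   ```
   import org.apache.spark.sql.streaming.StreamingQueryManager
   
   class SessionState(
       // ...other session-scoped services elided...
       streamingQueryManagerBuilder: () => StreamingQueryManager) {
   
     // Built only on first access, so ThriftServer sessions that never run a
     // streaming query do not pay the construction cost.
     lazy val streamingQueryManager: StreamingQueryManager = streamingQueryManagerBuilder()
   }
   ```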
   
   ### Why are the changes needed?
   
   Reduce memory usage.
   
   
   ### Does this PR introduce any user-facing change?
   
   No.
   
   
   ### How was this patch tested?
   manual test
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Clarkkkkk opened a new pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution

2019-10-11 Thread GitBox
Clarkkkkk opened a new pull request #26090: [SPARK-29302]Fix writing file 
collision in dynamic partition overwrite mode within speculative execution
URL: https://github.com/apache/spark/pull/26090
 
 
   ### What changes were proposed in this pull request?
   When inserting into a partitioned DataSource table (not reproducible with a Hive table) with dynamic partition overwrite and speculative execution, attempts of the same task will try to write the same files.
   
   This PR reuses FileOutputCommitter to avoid the write collision, and renames files from the staging directory to the final output directory using the original logic in HadoopMapReduceCommitProtocol#commitJob.
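   
   For context, the rename step described above can be sketched conceptually in Scala as follows (paths and the overwrite handling are assumptions, not the exact logic of HadoopMapReduceCommitProtocol#commitJob):
   ```
   import org.apache.hadoop.fs.{FileSystem, Path}
   
   // Conceptual sketch: after all tasks commit into a staging directory, move each
   // partition directory to its final location, replacing what is already there.
   def moveStagingToFinal(fs: FileSystem, stagingDir: Path, outputDir: Path): Unit = {
     fs.listStatus(stagingDir).foreach { status =>
       val dest = new Path(outputDir, status.getPath.getName)
       if (fs.exists(dest)) {
         fs.delete(dest, true)  // dynamic partition overwrite: replace the existing partition
       }
       fs.rename(status.getPath, dest)
     }
   }
   ```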
   
   
   ### Why are the changes needed?
   Tasks fail in this circumstance.
   
   
   ### Does this PR introduce any user-facing change?
   No.
   
   
   ### How was this patch tested?
   This patch is tested by existing tests in 
org.apache.spark.sql.sources.InsertSuite.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333932293
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 ##
 @@ -125,6 +130,10 @@ class AdaptiveQueryExecSuite
   assert(smj.size == 1)
   val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
   assert(bhj.size == 1)
+  val localReaders = adaptivePlan.collect {
+case reader: LocalShuffleReaderExec => reader
+  }
+  assert(localReaders.length === 1)
 
 Review comment:
   we can add a method
   ```
   def checkNumLocalShuffleReaders(plan: SparkPlan, expected: Int): Unit = ...
   ```
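   
   A possible body for that helper, based on the collect used in the test above (a sketch, not the final implementation):
   ```
   def checkNumLocalShuffleReaders(plan: SparkPlan, expected: Int): Unit = {
     // Count the LocalShuffleReaderExec nodes and compare with the expectation.
     val localReaders = plan.collect {
       case reader: LocalShuffleReaderExec => reader
     }
     assert(localReaders.length === expected)
   }
   ```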


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333934690
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle 
partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, 
inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(
+val preShufflePartitionIndex: Int,
+val startPostShufflePartitionIndex: Int,
+val endPostShufflePartitionIndex: Int) extends Partition {
+  override val index: Int = preShufflePartitionIndex
+}
+
+/**
+ * This is a specialized version of 
[[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used
+ * in Spark SQL adaptive execution when a shuffle join is converted to 
broadcast join at runtime
+ * because the map output of one input table is small enough for broadcast. 
This RDD represents the
+ * data of another input table of the join that reads from shuffle. Each 
partition of the RDD reads
+ * the whole data from just one mapper output locally. So actually there is no 
data transferred
+ * from the network.
+
+ * This RDD takes a [[ShuffleDependency]] (`dependency`).
+ *
+ * The `dependency` has the parent RDD of this RDD, which represents the 
dataset before shuffle
+ * (i.e. map output). Elements of this RDD are (partitionId, Row) pairs.
+ * Partition ids should be in the range [0, numPartitions - 1].
+ * `dependency.partitioner.numPartitions` is the number of pre-shuffle 
partitions. (i.e. the number
+ * of partitions of the map output). The post-shuffle partition number is the 
same to the parent
+ * RDD's partition number.
+ */
+class LocalShuffledRowRDD(
+ var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+ metrics: Map[String, SQLMetric])
+  extends RDD[InternalRow](dependency.rdd.context, Nil) {
+
+  private[this] val numReducers = dependency.partitioner.numPartitions
+  private[this] val numMappers = dependency.rdd.partitions.length
+
+  override def getDependencies: Seq[Dependency[_]] = List(dependency)
+
+  override def getPartitions: Array[Partition] = {
+
+Array.tabulate[Partition](numMappers) { i =>
+  new LocalShuffleRowRDDPartition(i, 0, numReducers)
+}
+  }
+
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+val tracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
 
 Review comment:
   why not just call `dependency`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333934898
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle 
partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, 
inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(
 
 Review comment:
   `ShuffleRow` -> `ShuffledRow`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333935255
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleReadMetricsReporter}
+
+
+/**
+ * The [[Partition]] used by [[LocalShuffledRowRDD]]. A pre-shuffle partition
+ * (identified by `preShufflePartitionIndex`) contains a range of post-shuffle 
partitions
+ * (`startPostShufflePartitionIndex` to `endPostShufflePartitionIndex - 1`, 
inclusive).
+ */
+private final class LocalShuffleRowRDDPartition(
+val preShufflePartitionIndex: Int,
+val startPostShufflePartitionIndex: Int,
 
 Review comment:
   this is not used anywhere.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #25797: [SPARK-29043][Core] Improve the concurrent performance of History Server

2019-10-11 Thread GitBox
turboFei commented on a change in pull request #25797: [SPARK-29043][Core] 
Improve the concurrent performance of History Server
URL: https://github.com/apache/spark/pull/25797#discussion_r333935746
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -561,7 +555,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 .last(newLastScanTime - 1)
 .asScala
 .toList
-  stale.foreach { log =>
+  stale.filterNot(isProcessing(_)).foreach { log =>
 
 Review comment:
   Sorry for the late reply, I was on my National Day holiday for the past several days.
   
   > Instead of cleaning that log, it will instead be skipped until the next 
time the cleaner task run, which may be quote a long time later.
   
   >  but we need to also retry cleaning up these logs sooner, at least sooner 
than the interval of cleaner.
   
   It sounds a little complex; should we instead clean these logs after half of the cleaner interval?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf commented on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf commented on issue #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#issuecomment-541016870
 
 
   Thank you for creating a PR so fast after my submitting the JIRA ticket! 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #25797: [SPARK-29043][Core] Improve the concurrent performance of History Server

2019-10-11 Thread GitBox
turboFei commented on a change in pull request #25797: [SPARK-29043][Core] 
Improve the concurrent performance of History Server
URL: https://github.com/apache/spark/pull/25797#discussion_r333935746
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -561,7 +555,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 .last(newLastScanTime - 1)
 .asScala
 .toList
-  stale.foreach { log =>
+  stale.filterNot(isProcessing(_)).foreach { log =>
 
 Review comment:
   Sorry for the late reply, I was on my National Day holiday for the past several days.
   
   > Instead of cleaning that log, it will instead be skipped until the next 
time the cleaner task run, which may be quote a long time later.
   
   >  but we need to also retry cleaning up these logs sooner, at least sooner 
than the interval of cleaner.
   
   It sounds a little complex; should we instead clean these logs after half of the cleaner interval?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #25797: [SPARK-29043][Core] Improve the concurrent performance of History Server

2019-10-11 Thread GitBox
turboFei commented on a change in pull request #25797: [SPARK-29043][Core] 
Improve the concurrent performance of History Server
URL: https://github.com/apache/spark/pull/25797#discussion_r333935746
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -561,7 +555,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 .last(newLastScanTime - 1)
 .asScala
 .toList
-  stale.foreach { log =>
+  stale.filterNot(isProcessing(_)).foreach { log =>
 
 Review comment:
   Sorry for the late reply; I was on my National Day holiday for the past several days.
   
   > Instead of cleaning that log, it will instead be skipped until the next 
time the cleaner task runs, which may be quite a long time later.
   
   >  but we need to also retry cleaning up these logs sooner, at least sooner 
than the interval of the cleaner.
   
   That sounds a little complex; should we instead retry cleaning these logs after 
half of the cleaner's interval?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333938629
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ##
 @@ -2267,6 +2267,215 @@ case class FormatNumber(x: Expression, d: Expression)
   override def prettyName: String = "format_number"
 }
 
+/**
+ * A function that converts string to numeric.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+_FUNC_(strExpr, patternExpr) - Convert a string to a number based on the 
pattern.
+The pattern can consist of the following characters:
+  '9': digit position (can be dropped if insignificant)
+  '0': digit position (will not be dropped, even if insignificant)
+  '.': decimal point (only allowed once)
+  ',': group (thousands) separator
+  'S': sign anchored to number (uses locale)
+  'L': currency symbol (uses locale)
+  'D': decimal point (uses locale)
+  'G': group separator (uses locale)
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_('4540', '999');
+   454
+  > SELECT _FUNC_('454.00', '000D00');
+   454
+  > SELECT _FUNC_('12,454.8-', '99G999D9S');
+   -12454.8
+  > SELECT _FUNC_('CNY234234.4350', 'L99.');
+   234234.435
+  """)
+// scalastyle:on line.size.limit
+case class ToNumber(strExpr: Expression, patternExpr: Expression)
+  extends BinaryExpression with ImplicitCastInputTypes {
+
+  // scalastyle:off caselocale
+  private lazy val patternStr = 
patternExpr.eval().asInstanceOf[UTF8String].toUpperCase.toString
 
 Review comment:
   OK. A good suggestion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#discussion_r333938918
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
 ##
 @@ -2203,4 +2203,20 @@ class DataFrameSuite extends QueryTest with 
SharedSparkSession {
   |*(1) Range (0, 10, step=1, splits=2)""".stripMargin))
 }
   }
+
+  test("groupByRelationKey") {
 
 Review comment:
   Tests should be updated to cover typed datasets as well if the above changes 
are implemented.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#discussion_r333936540
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##
 @@ -1725,6 +1725,30 @@ class Dataset[T] private[sql](
   withGroupingKey.newColumns)
   }
 
+  /**
+   * Returns a [[KeyValueGroupedDataset]] where the data is grouped by a row 
of given
+   * columns.
+   *
+   * @group typedrel
+   * @since 3.0.0
+   */
+  @scala.annotation.varargs
+  def groupByRelationKey(
+  col1: String,
+  cols: String*): KeyValueGroupedDataset[Row, Row] = {
+val colNames: Seq[String] = col1 +: cols
+val keyAttrs = colNames.map(colName => resolve(colName).toAttribute)
+val keySchema = StructType.fromAttributes(keyAttrs)
+val keyEncoder = RowEncoder(keySchema)
+val valEncoder = RowEncoder(schema)
 
 Review comment:
   I don't see why we couldn't keep the encoder that is already present in the 
Dataset. With this approach we lose that type information. I'd prefer doing 
this:
   ```
   def groupByRelationKey(
 col1: String,
 cols: String*): KeyValueGroupedDataset[Row, T] = {
   val colNames: Seq[String] = col1 +: cols
   val keyAttrs = colNames.map(colName => resolve(colName).toAttribute)
   val keySchema = StructType.fromAttributes(keyAttrs)
   val keyEncoder = RowEncoder(keySchema)
   new KeyValueGroupedDataset(
 keyEncoder,
 encoder,
 queryExecution,
 logicalPlan.output,
 keyAttrs)
 }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#discussion_r333938397
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##
 @@ -1725,6 +1725,30 @@ class Dataset[T] private[sql](
   withGroupingKey.newColumns)
   }
 
+  /**
+   * Returns a [[KeyValueGroupedDataset]] where the data is grouped by a row 
of given
+   * columns.
+   *
+   * @group typedrel
+   * @since 3.0.0
+   */
+  @scala.annotation.varargs
+  def groupByRelationKey(
+  col1: String,
+  cols: String*): KeyValueGroupedDataset[Row, Row] = {
+val colNames: Seq[String] = col1 +: cols
+val keyAttrs = colNames.map(colName => resolve(colName).toAttribute)
+val keySchema = StructType.fromAttributes(keyAttrs)
+val keyEncoder = RowEncoder(keySchema)
+val valEncoder = RowEncoder(schema)
 
 Review comment:
   Then we can leverage the `keyAs[L : Encoder]` method on 
`KeyValueGroupedDataset` to get full type information, so that an entirely typed 
`cogroup` would be possible (which is arguably the more user-friendly approach).
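   
   For illustration, a hedged usage sketch of that idea, assuming the 
`groupByRelationKey` method proposed in this PR ends up returning 
`KeyValueGroupedDataset[Row, T]`; the case classes and column names below are 
made up for the example:
   ```
   import org.apache.spark.sql.SparkSession
   
   case class Sale(itemId: Long, amount: Double)
   case class Stock(itemId: Long, quantity: Int)
   case class ItemKey(itemId: Long)
   
   object TypedCogroupSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
       import spark.implicits._
   
       val sales  = Seq(Sale(1L, 10.0), Sale(1L, 5.0), Sale(2L, 7.5)).toDS()
       val stocks = Seq(Stock(1L, 3), Stock(2L, 0)).toDS()
   
       // groupByRelationKey is the API proposed in this PR; keyAs upgrades the
       // Row key to a typed key so the cogroup below is fully typed.
       val salesByItem  = sales.groupByRelationKey("itemId").keyAs[ItemKey]
       val stocksByItem = stocks.groupByRelationKey("itemId").keyAs[ItemKey]
   
       val summary = salesByItem.cogroup(stocksByItem) { (key, ss, ts) =>
         Iterator((key.itemId, ss.map(_.amount).sum, ts.map(_.quantity).sum))
       }
       summary.show()
       spark.stop()
     }
   }
   ```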


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#discussion_r333936540
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##
 @@ -1725,6 +1725,30 @@ class Dataset[T] private[sql](
   withGroupingKey.newColumns)
   }
 
+  /**
+   * Returns a [[KeyValueGroupedDataset]] where the data is grouped by a row 
of given
+   * columns.
+   *
+   * @group typedrel
+   * @since 3.0.0
+   */
+  @scala.annotation.varargs
+  def groupByRelationKey(
+  col1: String,
+  cols: String*): KeyValueGroupedDataset[Row, Row] = {
+val colNames: Seq[String] = col1 +: cols
+val keyAttrs = colNames.map(colName => resolve(colName).toAttribute)
+val keySchema = StructType.fromAttributes(keyAttrs)
+val keyEncoder = RowEncoder(keySchema)
+val valEncoder = RowEncoder(schema)
 
 Review comment:
   I don't see why we couldn't keep the encoder that is already present in the 
Dataset. With the current approach we lose that type information. I'd prefer 
doing this:
   ```
   def groupByRelationKey(
 col1: String,
 cols: String*): KeyValueGroupedDataset[Row, T] = {
   val colNames: Seq[String] = col1 +: cols
   val keyAttrs = colNames.map(colName => resolve(colName).toAttribute)
   val keySchema = StructType.fromAttributes(keyAttrs)
   val keyEncoder = RowEncoder(keySchema)
   new KeyValueGroupedDataset(
 keyEncoder,
 encoder,
 queryExecution,
 logicalPlan.output,
 keyAttrs)
 }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333939935
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -388,6 +388,14 @@ object SQLConf {
 "must be a positive integer.")
   .createOptional
 
+  val OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED =
 
 Review comment:
   `OPTIMIZED` -> `OPTIMIZE`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333940060
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -2137,6 +2145,8 @@ class SQLConf extends Serializable with Logging {
   def maxNumPostShufflePartitions: Int =
 
getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
 
+  def optimizedLocalShuffleReaderEnabled: Boolean = 
getConf(OPTIMIZED_LOCAL_SHUFFLE_READER_ENABLED)
 
 Review comment:
   It's only called once; we don't need to create a method for it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#discussion_r333938918
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
 ##
 @@ -2203,4 +2203,20 @@ class DataFrameSuite extends QueryTest with 
SharedSparkSession {
   |*(1) Range (0, 10, step=1, splits=2)""".stripMargin))
 }
   }
+
+  test("groupByRelationKey") {
 
 Review comment:
   Tests should be updated to cover typed datasets as well if my previous change 
suggestion is implemented.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333940347
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ##
 @@ -2267,6 +2267,215 @@ case class FormatNumber(x: Expression, d: Expression)
   override def prettyName: String = "format_number"
 }
 
+/**
+ * A function that converts string to numeric.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+_FUNC_(strExpr, patternExpr) - Convert a string to a number based on the 
pattern.
+The pattern can consist of the following characters:
+  '9': digit position (can be dropped if insignificant)
+  '0': digit position (will not be dropped, even if insignificant)
+  '.': decimal point (only allowed once)
+  ',': group (thousands) separator
+  'S': sign anchored to number (uses locale)
+  'L': currency symbol (uses locale)
+  'D': decimal point (uses locale)
+  'G': group separator (uses locale)
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_('4540', '999');
+   454
+  > SELECT _FUNC_('454.00', '000D00');
+   454
+  > SELECT _FUNC_('12,454.8-', '99G999D9S');
+   -12454.8
+  > SELECT _FUNC_('CNY234234.4350', 'L99.');
+   234234.435
+  """)
+// scalastyle:on line.size.limit
+case class ToNumber(strExpr: Expression, patternExpr: Expression)
+  extends BinaryExpression with ImplicitCastInputTypes {
+
+  // scalastyle:off caselocale
+  private lazy val patternStr = 
patternExpr.eval().asInstanceOf[UTF8String].toUpperCase.toString
+  // scalastyle:on caselocale
+
+  override def left: Expression = strExpr
+  override def right: Expression = patternExpr
+  override def dataType: DataType = StringType
+  override def inputTypes: Seq[DataType] = Seq(StringType, StringType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+def checkDecimalPointNum(c: Char): Boolean = {
+  c == '.' || c.toUpper == 'D'
+}
+
+val inputTypeCheck = super.checkInputDataTypes()
+if(inputTypeCheck.isSuccess) {
+  if (patternStr.count(checkDecimalPointNum(_)) > 1) {
 
 Review comment:
   OK. I learned it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333940553
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
 ##
 @@ -54,6 +54,19 @@ private[spark] trait ShuffleManager {
   context: TaskContext,
   metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
 
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive) to
+   * read from mapId.
+   * Called on executors by reduce tasks.
+   */
+  def getMapReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
 
 Review comment:
   Right now the local shuffle reader always reads all the shuffle blocks of one 
mapper. Let's add the above 2 parameters later, when we are going to use them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333941162
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
 ##
 @@ -36,19 +36,35 @@ private[spark] class BlockStoreShuffleReader[K, C](
 readMetrics: ShuffleReadMetricsReporter,
 serializerManager: SerializerManager = SparkEnv.get.serializerManager,
 blockManager: BlockManager = SparkEnv.get.blockManager,
-mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
+mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
+mapId: Option[Int] = None)
   extends ShuffleReader[K, C] with Logging {
 
   private val dep = handle.dependency
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
+val blocksByAddress = (mapId) match {
 
 Review comment:
   `(mapId)` -> `mapId`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei commented on a change in pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution

2019-10-11 Thread GitBox
turboFei commented on a change in pull request #26090: [SPARK-29302]Fix writing 
file collision in dynamic partition overwrite mode within speculative execution
URL: https://github.com/apache/spark/pull/26090#discussion_r333941272
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -89,7 +92,7 @@ class HadoopMapReduceCommitProtocol(
* The staging directory of this write job. Spark uses it to deal with files 
with absolute output
* path, or writing data into partitioned directory with 
dynamicPartitionOverwrite=true.
*/
-  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+  private def stagingDir = new Path(stagingPath.getOrElse(path))
 
 Review comment:
   AFAIK, this staging dir is used to deal with files with an absolute output 
path, or with writing data into a partitioned directory with 
dynamicPartitionOverwrite=true.
   
   But this PR changes the behavior for the absolute output path case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333942317
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ##
 @@ -2267,6 +2267,215 @@ case class FormatNumber(x: Expression, d: Expression)
   override def prettyName: String = "format_number"
 }
 
+/**
+ * A function that converts string to numeric.
 
 Review comment:
   I will preserve this line and change the `usage` part to: "Convert `strExpr` 
to a number based on the `patternExpr`."


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333942529
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ##
 @@ -2267,6 +2267,215 @@ case class FormatNumber(x: Expression, d: Expression)
   override def prettyName: String = "format_number"
 }
 
+/**
+ * A function that converts string to numeric.
+ */
+// scalastyle:off line.size.limit
 
 Review comment:
   Thanks, I will follow your suggestion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333943188
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
 ##
 @@ -54,6 +54,19 @@ private[spark] trait ShuffleManager {
   context: TaskContext,
   metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
 
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive) to
+   * read from mapId.
+   * Called on executors by reduce tasks.
+   */
+  def getMapReader[K, C](
 
 Review comment:
   instead of creating a new API, how about we set the map id in 
`TaskContext.getLocalProperties`, and still call the existing `getReader` 
method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333943901
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##
 @@ -669,6 +685,34 @@ private[spark] class MapOutputTrackerMaster(
 None
   }
 
+  /**
+   * Return the locations where the Mapper(s) ran. The locations each includes 
both a host and an
+   * executor id on that host.
+   *
+   * @param dep shuffle dependency object
+   * @param startMapId the start map id
+   * @param endMapId the end map id
 
 Review comment:
   We shouldn't add code that is only for future use. For now we only need a 
single `mapId`, IIUC.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333944281
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
 ##
 @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   Literal.create(null, IntegerType), Literal.create(null, IntegerType)), 
null)
   }
 
+  test("ToNumber") {
+// Test '0' and '9'
+checkEvaluation(ToNumber(Literal("454"), Literal("")), "")
+checkEvaluation(ToNumber(Literal("454"), Literal("9")), "4")
+checkEvaluation(ToNumber(Literal("454"), Literal("99")), "45")
+checkEvaluation(ToNumber(Literal("454"), Literal("999")), "454")
+checkEvaluation(ToNumber(Literal("054"), Literal("999")), "54")
+checkEvaluation(ToNumber(Literal("404"), Literal("999")), "404")
+checkEvaluation(ToNumber(Literal("450"), Literal("999")), "450")
+checkEvaluation(ToNumber(Literal("0454"), Literal("999")), "45")
+checkEvaluation(ToNumber(Literal("4540"), Literal("999")), "454")
+checkEvaluation(ToNumber(Literal("454"), Literal("0")), "4")
+checkEvaluation(ToNumber(Literal("454"), Literal("00")), "45")
+checkEvaluation(ToNumber(Literal("454"), Literal("000")), "454")
+checkEvaluation(ToNumber(Literal("054"), Literal("000")), "54")
+checkEvaluation(ToNumber(Literal("404"), Literal("000")), "404")
+checkEvaluation(ToNumber(Literal("450"), Literal("000")), "450")
+checkEvaluation(ToNumber(Literal("0454"), Literal("000")), "45")
+checkEvaluation(ToNumber(Literal("4540"), Literal("000")), "454")
+// Test '.' and 'D'
+checkEvaluation(ToNumber(Literal("454.0"), Literal("999.9")), "454")
+checkEvaluation(ToNumber(Literal("454.0"), Literal("000.0")), "454")
+checkEvaluation(ToNumber(Literal("454.0"), Literal("999D9")), "454")
+checkEvaluation(ToNumber(Literal("454.0"), Literal("000D0")), "454")
+checkEvaluation(ToNumber(Literal("454.00"), Literal("000D00")), "454")
+checkEvaluation(ToNumber(Literal("454.3"), Literal("999.9")), "454.3")
+checkEvaluation(ToNumber(Literal("454.3"), Literal("000.0")), "454.3")
+checkEvaluation(ToNumber(Literal("454.3"), Literal("999D9")), "454.3")
+checkEvaluation(ToNumber(Literal("454.3"), Literal("000D0")), "454.3")
+// Test ',' and 'G'
+checkEvaluation(ToNumber(Literal("12,454"), Literal("99,999")), "12454")
+checkEvaluation(ToNumber(Literal("12,454"), Literal("00,000")), "12454")
+checkEvaluation(ToNumber(Literal("12,454"), Literal("99G999")), "12454")
+checkEvaluation(ToNumber(Literal("12,454"), Literal("00G000")), "12454")
+checkEvaluation(ToNumber(Literal("12,454,367"), Literal("99,999,999")), 
"12454367")
+checkEvaluation(ToNumber(Literal("12,454,367"), Literal("00,000,000")), 
"12454367")
+checkEvaluation(ToNumber(Literal("12,454,367"), Literal("99G999G999")), 
"12454367")
+checkEvaluation(ToNumber(Literal("12,454,367"), Literal("00G000G000")), 
"12454367")
+// Test 'S'
+checkEvaluation(ToNumber(Literal("454-"), Literal("999S")), "-454")
+checkEvaluation(ToNumber(Literal("454-"), Literal("000S")), "-454")
+checkEvaluation(ToNumber(Literal("12,454.8-"), Literal("99G999D9S")), 
"-12454.8")
+checkEvaluation(ToNumber(Literal("1-2,454.8"), Literal("9S9,999D9")), 
"-12454.8")
+checkEvaluation(ToNumber(Literal("00,454.8-"), Literal("99G999.9S")), 
"-454.8")
+// Test 'L'
+checkEvaluation(ToNumber(Literal("CNY234234.4350"), 
Literal("L99.")), "234234.435")
+checkEvaluation(ToNumber(Literal("RMB34234.4350"), 
Literal("L9.")), "34234.435")
+checkEvaluation(ToNumber(Literal("RY34234.4350"), Literal("L9.")), 
"34234.435")
+checkEvaluation(ToNumber(Literal("R34234.4350"), Literal("L9.")), 
"34234.435")
+
+ToNumber(Literal("454.3.2"), Literal("999D9D9")).checkInputDataTypes() 
match {
 
 Review comment:
   > Here you check only one negative test. Could you add a little bit more 
negative tests?
   
   OK. I will add more.
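   
   For example, one more negative case in the same style could be added inside the 
`test("ToNumber")` block shown above (a sketch only; the exact failure message and 
match arms are assumptions based on the `checkInputDataTypes` shown in the diff):
   ```
   // Mixing '.' and 'D' still means two decimal points, so type checking should fail.
   ToNumber(Literal("454.3"), Literal("99.9D9")).checkInputDataTypes() match {
     case TypeCheckResult.TypeCheckFailure(msg) =>
       assert(msg.nonEmpty)  // not asserting the exact wording here
     case _ => fail("expected a type-check failure for a pattern with two decimal points")
   }
   ```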


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333916849
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
 ##
 @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   Literal.create(null, IntegerType), Literal.create(null, IntegerType)), 
null)
   }
 
+  test("ToNumber") {
 
 Review comment:
   @MaxGekk Thanks for this review. I tested the locale on `Postgresql`, but 'L' 
seems not to work.
   ```
   select to_number('USD34234.4350', 'L9.');  // 34234.435
   select to_number('EUR34234.4350', 'L9.');  // 34234.435
   select to_number('RY34234.4350', 'L9.');  // 34234.435
   ```
   Although 'RY' is not a valid locale, the result is the same as for the others.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add Postgresql function to_number.

2019-10-11 Thread GitBox
beliefer commented on a change in pull request #25963: [SPARK-28137][SQL] Add 
Postgresql function to_number.
URL: https://github.com/apache/spark/pull/25963#discussion_r333917844
 
 

 ##
 File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
 ##
 @@ -840,6 +841,62 @@ class StringExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   Literal.create(null, IntegerType), Literal.create(null, IntegerType)), 
null)
   }
 
+  test("ToNumber") {
 
 Review comment:
   I think the description of locales is not consistent with the actual behavior. 
Maybe I should remove the comment about locales, as this looks like a bug in 
`Postgresql`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25295: [SPARK-28560][SQL] 
Optimize shuffle reader to local shuffle reader when smj converted to bhj in 
adaptive execution
URL: https://github.com/apache/spark/pull/25295#discussion_r333946016
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
 ##
 @@ -749,6 +818,26 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
 }
   }
 
+  override def getMapSizesByExecutorId(
+  shuffleId: Int,
+  startPartition: Int,
+  endPartition: Int,
+  mapId: Int,
+  useOldFetchProtocol: Boolean) : Iterator[(BlockManagerId, Seq[(BlockId, 
Long, Int)])] = {
+logDebug(s"Fetching outputs for shuffle $shuffleId, mapId $mapId" +
+  s"partitions $startPartition-$endPartition")
+val statuses = getStatuses(shuffleId)
 
 Review comment:
   Since we need to get all the shuffle status anyway, we can call the existing 
`convertMapStatuses`, and do an extra filter to only collect the blocks whose 
mapIndex is what we want.
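   
   A small sketch of that filtering step; the helper below is illustrative only, 
and it assumes the `Int` in each block tuple is the map index, as the return type 
quoted in the diff above implies:
   ```
   import org.apache.spark.storage.{BlockId, BlockManagerId}
   
   object MapIndexFilterSketch {
     /** Keep only the blocks produced by the wanted map index. */
     def blocksForMapIndex(
         all: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
         wantedMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
       all.map { case (location, blocks) =>
         // The third element of each block tuple is treated as the map index here.
         (location, blocks.filter { case (_, _, mapIndex) => mapIndex == wantedMapIndex })
       }.filter { case (_, blocks) => blocks.nonEmpty }
     }
   }
   ```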


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Clarkkkkk commented on a change in pull request #26090: [SPARK-29302]Fix writing file collision in dynamic partition overwrite mode within speculative execution

2019-10-11 Thread GitBox
Clark commented on a change in pull request #26090: [SPARK-29302]Fix 
writing file collision in dynamic partition overwrite mode within speculative 
execution
URL: https://github.com/apache/spark/pull/26090#discussion_r333954828
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -89,7 +92,7 @@ class HadoopMapReduceCommitProtocol(
* The staging directory of this write job. Spark uses it to deal with files 
with absolute output
* path, or writing data into partitioned directory with 
dynamicPartitionOverwrite=true.
*/
-  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
+  private def stagingDir = new Path(stagingPath.getOrElse(path))
 
 Review comment:
   You are right, stagingDir cannot be used here, as it might get deleted 
during abortJob.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf commented on a change in pull request #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#discussion_r333938397
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##
 @@ -1725,6 +1725,30 @@ class Dataset[T] private[sql](
   withGroupingKey.newColumns)
   }
 
+  /**
+   * Returns a [[KeyValueGroupedDataset]] where the data is grouped by a row 
of given
+   * columns.
+   *
+   * @group typedrel
+   * @since 3.0.0
+   */
+  @scala.annotation.varargs
+  def groupByRelationKey(
+  col1: String,
+  cols: String*): KeyValueGroupedDataset[Row, Row] = {
+val colNames: Seq[String] = col1 +: cols
+val keyAttrs = colNames.map(colName => resolve(colName).toAttribute)
+val keySchema = StructType.fromAttributes(keyAttrs)
+val keyEncoder = RowEncoder(keySchema)
+val valEncoder = RowEncoder(schema)
 
 Review comment:
   Then the user can leverage the `keyAs[L : Encoder]` method on 
`KeyValueGroupedDataset` to get full type information, so that an entirely typed 
`cogroup` would be possible (which is arguably the more user-friendly approach).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #25955: [SPARK-29277][SQL] Add early DSv2 filter and projection pushdown

2019-10-11 Thread GitBox
cloud-fan commented on a change in pull request #25955: [SPARK-29277][SQL] Add 
early DSv2 filter and projection pushdown
URL: https://github.com/apache/spark/pull/25955#discussion_r333962848
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ##
 @@ -55,7 +56,52 @@ case class DataSourceV2Relation(
   }
 
   override def computeStats(): Statistics = {
-val scan = newScanBuilder().build()
+if (Utils.isTesting) {
+  // when testing, throw an exception if this computeStats method is 
called because stats should
+  // not be accessed before pushing the projection and filters to create a 
scan. otherwise, the
+  // stats are not accurate because they are based on a full table scan of 
all columns.
+  throw new UnsupportedOperationException(
+s"BUG: computeStats called before pushdown on DSv2 relation: $name")
+} else {
+  // when not testing, return stats because bad stats are better than 
failing a query
+  newScanBuilder() match {
+case r: SupportsReportStatistics =>
+  val statistics = r.estimateStatistics()
+  DataSourceV2Relation.transformV2Stats(statistics, None, 
conf.defaultSizeInBytes)
+case _ =>
+  Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+  }
+
+  override def newInstance(): DataSourceV2Relation = {
+copy(output = output.map(_.newInstance()))
+  }
+}
+
+/**
+ * A logical plan for a DSv2 table with a scan already created.
+ *
+ * This is used in the optimizer to push filters and projection down before 
conversion to physical
+ * plan. This ensures that the stats that are used by the optimizer account 
for the filters and
+ * projection that will be pushed down.
+ *
+ * @param table a DSv2 [[Table]]
+ * @param scan a DSv2 [[Scan]]
+ * @param output the output attributes of this relation
+ */
+case class DataSourceV2ScanRelation(
 
 Review comment:
   Makes sense. But it's still better to reduce the maintenance cost a bit. How 
about we clearly define the life cycle of a v2 relation as: 
`UnresolvedRelation` (before analysis) -> `DataSourceV2Relation` (after 
analysis) -> `DataSourceV2ScanRelation` (after optimization) -> the physical 
scan node.
   
   Then we only need to match one node:
   1. if dealing with `analyzedPlan`, match `DataSourceV2Relation`
   2. if dealing with `optimizedPlan`, match `DataSourceV2ScanRelation`
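   
   A minimal sketch of that convention; note that `DataSourceV2ScanRelation` is the 
node proposed in this PR, so this only compiles together with it, and the field 
names follow the parameters quoted above:
   ```
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
   import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
   
   object V2RelationLifecycleSketch {
     // Before optimization (analyzed plan): the table has no scan yet.
     def v2TablesInAnalyzedPlan(analyzed: LogicalPlan) =
       analyzed.collect { case r: DataSourceV2Relation => r.table }
   
     // After optimization: filters/projection are already pushed into the scan.
     def v2ScansInOptimizedPlan(optimized: LogicalPlan) =
       optimized.collect { case s: DataSourceV2ScanRelation => s.scan }
   }
   ```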


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] s1ck commented on issue #24851: [SPARK-27303][GRAPH] Add Spark Graph API

2019-10-11 Thread GitBox
s1ck commented on issue #24851: [SPARK-27303][GRAPH] Add Spark Graph API
URL: https://github.com/apache/spark/pull/24851#issuecomment-541041551
 
 
   @dongjoon-hyun Thanks for the extra rounds. We made the `DataFrame` to 
`Dataset[Row]` changes and added some clarifying docs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf edited a comment on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf edited a comment on issue #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#issuecomment-541016870
 
 
   Thank you for creating a PR so quickly after I submitted the Jira ticket! 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hagerf commented on issue #26087: [SPARK-29427][SQL] Create KeyValueGroupedDataset from existing columns in DataFrame

2019-10-11 Thread GitBox
hagerf commented on issue #26087: [SPARK-29427][SQL] Create 
KeyValueGroupedDataset from existing columns in DataFrame
URL: https://github.com/apache/spark/pull/26087#issuecomment-541051711
 
 
   I added a PR with these changes and a new test for the typed datasets. 👍 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time

2019-10-11 Thread GitBox
skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support 
defining HADOOP_CONF_DIR and config map at the same time
URL: https://github.com/apache/spark/pull/25609#discussion_r333985107
 
 

 ##
 File path: docs/security.md
 ##
 @@ -845,8 +845,13 @@ When talking to Hadoop-based services behind Kerberos, it 
was noted that Spark n
 so that non-local processes can authenticate. These delegation tokens in 
Kubernetes are stored in Secrets that are
 shared by the Driver and its Executors. As such, there are three ways of 
submitting a Kerberos job:
 
-In all cases you must define the environment variable: `HADOOP_CONF_DIR` or
-`spark.kubernetes.hadoop.configMapName.`
+In all cases you must define the environment variable: `HADOOP_CONF_DIR` or/and
+`spark.kubernetes.hadoop.configMapName`. When only `HADOOP_CONF_DIR` is 
configured, the hadoop configurations in there
+will be used by the client process, the Driver and its Executors. When only 
`spark.kubernetes.hadoop.configMapName`
+is configured, the hadoop configurations will only be used by the Driver and 
its Executors. If your client process has
+extra dependencies to upload to `spark.kubernetes.file.upload.path`, you may 
need to configure `HADOOP_CONF_DIR` too.
+When these two variables are both set, Spark will prefer 
`spark.kubernetes.hadoop.configMapName` to be mounted on the
+Driver/Executor pods.
 
 Review comment:
   The concept looks good to me, @ifilonenko any corner cases?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan opened a new pull request #26091: [SPARK-29439][SQL] DDL commands should not use DataSourceV2Relation

2019-10-11 Thread GitBox
cloud-fan opened a new pull request #26091: [SPARK-29439][SQL] DDL commands 
should not use DataSourceV2Relation
URL: https://github.com/apache/spark/pull/26091
 
 
   
   
   ### What changes were proposed in this pull request?
   
   Create a new node `ResolvedV2Table`, and resolve `UnresolvedV2Relation` to 
this new node instead of `DataSourceV2Relation`.
   
   ### Why are the changes needed?
   
   `DataSourceV2Relation` is a scan node. It should be used when we want to 
scan a v2 table. However, the DDL commands do not need to scan a v2 table; they 
just need a node to hold the v2 table. It's possible that there are rules 
trying to match `DataSourceV2Relation` and convert it to something else (e.g. a 
new `DataSourceV2ScanRelation`) for a better data scan. Unfortunately, doing this 
would break these DDL commands. It's better to have a separate node that holds the 
v2 table for DDL commands.
   
   ### Does this PR introduce any user-facing change?
   
   no
   
   ### How was this patch tested?
   
   existing tests
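   
   For illustration, a minimal sketch of what a holder node like the one described 
above could look like; the name and field list are assumptions based on the 
description, not the actual implementation in this PR:
   ```
   import org.apache.spark.sql.catalyst.expressions.Attribute
   import org.apache.spark.sql.catalyst.plans.logical.LeafNode
   import org.apache.spark.sql.connector.catalog.Table
   
   // A leaf node that only holds the resolved v2 table for DDL commands.
   // It has no output and no scan semantics, unlike DataSourceV2Relation.
   case class ResolvedV2TableSketch(table: Table) extends LeafNode {
     override def output: Seq[Attribute] = Nil
   }
   ```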


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time

2019-10-11 Thread GitBox
skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support 
defining HADOOP_CONF_DIR and config map at the same time
URL: https://github.com/apache/spark/pull/25609#discussion_r333990738
 
 

 ##
 File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
 ##
 @@ -62,6 +65,17 @@ class HadoopConfDriverFeatureStepSuite extends 
SparkFunSuite {
 assert(hadoopConfMap.getData().keySet().asScala === confFiles)
   }
 
+  test("using hadoop config map ahead of conf dir if both defined") {
+val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> 
confDir.getAbsolutePath()))
+  .set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap")
+
+val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
 
 Review comment:
   nit: remove space


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on issue #26091: [SPARK-29439][SQL] DDL commands should not use DataSourceV2Relation

2019-10-11 Thread GitBox
cloud-fan commented on issue #26091: [SPARK-29439][SQL] DDL commands should not 
use DataSourceV2Relation
URL: https://github.com/apache/spark/pull/26091#issuecomment-541064323
 
 
   @brkyvz @rdblue 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time

2019-10-11 Thread GitBox
skonto commented on a change in pull request #25609: [SPARK-28896][K8S] Support 
defining HADOOP_CONF_DIR and config map at the same time
URL: https://github.com/apache/spark/pull/25609#discussion_r333990738
 
 

 ##
 File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
 ##
 @@ -62,6 +65,17 @@ class HadoopConfDriverFeatureStepSuite extends 
SparkFunSuite {
 assert(hadoopConfMap.getData().keySet().asScala === confFiles)
   }
 
+  test("using hadoop config map ahead of conf dir if both defined") {
+val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> 
confDir.getAbsolutePath()))
+  .set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap")
+
+val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
 
 Review comment:
   minor: remove space


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on issue #26089: [SPARK-29423][SQL] lazily initialize StreamingQueryManager in SessionState

2019-10-11 Thread GitBox
HeartSaVioR commented on issue #26089: [SPARK-29423][SQL] lazily initialize 
StreamingQueryManager in SessionState
URL: https://github.com/apache/spark/pull/26089#issuecomment-541065149
 
 
   I expect the code change will work, but it would be even better if you 
could attach some results of manual tests to the PR description, e.g. take a 
snapshot of the sampler page in jvisualvm and locate StreamingQueryListenerBus, 
both on the current master and after applying this patch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] skonto commented on issue #25609: [SPARK-28896][K8S] Support defining HADOOP_CONF_DIR and config map at the same time

2019-10-11 Thread GitBox
skonto commented on issue #25609: [SPARK-28896][K8S] Support defining 
HADOOP_CONF_DIR and config map at the same time
URL: https://github.com/apache/spark/pull/25609#issuecomment-541065986
 
 
   @erikerlandson LGTM. We need an integration test for this; @ifilonenko gentle 
ping.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tgravescs commented on issue #26085: [SPARK-29434] [Core] Improve the MapStatuses Serialization Performance

2019-10-11 Thread GitBox
tgravescs commented on issue #26085: [SPARK-29434] [Core] Improve the 
MapStatuses Serialization Performance
URL: https://github.com/apache/spark/pull/26085#issuecomment-541067929
 
 
   I haven't looked at the code yet; can you clarify what ops/ms is measuring 
here?
   
   >> For smaller MapStatuses, we find it's 2x faster.
   
   Just to verify: does this hold true for small jobs, where you have 10 or 
100 outputs?
   
   Did you try with, say, 2000 or 5000 blocks? The zstd ops/ms numbers for the 
2-step approach dropped a lot between 5 and 500, whereas the 1-step approach 
didn't drop nearly as much, so I'm wondering if the 1-step approach becomes 
faster at some point.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #26018: [SPARK-29352][SQL][SS] Track active streaming queries in the SparkSession.sharedState

2019-10-11 Thread GitBox
HeartSaVioR commented on a change in pull request #26018: 
[SPARK-29352][SQL][SS] Track active streaming queries in the 
SparkSession.sharedState
URL: https://github.com/apache/spark/pull/26018#discussion_r333997947
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ##
 @@ -355,7 +355,7 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   // Make sure no other query with same id is active across all sessions
   val activeOption =
 
Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, 
this))
-  if (activeOption.isDefined) {
+  if (activeOption.isDefined || activeQueries.values.exists(_.id == 
query.id)) {
 
 Review comment:
   Just curious: are you seeing an actual case where activeOption is None but 
activeQueries contains such a query? I'm not seeing that case, though I don't 
think adding this would hurt.
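   
   For anyone skimming: the existing check hinges on putIfAbsent semantics, 
which return the previously mapped value (null on the first insert), so 
activeOption.isDefined already signals a same-id query registered through the 
shared map. A tiny standalone illustration (plain ConcurrentHashMap, not the 
actual manager code):
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   val active = new ConcurrentHashMap[String, String]()
   
   // First registration: no prior mapping, putIfAbsent returns null -> None.
   val first = Option(active.putIfAbsent("query-1", "session-A"))   // None
   
   // Same id again: the existing value comes back -> Some("session-A").
   val second = Option(active.putIfAbsent("query-1", "session-B"))  // Some("session-A")
   ```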


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite and specified abs path be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of 
a task for dynamic partition overwrite and specified abs path be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545
 
 
   Oh, it seems that this issue is related to 
https://github.com/apache/spark/pull/24142.
   I just read the comments there; it seems there is some risk of producing 
duplicate results.
   
   @ajithme  @vanzin  @LantaoJin 
   
   How about using this method to name a task file for dynamic partition 
overwrite only?
   
   For that case, it keeps a filesToMove set, which would not produce 
duplicate results.
   
   
   P.S.:
   I doubt whether it would produce duplicate results for the non-dynamic 
partition overwrite case.
   
   Should outputCommitCoordinator decide which task can commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of a task for dynamic partition overwrite be unique

2019-10-11 Thread GitBox
turboFei edited a comment on issue #26086: [SPARK-29302] Make the file name of 
a task for dynamic partition overwrite be unique
URL: https://github.com/apache/spark/pull/26086#issuecomment-540982545
 
 
   Oh, it seems that this issue is related to 
https://github.com/apache/spark/pull/24142.
   I just read the comments there; it seems there is some risk of producing 
duplicate results.
   
   @ajithme  @vanzin  @LantaoJin 
   
   How about using this method to name a task file for dynamic partition 
overwrite only?
   
   For this case, it keeps a filesToMove set, which would not produce 
duplicate results.
   
   
   P.S.:
   I doubt whether it would produce duplicate results for the non-dynamic 
partition overwrite case.
   
   Should outputCommitCoordinator decide which task can commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] nonsleepr commented on a change in pull request #26000: [SPARK-29330][CORE][YARN] Allow users to chose the name of Spark Shuffle service

2019-10-11 Thread GitBox
nonsleepr commented on a change in pull request #26000: 
[SPARK-29330][CORE][YARN] Allow users to chose the name of Spark Shuffle service
URL: https://github.com/apache/spark/pull/26000#discussion_r334009122
 
 

 ##
 File path: 
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
 ##
 @@ -136,7 +136,11 @@
   private DB db;
 
   public YarnShuffleService() {
-super("spark_shuffle");
+this("spark_shuffle");
+  }
+
+  protected YarnShuffleService(String serviceName) {
 
 Review comment:
   The name specified here is actually useful only in tests. YARN's service 
instantiation logic wouldn't even pass the name of the service used in the 
config to the instantiated service. I guess that's the main reason the names 
and ports are hardcoded or bound to non-namespaced configuration keys.
   The way HDP overcomes that is by providing different classpaths with 
different implementations for different versions of the service 
(`spark_shuffle` for Spark 1.6.x and `spark2_shuffle` for Spark 2+). The only 
way I can see to pass different parameters to the same implementation of the 
service is by providing different configs on the classpath.
   
   I will add a comment here stating that the name is actually only used for 
the tests, but is otherwise always hardcoded to `spark_shuffle`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on issue #25943: [WIP][SPARK-29261][SQL][CORE] Support recover live entities from KVStore for (SQL)AppStatusListener

2019-10-11 Thread GitBox
Ngone51 commented on issue #25943: [WIP][SPARK-29261][SQL][CORE] Support 
recover live entities from KVStore for (SQL)AppStatusListener
URL: https://github.com/apache/spark/pull/25943#issuecomment-541080685
 
 
   > So you're saying the KVStore already has enough info, this PR just 
repopulates the in-memory version of all the LiveEntities?
   
   Yeah. The snapshot is used to restore a KVStore, and then we recover the 
live entities for AppStatusListener/SQLAppStatusListener from that restored 
KVStore.
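   
   Schematically, something like this (purely hypothetical names, nothing 
from the real kvstore API), just to illustrate that nothing new is persisted 
and the in-memory live entities are rebuilt by scanning the restored store:
   
   ```scala
   // Hypothetical stand-ins for the persisted records and the "live" view.
   final case class StageRecord(stageId: Int, status: String)
   
   // Restoring the store from a snapshot is just loading what was persisted...
   def restoreStore(snapshot: Seq[StageRecord]): Seq[StageRecord] = snapshot
   
   // ...and recovering live entities is re-deriving the in-memory map from it.
   def recoverLiveStages(store: Seq[StageRecord]): Map[Int, StageRecord] =
     store.filter(_.status == "ACTIVE").map(s => s.stageId -> s).toMap
   
   val restored = restoreStore(Seq(StageRecord(1, "COMPLETE"), StageRecord(2, "ACTIVE")))
   val live = recoverLiveStages(restored)  // Map(2 -> StageRecord(2, "ACTIVE"))
   ```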


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk opened a new pull request #26092: [SPARK-29440][SQL] Support java.time.Duration as an external type of CalendarIntervalType

2019-10-11 Thread GitBox
MaxGekk opened a new pull request #26092: [SPARK-29440][SQL] Support 
java.time.Duration as an external type of CalendarIntervalType
URL: https://github.com/apache/spark/pull/26092
 
 
   ### What changes were proposed in this pull request?
   In the PR, I propose to convert values of the Catalyst 
`CalendarIntervalType` to `java.time.Duration` values when such values are 
needed outside of Spark, for example in UDFs. If an `INTERVAL` value has a 
non-zero `months` field, it is converted to a number of seconds assuming 
`2629746` seconds per month. This average number of seconds per month is 
derived by assuming that the average year of the Gregorian calendar is 
`365.2425` days long (see https://en.wikipedia.org/wiki/Gregorian_calendar): 
`60 * 60 * 24 * 365.2425` = `31556952.0` = `12 * 2629746`.
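   
   For reference, the averaging works out exactly (a standalone sanity check, 
not part of the actual converter):
   ```scala
   // 365.2425 days in an average Gregorian year, spread over 12 months.
   val secondsPerYear  = 60 * 60 * 24 * 365.2425       // 3.1556952E7
   val secondsPerMonth = (secondsPerYear / 12).toLong  // 2629746
   ```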
   
   For example:
   ```scala
   scala> val plusDay = udf((i: java.time.Duration) => i.plusDays(1))
   plusDay: org.apache.spark.sql.expressions.UserDefinedFunction = 
SparkUserDefinedFunction($Lambda$1855/165450258@485996f7,CalendarIntervalType,List(Some(Schema(CalendarIntervalType,true))),None,true,true)
   
   scala> val df = spark.sql("SELECT interval 40 minutes as i")
   df: org.apache.spark.sql.DataFrame = [i: interval]
   
   scala> df.show
   +-------------------+
   |                  i|
   +-------------------+
   |interval 40 minutes|
   +-------------------+
   
   scala> df.select(plusDay('i)).show(false)
   +--------------------------+
   |UDF(i)                    |
   +--------------------------+
   |interval 1 days 40 minutes|
   +--------------------------+
   ```
   I added an implicit encoder for `java.time.Duration` which allows to create 
Spark dataframe from an external collections:
   ```scala
   scala> Seq(Duration.ofDays(10), Duration.ofHours(10)).toDS.show(false)
   +-----------------------+
   |value                  |
   +-----------------------+
   |interval 1 weeks 3 days|
   |interval 10 hours      |
   +-----------------------+
   ```
   
   ### Why are the changes needed?
   This should allow users to:
   - Write UDFs over interval inputs
   - Use the Java 8 `java.time.Duration` API to manipulate collected values or 
UDF inputs
   - Create dataframes from a collection of `java.time.Duration` values.
   
   ### Does this PR introduce any user-facing change?
   Yes. Currently, `collect()` returns the non-public class `CalendarInterval`:
   ```
   scala> spark.sql("select interval 1 
week").collect().apply(0).get(0).isInstanceOf[org.apache.spark.unsafe.types.CalendarInterval]
   res2: Boolean = true
   ```
   After the changes:
   ```
   scala> spark.sql("select interval 1 
week").collect().apply(0).get(0).isInstanceOf[Duration]
   res8: Boolean = true
   ```
   ### How was this patch tested?
   - Added new tests to `CatalystTypeConvertersSuite` to check conversion of 
`CalendarIntervalType` to/from `java.time.Duration`
   - By `JavaUDFSuite`/`UDFSuite` to test usage of the `Duration` type in 
Scala/Java UDFs.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


