[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

2018-09-01 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22221#discussion_r214508181
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
   accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
   blockManagerId: BlockManagerId): Boolean = {
 // (taskId, stageId, stageAttemptId, accumUpdates)
-val accumUpdatesWithTaskIds: Array[(Long, Int, Int, 
Seq[AccumulableInfo])] = synchronized {
+val accumUpdatesWithTaskIds: Array[(Long, Int, Int, 
Seq[AccumulableInfo])] = {
   accumUpdates.flatMap { case (id, updates) =>
 val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), 
None))
-taskIdToTaskSetManager.get(id).map { taskSetMgr =>
+Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
--- End diff --

Just leaving a small concern here: the original code locked the whole scope of ids in 
`accumUpdates`. After this change, some ids that could be found before may not be found 
anymore, because `taskIdToTaskSetManager` can be changed by `removeExecutor` or 
`statusUpdate`. It's not a big problem if the executor has been removed.
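
For illustration, a minimal sketch of the difference (not the actual TaskSchedulerImpl code; the names below are stand-ins):

```
import java.util.concurrent.ConcurrentHashMap

object LookupSketch {
  // Stand-in for taskIdToTaskSetManager (String replaces TaskSetManager for brevity).
  val taskIdToValue = new ConcurrentHashMap[Long, String]()

  // Before the change, one `synchronized` block kept the map stable while every id in
  // accumUpdates was resolved. After the change, each id is looked up independently, so
  // an id removed concurrently (e.g. by removeExecutor) simply yields None and is skipped.
  def lookup(ids: Seq[Long]): Seq[String] =
    ids.flatMap(id => Option(taskIdToValue.get(id)))
}
```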


---




[GitHub] spark pull request #22205: [SPARK-25212][SQL] Support Filter in ConvertToLoc...

2018-08-31 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22205#discussion_r214505595
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1349,6 +1357,12 @@ object ConvertToLocalRelation extends 
Rule[LogicalPlan] {
 
 case Limit(IntegerLiteral(limit), LocalRelation(output, data, 
isStreaming)) =>
   LocalRelation(output, data.take(limit), isStreaming)
+
+case Filter(condition, LocalRelation(output, data, isStreaming))
--- End diff --

super nit: the comment at 
https://github.com/apache/spark/pull/22205/files#diff-a636a87d8843eeccca90140be91d4fafR1348
 was not changed.


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214084903
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
 return result;
   }
 
-  public static UnsafeArrayData forPrimitiveArray(int offset, int length, 
int elementSize) {
-return fromPrimitiveArray(null, offset, length, elementSize);
-  }
-
-  public static boolean shouldUseGenericArrayData(int elementSize, int 
length) {
--- End diff --

Yep, the failed UT log proves this: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport/org.apache.spark.sql.catalyst.expressions/CollectionExpressionsSuite/Array_Union/


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22282
  
retest this please.


---




[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214075761
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray(
 return result;
   }
 
-  public static UnsafeArrayData forPrimitiveArray(int offset, int length, 
int elementSize) {
-return fromPrimitiveArray(null, offset, length, elementSize);
-  }
-
-  public static boolean shouldUseGenericArrayData(int elementSize, int 
length) {
--- End diff --

I think `shouldUseGenericArrayData` is still used in generated code; check 
the code here:

https://github.com/apache/spark/blob/b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3633


---




[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...

2018-08-30 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22222
  
@cloud-fan @rdblue 
I want to leave some comments and thoughts from looking into this again; I hope they can 
help us decide on the next step.
Currently every plan assumes the input rows are `RDD[InternalRow]`, and the whole 
framework treats columnar reads as a special case. Also, the `inputRDDs` function is 
called not only by `WholeStageCodegenExec` but also by all the parent physical nodes, so 
it is very easy to end up in a mess with nested plans while debugging this fix. So we 
have these 3 choices; the first two can remove the cast completely but may require many 
changes to `CodegenSupport`, while the last one limits the changes but still has the 
cast problem:
1. Erase the type of `inputRDDs`, so that both RDD[InternalRow] and RDD[ColumnarBatch] 
can be passed, mainly for the parent physical plan calling the child (a rough sketch 
follows below). This is implemented as the last commit in this PR: 
https://github.com/apache/spark/pull/22222/files
2. Refactor the framework so that every plan deals with columnar batches.
3. Limit the changes to `ColumnarBatchScan` and don't change `CodegenSupport`, which 
still leaves the cast problem. This is implemented as the first two commits in this PR: 
https://github.com/apache/spark/pull/22222/files/7e88599dfc2caf177d12e890d588be68bdd3bc8e

If none of these make sense, I'll just close this. Thanks.
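
To make option 1 concrete, here is a minimal sketch of the erased signature (a hand-written illustration, not the actual `CodegenSupport` code):

```
import org.apache.spark.rdd.RDD

// Sketch of option 1: the element type of inputRDDs is erased, so a child can return
// either RDD[InternalRow] or RDD[ColumnarBatch]; a parent that knows supportsBatch
// decides how to consume it, which removes the asInstanceOf cast in the scan node.
trait CodegenInputSketch {
  def inputRDDs(): Seq[RDD[_]]
}
```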


---




[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...

2018-08-29 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22222
  
Got it, I'll revert the changes to the file source in this commit. Thanks for 
your reply.


---




[GitHub] spark pull request #22252: [SPARK-25261][MINOR][DOC] correct the default uni...

2018-08-29 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22252#discussion_r213708177
  
--- Diff: docs/configuration.md ---
@@ -152,7 +152,7 @@ of the most common options to set are:
   spark.driver.memory
   1g
   
-Amount of memory to use for the driver process, i.e. where 
SparkContext is initialized, in MiB 
+Amount of memory to use for the driver process, i.e. where 
SparkContext is initialized, in bytes 
--- End diff --

I think I got the point you want to report, @ivoson. IIUC, this is a bug in the code, 
not in the doc: `spark.driver.memory=1024` should also be interpreted with the unit of 
MiB. Maybe this changes the original behavior, so we could announce it in the migration 
guide? cc @srowen @HyukjinKwon.


---




[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...

2018-08-29 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22222
  
@cloud-fan Thanks for your reply, Wenchen. I've tried to achieve this in this commit; 
please take a look, thanks.


---




[GitHub] spark pull request #22252: [SPARK-25261][MINOR][DOC] correct the default uni...

2018-08-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22252#discussion_r213343952
  
--- Diff: docs/configuration.md ---
@@ -152,7 +152,7 @@ of the most common options to set are:
   spark.driver.memory
   1g
   
-Amount of memory to use for the driver process, i.e. where 
SparkContext is initialized, in MiB 
+Amount of memory to use for the driver process, i.e. where 
SparkContext is initialized, in bytes 
--- End diff --

Check the config code here.

https://github.com/apache/spark/blob/99d2e4e00711cffbfaee8cb3da9b6b3feab8ff18/core/src/main/scala/org/apache/spark/internal/config/package.scala#L40-L43
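
Roughly, the definition at that link looks like the following (paraphrased, inside the `org.apache.spark.internal.config` package object); the value is parsed in MiB when no unit suffix is given, which is why a bare `1024` means 1024 MiB rather than 1024 bytes:

```
import org.apache.spark.network.util.ByteUnit

private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
  .bytesConf(ByteUnit.MiB)
  .createWithDefaultString("1g")
```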


---




[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22149
  
```
Is that possible to add a test case?
```
Thanks for your reply, Xiao. We encountered some difficulties writing the test case, 
because it needs to mock speculative-execution behavior. We will keep looking 
into this.


---




[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22149
  
retest this please.


---




[GitHub] spark issue #22024: [SPARK-25034][CORE] Remove allocations in onBlockFetchSu...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22024
  
retest this please.


---




[GitHub] spark pull request #22024: [SPARK-25034][CORE] Remove allocations in onBlock...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22024#discussion_r213015113
  
--- Diff: 
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---
@@ -101,15 +101,7 @@ abstract class BlockTransferService extends 
ShuffleClient with Closeable with Lo
   result.failure(exception)
 }
 override def onBlockFetchSuccess(blockId: String, data: 
ManagedBuffer): Unit = {
-  data match {
-case f: FileSegmentManagedBuffer =>
-  result.success(f)
-case _ =>
-  val ret = ByteBuffer.allocate(data.size.toInt)
--- End diff --

The copy behavior was introduced by 
https://github.com/apache/spark/pull/2330/commits/69f5d0a2434396abbbd98886e047bc08a9e65565.
 How can you make sure it can be replaced by increasing the reference count?



---




[GitHub] spark pull request #22024: [SPARK-25034][CORE] Remove allocations in onBlock...

2018-08-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22024#discussion_r213015245
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -160,7 +160,13 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
   releaseLock(pieceId)
 case None =>
   bm.getRemoteBytes(pieceId) match {
-case Some(b) =>
+case Some(splitB) =>
+
+  // Checksum computation and further computations require the 
data
+  // from the ChunkedByteBuffer to be merged, so we we merge 
it now.
--- End diff --

nit: there is a typo in the comment above ("so we we merge it now").


---




[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...

2018-08-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22198#discussion_r212822215
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -191,6 +195,48 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
   }
 
+  test("SPARK-25121 Supports multi-part names for broadcast hint 
resolution") {
+val (table1Name, table2Name) = ("t1", "t2")
+withTempDatabase { dbName =>
+  withTable(table1Name, table2Name) {
+withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+  spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+  spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+  // First, makes sure a join is not broadcastable
+  val plan = sql(s"SELECT * FROM $dbName.$table1Name, 
$dbName.$table2Name " +
+  s"WHERE $table1Name.id = $table2Name.id")
+.queryExecution.executedPlan
+  assert(plan.collect { case p: BroadcastHashJoinExec => p }.size 
== 0)
+
+  // Uses multi-part table names for broadcast hints
+  def checkIfHintApplied(tableName: String, hintTableName: 
String): Unit = {
--- End diff --

`hintTableName` is never used in this func?


---




[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...

2018-08-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22222#discussion_r212820814
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -307,7 +308,7 @@ case class FileSourceScanExec(
 withSelectedBucketsCount
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = {
+  private lazy val inputRDD: RDD[Object] = {
--- End diff --

Thanks Sean! Addressed in 7e88599.


---




[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...

2018-08-24 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22222
  
cc @cloud-fan and @rdblue, please have a look when you have time. If this PR 
doesn't match your expectations, I'll close it soon. Thanks!


---




[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...

2018-08-24 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22222#discussion_r212784374
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala 
---
@@ -40,6 +42,29 @@ private[sql] trait ColumnarBatchScan extends 
CodegenSupport {
 "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
 "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
+  /**
+   * Returns all the RDDs of ColumnarBatch which generates the input rows.
+   */
+  def inputBatchRDDs(): Seq[RDD[ColumnarBatch]]
+
+  /**
+   * Returns all the RDDs of InternalRow which generates the input rows.
+   */
+  def inputRowRDDs(): Seq[RDD[InternalRow]]
+
+  /**
+   * Get input RDD depends on supportsBatch.
+   */
+  final def getInputRDDs(): Seq[RDD[InternalRow]] = {
+if (supportsBatch) {
+  inputBatchRDDs().asInstanceOf[Seq[RDD[InternalRow]]]
--- End diff --

This is maybe the last explicit erasure hack left; please check whether it is 
acceptable or not.


---




[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...

2018-08-24 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22222

[SPARK-25083][SQL] Remove the type erasure hack in data source scan

## What changes were proposed in this pull request?

1. Add the `inputBatchRDDs` and `inputRowRDDs` interfaces in `ColumnarBatchScan`.
2. Override them in the physical nodes that extend `ColumnarBatchScan`.

## How was this patch tested?

Refactoring only; tested with existing UTs.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25083

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22222


commit 992a08b1d77d59daeac95c67d07e5b8efe20ce20
Author: Yuanjian Li 
Date:   2018-08-24T15:54:27Z

[SPARK-25083][SQL] Remove the type erasure hack in data source scan




---




[GitHub] spark pull request #22202: [SPARK-25211][Core] speculation and fetch failed ...

2018-08-23 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22202#discussion_r212365264
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2246,58 +2247,6 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 assertDataStructuresEmpty()
   }
 
-  test("Trigger mapstage's job listener in submitMissingTasks") {
--- End diff --

Could you give some explanation for deleting this test?


---




[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...

2018-08-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22149
  
Gentle ping @gatorsmile.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
@jiangxb1987 Many thanks for your comment!
```
One general idea is that we don't need to rely on the RPC framework to test 
ContextBarrierState, just mock RpcCallContexts should be enough.
```
Actually, I also wanted to implement it like this at first, as you asked in the JIRA, 
but `ContextBarrierState` is a private inner class of `BarrierCoordinator`. Could I do 
the refactoring of moving `ContextBarrierState` out of `BarrierCoordinator`? If that is 
permitted, I think we can just mock RpcCallContext to achieve this (a rough sketch 
follows below).
```
We shall cover the following scenarios:
```
The list looks great; the first 5 scenarios are already included in the current 
implementation. I'll add the last check, `Make sure we clear all the internal data 
under each case.`, after we reach an agreement.
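
For reference, a rough sketch of the mocking idea (Mockito is already used in Spark's tests; the `handleRequest` call below is a hypothetical API of the extracted class):

```
import org.mockito.Mockito.{mock, verify}

import org.apache.spark.rpc.RpcCallContext

// Inside a test body: pretend four tasks call barrier(), each with its own mocked context.
val requesters = (0 until 4).map(_ => mock(classOf[RpcCallContext]))

// Hypothetical interaction with an extracted ContextBarrierState (names assumed):
// requesters.foreach(ctx => barrierState.handleRequest(ctx, someRequest))
// Once all requesters have arrived, every one of them should have received a reply:
// requesters.foreach(ctx => verify(ctx).reply(()))
```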


---




[GitHub] spark issue #22177: stages in wrong order within job page DAG chart

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22177
  
Please change the title to "[SPARK-25199][Web UI] XXX" as described in 
http://spark.apache.org/contributing.html. 
```
check the DAG chart in job page.
```
Could you also post a screenshot of the DAG chart after your fix?


---




[GitHub] spark pull request #22177: stages in wrong order within job page DAG chart

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22177#discussion_r212003441
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---
@@ -337,7 +337,9 @@ private[ui] class JobPage(parent: JobsTab, store: 
AppStatusStore) extends WebUIP
   store.executorList(false), appStartTime)
 
 val operationGraphContent = 
store.asOption(store.operationGraphForJob(jobId)) match {
-  case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, 
operationGraph)
+  case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, 
operationGraph.sortWith(
+
_.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt
--- End diff --

Adding a `getStageId` function in `RDDOperationGraph` to do this would be better.
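
Something along these lines (a sketch; the method name comes from the comment, the rest is assumed):

```
// In RDDOperationGraph: expose the stage id directly instead of string-replacing the
// cluster id at the call site in JobPage.
def getStageId: Int =
  rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt
```

Then the sort in JobPage could simply be `operationGraph.sortWith(_.getStageId < _.getStageId)`.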


---




[GitHub] spark pull request #22177: stages in wrong order within job page DAG chart

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22177#discussion_r212002571
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---
@@ -18,18 +18,18 @@
 package org.apache.spark.ui.jobs
 
 import java.util.Locale
+
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable.{Buffer, ListBuffer}
 import scala.xml.{Node, NodeSeq, Unparsed, Utility}
-
--- End diff --

Revert this change to the imports.


---




[GitHub] spark pull request #22180: [SPARK-25174][YARN]Limit the size of diagnostic m...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22180#discussion_r211996461
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ---
@@ -368,7 +369,11 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
 }
 logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
   Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
-finalMsg = msg
+finalMsg = if (msg == null || msg.length <= finalMsgLimitSize) {
+  msg
+} else {
+  msg.substring(0, finalMsgLimitSize)
--- End diff --

Maybe the last `finalMsgLimitSize` characters of the message are more useful.
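
A minimal sketch of that alternative, keeping the tail instead of the head (helper name is made up):

```
// Keep the last `limit` characters of the diagnostic message, since the final lines
// usually carry the actual failure reason.
def keepTail(msg: String, limit: Int): String =
  if (msg == null || msg.length <= limit) msg
  else msg.substring(msg.length - limit)
```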


---




[GitHub] spark pull request #22180: [SPARK-25174][YARN]Limit the size of diagnostic m...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22180#discussion_r211996874
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 ---
@@ -143,6 +143,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
   @volatile private var finished = false
   @volatile private var finalStatus = getDefaultFinalStatus
   @volatile private var finalMsg: String = ""
+  private val finalMsgLimitSize = sparkConf.get(AM_FINAL_MSG_LIMIT).toInt
--- End diff --

nit: move this to L165? Just for code cleanliness.


---




[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
AFAIC, the fix should forbid passing illegal extra values. If there are fewer values 
than fields, accessing a missing field should raise an `AttributeError`, as in the 
current implementation, rather than being banned here. What do you think? :) 
@HyukjinKwon @BryanCutler Thanks.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
My pleasure, I just found this while glancing over JIRA in recent days. :)


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-21 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
cc @gatorsmile @cloud-fan 


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-21 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
retest this please.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-20 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22165
  
cc @jiangxb1987 


---




[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-08-20 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22165

[SPARK-25017][Core] Add test suite for BarrierCoordinator and 
ContextBarrierState

## What changes were proposed in this pull request?

Currently `ContextBarrierState` and `BarrierCoordinator` are only covered by the 
end-to-end test in `BarrierTaskContextSuite`; this adds a BarrierCoordinatorSuite to 
test both classes.

## How was this patch tested?

UT in BarrierCoordinatorSuite.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22165.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22165


commit 21bd1c37f4af6480adfc07130a15f70acdeda378
Author: liyuanjian 
Date:   2018-08-21T05:24:07Z

[SPARK-25017][Core] Add test suite for BarrierCoordinator and 
ContextBarrierState




---




[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row

2018-08-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22140
  
cc @HyukjinKwon


---




[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...

2018-08-18 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22140

[SPARK-25072][PySpark] Forbid extra value for custom Row

## What changes were proposed in this pull request?

Add a value-length check in `_create_row` to forbid extra values for custom Rows 
in PySpark.

## How was this patch tested?

New UT in pyspark-sql

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25072

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22140


commit b8c6522bccde51584e9878144924fd7b92f8785f
Author: liyuanjian 
Date:   2018-08-18T08:36:53Z

Forbidden extra value for custom Row




---




[GitHub] spark pull request #22105: [SPARK-25115] [Core] Eliminate extra memory copy ...

2018-08-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22105#discussion_r210842394
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
 ---
@@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, 
WritableByteChannel target) throws IOExcept
 // SPARK-24578: cap the sub-region's size of returned nio buffer to 
improve the performance
 // for the case that the passed-in buffer has too many components.
 int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
--- End diff --

```
IIRC socket buffers are 32k by default on Linux, so it seems unlikely you'd 
be able to write 256k in one call (ignoring what IOUtil does internally). But 
maybe in practice it works ok.
```
After reading the context in #12083 and this discussion, I want to offer one 
possibility for why writing 256k in one call can work in practice. In our scenario, 
users tune `/proc/sys/net/core/wmem_default` based on their online behavior, and 
generally it ends up set larger than the kernel default.

![image](https://user-images.githubusercontent.com/4833765/44256457-cebc0980-a23b-11e8-9b70-c7ad66fcfe1c.png)

So maybe 256k for NIO_BUFFER_LIMIT is OK here? We just need to add more comments 
explaining which parameters relate to this value.


---




[GitHub] spark issue #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark ...

2018-08-16 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22122
  
Thanks.


---




[GitHub] spark issue #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark ...

2018-08-16 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22122
  
```
Are they all instances to fix?
```
@HyukjinKwon Yep, I grepped all `conf.get("spark.sql.xxx")` calls and made sure of 
this. The remaining hard-coded config is the StaticSQLConf 
`spark.sql.catalogImplementation` in session.py, which can't be managed by SQLConf.


---




[GitHub] spark pull request #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in P...

2018-08-16 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22122

[SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark to manage all sql 
configs

## What changes were proposed in this pull request?

Follow-up for SPARK-24665; some other hard-coded configs were found during code review.

## How was this patch tested?

Existing UT.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-24665-follow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22122.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22122


commit 8a32e60a7af4f574176366eb057b219cb4511bb6
Author: Yuanjian Li 
Date:   2018-07-02T07:04:40Z

Use SQLConf in session.py and catalog.py




---




[GitHub] spark issue #22117: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...

2018-08-16 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22117
  
retest this please


---




[GitHub] spark issue #22093: [SPARK-25100][CORE] Fix no registering TaskCommitMessage...

2018-08-15 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22093
  
`Should I delete current UT from FileSuit?`
I think the current UT in `FileSuite` is unnecessary; you can leave it and wait for 
other reviewers' opinions.


---




[GitHub] spark issue #22093: [SPARK-25100][CORE] Fix no registering TaskCommitMessage...

2018-08-14 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/22093
  
Why should we create our own SparkContext here? Could we just add a UT like 
`registration of HighlyCompressedMapStatus` to check that `TaskCommitMessage` works? 
A rough sketch follows below.
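
For example, a rough sketch of such a test (assuming it sits in KryoSerializerSuite next to the existing `registration of HighlyCompressedMapStatus` test; not the final code):

```
import org.apache.spark.SparkConf
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.serializer.KryoSerializer

test("registration of TaskCommitMessage") {
  val conf = new SparkConf(false)
    .set("spark.serializer", classOf[KryoSerializer].getName)
    .set("spark.kryo.registrationRequired", "true")
  val ser = new KryoSerializer(conf).newInstance()
  // With registrationRequired=true this throws if TaskCommitMessage is not registered.
  ser.serialize(new TaskCommitMessage(null))
}
```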


---




[GitHub] spark pull request #22093: [SPARK-25100][CORE] Fix no registering TaskCommit...

2018-08-13 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22093#discussion_r209650955
  
--- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala ---
@@ -424,6 +425,39 @@ class FileSuite extends SparkFunSuite with 
LocalSparkContext {
 randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
 assert(new File(tempDir.getPath + 
"/outputDataset_new/part-r-0").exists() === true)
   }
+  
+  test("SPARK-25100: Using KryoSerializer and" +
+  " setting registrationRequired true can lead job failed") {
+val tempDir = Utils.createTempDir()
+val inputDir = tempDir.getAbsolutePath + "/input"
+val outputDir = tempDir.getAbsolutePath + "/tmp"
+
+val writer = new PrintWriter(new File(inputDir))
+
+for(i <- 1 to 100) {
+  writer.print(i)
+  writer.write('\n')
+}
+
+writer.close()
+
+val conf = new SparkConf(false).setMaster("local").
+set("spark.kryo.registrationRequired", "true").setAppName("test")
+conf.set("spark.serializer", classOf[KryoSerializer].getName)
+conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
--- End diff --

Why do we need to set 'spark.serializer' twice?


---




[GitHub] spark pull request #22057: [SPARK-25077][SQL] Delete unused variable in Wind...

2018-08-09 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22057

[SPARK-25077][SQL] Delete unused variable in WindowExec

## What changes were proposed in this pull request?

Just delete the unused variable `inputFields` in WindowExec, to avoid confusing others 
reading the code.

## How was this patch tested?

Existing UT.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25077

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22057.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22057


commit 90513587ed1d48437818e90f58612c344009f563
Author: liyuanjian 
Date:   2018-08-09T15:57:29Z

[SPARK-25077][SQL] Delete unused variable in WindowExec




---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208260664
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +364,101 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
+
+  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
+
+  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
--- End diff --

`left.nullable && right.nullable`? Because if one side is an empty map, NULL 
will be passed as the value for each key from the other side.


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208257687
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
 ---
@@ -267,22 +267,23 @@ case class GetArrayItem(child: Expression, ordinal: 
Expression)
   }
 }
 
-/**
- * Common base class for [[GetMapValue]] and [[ElementAt]].
- */
-
-abstract class GetMapValueUtil extends BinaryExpression with 
ImplicitCastInputTypes {
+object GetMapValueUtil
+{
--- End diff --

nit: the brace should be on the previous line.


---




[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function

2018-08-06 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21986#discussion_r207924294
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -205,29 +230,82 @@ case class ArrayTransform(
 (elementVar, indexVar)
   }
 
-  override def eval(input: InternalRow): Any = {
-val arr = this.input.eval(input).asInstanceOf[ArrayData]
-if (arr == null) {
-  null
-} else {
-  val f = functionForEval
-  val result = new GenericArrayData(new Array[Any](arr.numElements))
-  var i = 0
-  while (i < arr.numElements) {
-elementVar.value.set(arr.get(i, elementVar.dataType))
-if (indexVar.isDefined) {
-  indexVar.get.value.set(i)
-}
-result.update(i, f.eval(input))
-i += 1
+  override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = 
{
+val arr = inputValue.asInstanceOf[ArrayData]
+val f = functionForEval
+val result = new GenericArrayData(new Array[Any](arr.numElements))
+var i = 0
+while (i < arr.numElements) {
+  elementVar.value.set(arr.get(i, elementVar.dataType))
+  if (indexVar.isDefined) {
+indexVar.get.value.set(i)
   }
-  result
+  result.update(i, f.eval(inputRow))
+  i += 1
 }
+result
   }
 
   override def prettyName: String = "transform"
 }
 
+/**
+ * Filters entries in a map using the provided function.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Filters entries in a map using the 
function.",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v);
+   [1 -> 0, 3 -> -1]
+  """,
+since = "2.4.0")
+case class MapFilter(
+input: Expression,
+function: Expression)
+  extends MapBasedUnaryHigherOrderFunction with CodegenFallback {
+
+  @transient val (keyType, valueType, valueContainsNull) = input.dataType 
match {
--- End diff --

Maybe this should be a function in the `MapBasedUnaryHigherOrderFunction` object, so we 
can use it in other map-based higher-order functions, just like 
`ArrayBasedHigherOrderFunction.elementArgumentType` is used. A rough sketch follows below.
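
For instance, something like the following could live in a companion object (names are assumptions, mirroring how `ArrayBasedHigherOrderFunction.elementArgumentType` is used):

```
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.{DataType, MapType}

object MapBasedUnaryHigherOrderFunction {
  /** Extracts (keyType, valueType, valueContainsNull) from a map-typed expression. */
  def mapArgumentTypes(input: Expression): (DataType, DataType, Boolean) =
    input.dataType match {
      case MapType(keyType, valueType, valueContainsNull) =>
        (keyType, valueType, valueContainsNull)
      case other =>
        throw new IllegalArgumentException(s"Expected a MapType, got $other")
    }
}
```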


---




[GitHub] spark pull request #21985: [SPARK-24884][SQL] add regexp_extract_all support

2018-08-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21985#discussion_r207712639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
 ---
@@ -446,3 +448,88 @@ case class RegExpExtract(subject: Expression, regexp: 
Expression, idx: Expressio
 })
   }
 }
+
+/**
+ * Extract all specific(idx) groups identified by a Java regex.
+ *
+ * NOTE: this expression is not THREAD-SAFE, as it has some internal 
mutable status.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(str, regexp[, idx]) - Extracts all groups that matches 
`regexp`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('100-200,300-400', '(\\d+)-(\\d+)', 1);
+   [100, 300]
+  """)
+case class RegExpExtractAll(subject: Expression, regexp: Expression, idx: 
Expression)
--- End diff --

Add an abstract class to reduce duplicated code between `RegExpExtractAll` 
and `RegExpExtract`?
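
For instance, a rough sketch of such a base class (the name and members are assumptions; the caching logic mirrors the `lastRegex`/`pattern` fields shown in the diff above):

```
import java.util.regex.{Matcher, Pattern}

import org.apache.spark.unsafe.types.UTF8String

// Sketch only; it would sit next to RegExpExtract in regexpExpressions.scala, so the
// expression base classes (TernaryExpression, ImplicitCastInputTypes, ...) are in scope.
abstract class RegExpExtractBase extends TernaryExpression with ImplicitCastInputTypes {
  def subject: Expression
  def regexp: Expression
  def idx: Expression

  // Last regex seen; the Pattern is recompiled only when the regexp value changes.
  @transient private var lastRegex: UTF8String = _
  @transient private var pattern: Pattern = _

  protected def getLastMatcher(s: Any, p: Any): Matcher = {
    if (!p.equals(lastRegex)) {
      lastRegex = p.asInstanceOf[UTF8String].clone()
      pattern = Pattern.compile(lastRegex.toString)
    }
    pattern.matcher(s.toString)
  }

  override def children: Seq[Expression] = subject :: regexp :: idx :: Nil
}
```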


---




[GitHub] spark pull request #21985: [SPARK-24884][SQL] add regexp_extract_all support

2018-08-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21985#discussion_r207712323
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
 ---
@@ -446,3 +448,88 @@ case class RegExpExtract(subject: Expression, regexp: 
Expression, idx: Expressio
 })
   }
 }
+
+/**
+ * Extract all specific(idx) groups identified by a Java regex.
+ *
+ * NOTE: this expression is not THREAD-SAFE, as it has some internal 
mutable status.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(str, regexp[, idx]) - Extracts all groups that matches 
`regexp`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('100-200,300-400', '(\\d+)-(\\d+)', 1);
+   [100, 300]
+  """)
+case class RegExpExtractAll(subject: Expression, regexp: Expression, idx: 
Expression)
+  extends TernaryExpression with ImplicitCastInputTypes {
+  def this(s: Expression, r: Expression) = this(s, r, Literal(1))
+
+  // last regex in string, we will update the pattern iff regexp value 
changed.
+  @transient private var lastRegex: UTF8String = _
+  // last regex pattern, we cache it for performance concern
+  @transient private var pattern: Pattern = _
+
+  override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
+if (!p.equals(lastRegex)) {
+  // regex value changed
+  lastRegex = p.asInstanceOf[UTF8String].clone()
+  pattern = Pattern.compile(lastRegex.toString)
+}
+val m = pattern.matcher(s.toString)
+var groupArrayBuffer = new ArrayBuffer[UTF8String]();
+
+while (m.find) {
+  val mr: MatchResult = m.toMatchResult
+  val group = mr.group(r.asInstanceOf[Int])
+  if (group == null) { // Pattern matched, but not optional group
+groupArrayBuffer += UTF8String.EMPTY_UTF8
+  } else {
+groupArrayBuffer += UTF8String.fromString(group)
+  }
+}
+
+new GenericArrayData(groupArrayBuffer.toArray.asInstanceOf[Array[Any]])
+  }
+
+  override def dataType: DataType = ArrayType(StringType)
+  override def inputTypes: Seq[AbstractDataType] = Seq(StringType, 
StringType, IntegerType)
+  override def children: Seq[Expression] = subject :: regexp :: idx :: Nil
+  override def prettyName: String = "regexp_extract_all"
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+val classNamePattern = classOf[Pattern].getCanonicalName
+val matcher = ctx.freshName("matcher")
+val matchResult = ctx.freshName("matchResult")
+val groupArray = ctx.freshName("groupArray")
+
+val termLastRegex = ctx.addMutableState("UTF8String", "lastRegex")
+val termPattern = ctx.addMutableState(classNamePattern, "pattern")
+
+val arrayClass = classOf[GenericArrayData].getName
+
+nullSafeCodeGen(ctx, ev, (subject, regexp, idx) => {
+  s"""
+  if (!$regexp.equals($termLastRegex)) {
+// regex value changed
+$termLastRegex = $regexp.clone();
+$termPattern = 
$classNamePattern.compile($termLastRegex.toString());
+  }
+  java.util.regex.Matcher $matcher =
+$termPattern.matcher($subject.toString());
+  java.util.ArrayList $groupArray =
+new java.util.ArrayList();
+
+  while ($matcher.find()) {
+java.util.regex.MatchResult $matchResult = 
$matcher.toMatchResult();
+if ($matchResult.group($idx) == null) {
+  $groupArray.add(UTF8String.EMPTY_UTF8);
+} else {
+  $groupArray.add(UTF8String.fromString($matchResult.group($idx)));
+}
+  }
+  ${ev.value} = new $arrayClass($groupArray.toArray(new 
UTF8String[$groupArray.size()]));
--- End diff --

Do we need to consider setting ev.isNull?


---




[GitHub] spark issue #21945: [SPARK-24989][Core] Add retrying support for OutOfDirect...

2018-08-03 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21945
  
Closing this; the param `spark.reducer.maxBlocksInFlightPerAddress`, added after 
version 2.2, can solve my problem.


---




[GitHub] spark pull request #21945: [SPARK-24989][Core] Add retrying support for OutO...

2018-08-03 Thread xuanyuanking
Github user xuanyuanking closed the pull request at:

https://github.com/apache/spark/pull/21945


---




[GitHub] spark issue #21945: [SPARK-24989][Core] Add retrying support for OutOfDirect...

2018-08-01 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21945
  
retest this please.


---




[GitHub] spark pull request #21945: [SPARK-24989][Core] Add retrying support for OutO...

2018-08-01 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/21945

[SPARK-24989][Core] Add retrying support for OutOfDirectMemoryError

## What changes were proposed in this pull request?

As described in detail in 
[SPARK-24989](https://issues.apache.org/jira/browse/SPARK-24989), this adds retry 
support to RetryingBlockFetcher when io.netty.maxDirectMemory is exceeded. The failed 
stage details are attached below:

![image](https://user-images.githubusercontent.com/4833765/43534362-c3a934a4-95e9-11e8-9ec1-5f868e04bc07.png)


## How was this patch tested?

Added a UT in RetryingBlockFetcherSuite.java and tested with the job mentioned above.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-24989

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21945.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21945


commit bb6841b3a7a160e252fe35dab82f4ddeb0032591
Author: Yuanjian Li 
Date:   2018-08-01T16:15:09Z

[SPARK-24989][Core] Add retry support for OutOfDirectMemoryError




---




[GitHub] spark pull request #21881: [SPARK-24930][SQL] Improve exception information ...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21881#discussion_r206203835
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -337,7 +337,11 @@ case class LoadDataCommand(
   new File(file.getAbsolutePath).exists()
 }
 if (!exists) {
-  throw new AnalysisException(s"LOAD DATA input path does not 
exist: $path")
+  // If user have no permission to access the given input path, 
`File.exists()` return false
+  // , `LOAD DATA input path does not exist` can confuse users.
+  throw new AnalysisException(s"LOAD DATA input path does not 
exist: `$path` or current " +
--- End diff --

Nit: no need to print the $path twice.


---




[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206184350
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
+
+def toSpec: String = {
+  s"($column='$value')"
+}
+def toSpecAsMap: Map[String, String] = {
+  Map(column -> value)
+}
+  }
+
+  test("create hive table with multi format partitions") {
+val catalog = spark.sessionState.catalog
+withTempDir { baseDir =>
+
+  val partitionedTable = "ext_multiformat_partition_table"
+  withTable(partitionedTable) {
+assert(baseDir.listFiles.isEmpty)
+
+val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+// Check table storage type is PARQUET
+val hiveResultTable =
+  catalog.getTableMetadata(TableIdentifier(partitionedTable, 
Some("default")))
+assert(DDLUtils.isHiveTable(hiveResultTable))
+assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+assert(hiveResultTable.storage.inputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+)
+assert(hiveResultTable.storage.outputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+)
+assert(hiveResultTable.storage.serde
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check table has correct partititons
+assert(
+  catalog.listPartitions(TableIdentifier(partitionedTable,
+Some("default"))).map(_.spec).toSet == 
partitions.map(_.toSpecAsMap).toSet
+)
+
+// Check first table partition storage type is PARQUET
+val parquetPartition = catalog.getPartition(
+  TableIdentifier(partitionedTable, Some("default")),
+  partitions.head.toSpecAsMap
+)
+assert(
+  parquetPartition.storage.serde
+
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check second table partition storage type is AVR

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206190334
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
--- End diff --

This does not have to start on a new line.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206188190
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
+
+def toSpec: String = {
+  s"($column='$value')"
+}
+def toSpecAsMap: Map[String, String] = {
+  Map(column -> value)
+}
+  }
+
+  test("create hive table with multi format partitions") {
+val catalog = spark.sessionState.catalog
+withTempDir { baseDir =>
+
+  val partitionedTable = "ext_multiformat_partition_table"
+  withTable(partitionedTable) {
+assert(baseDir.listFiles.isEmpty)
+
+val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+// Check table storage type is PARQUET
+val hiveResultTable =
+  catalog.getTableMetadata(TableIdentifier(partitionedTable, 
Some("default")))
+assert(DDLUtils.isHiveTable(hiveResultTable))
+assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+assert(hiveResultTable.storage.inputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+)
+assert(hiveResultTable.storage.outputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+)
+assert(hiveResultTable.storage.serde
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check table has correct partitions
+assert(
+  catalog.listPartitions(TableIdentifier(partitionedTable,
+Some("default"))).map(_.spec).toSet == 
partitions.map(_.toSpecAsMap).toSet
+)
+
+// Check first table partition storage type is PARQUET
+val parquetPartition = catalog.getPartition(
+  TableIdentifier(partitionedTable, Some("default")),
+  partitions.head.toSpecAsMap
+)
+assert(
+  parquetPartition.storage.serde
+
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check second table partition storage type is AVR

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206182473
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
+
+def toSpec: String = {
+  s"($column='$value')"
+}
+def toSpecAsMap: Map[String, String] = {
+  Map(column -> value)
+}
+  }
+
+  test("create hive table with multi format partitions") {
+val catalog = spark.sessionState.catalog
+withTempDir { baseDir =>
+
+  val partitionedTable = "ext_multiformat_partition_table"
+  withTable(partitionedTable) {
+assert(baseDir.listFiles.isEmpty)
+
+val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+// Check table storage type is PARQUET
+val hiveResultTable =
+  catalog.getTableMetadata(TableIdentifier(partitionedTable, 
Some("default")))
+assert(DDLUtils.isHiveTable(hiveResultTable))
+assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+assert(hiveResultTable.storage.inputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+)
+assert(hiveResultTable.storage.outputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+)
+assert(hiveResultTable.storage.serde
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check table has correct partitions
+assert(
+  catalog.listPartitions(TableIdentifier(partitionedTable,
+Some("default"))).map(_.spec).toSet == 
partitions.map(_.toSpecAsMap).toSet
+)
+
+// Check first table partition storage type is PARQUET
+val parquetPartition = catalog.getPartition(
+  TableIdentifier(partitionedTable, Some("default")),
+  partitions.head.toSpecAsMap
+)
+assert(
+  parquetPartition.storage.serde
+
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check second table partition storage type is AVR

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206188295
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
+
+def toSpec: String = {
+  s"($column='$value')"
+}
+def toSpecAsMap: Map[String, String] = {
+  Map(column -> value)
+}
+  }
+
+  test("create hive table with multi format partitions") {
+val catalog = spark.sessionState.catalog
+withTempDir { baseDir =>
+
+  val partitionedTable = "ext_multiformat_partition_table"
+  withTable(partitionedTable) {
+assert(baseDir.listFiles.isEmpty)
+
+val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+// Check table storage type is PARQUET
+val hiveResultTable =
+  catalog.getTableMetadata(TableIdentifier(partitionedTable, 
Some("default")))
+assert(DDLUtils.isHiveTable(hiveResultTable))
+assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+assert(hiveResultTable.storage.inputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+)
+assert(hiveResultTable.storage.outputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+)
+assert(hiveResultTable.storage.serde
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check table has correct partitions
+assert(
+  catalog.listPartitions(TableIdentifier(partitionedTable,
+Some("default"))).map(_.spec).toSet == 
partitions.map(_.toSpecAsMap).toSet
+)
+
+// Check first table partition storage type is PARQUET
+val parquetPartition = catalog.getPartition(
+  TableIdentifier(partitionedTable, Some("default")),
+  partitions.head.toSpecAsMap
+)
+assert(
+  parquetPartition.storage.serde
+
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check second table partition storage type is AVR

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206188012
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
+
+def toSpec: String = {
+  s"($column='$value')"
+}
+def toSpecAsMap: Map[String, String] = {
+  Map(column -> value)
+}
+  }
+
+  test("create hive table with multi format partitions") {
+val catalog = spark.sessionState.catalog
+withTempDir { baseDir =>
+
+  val partitionedTable = "ext_multiformat_partition_table"
+  withTable(partitionedTable) {
+assert(baseDir.listFiles.isEmpty)
+
+val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+// Check table storage type is PARQUET
+val hiveResultTable =
+  catalog.getTableMetadata(TableIdentifier(partitionedTable, 
Some("default")))
+assert(DDLUtils.isHiveTable(hiveResultTable))
+assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+assert(hiveResultTable.storage.inputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+)
+assert(hiveResultTable.storage.outputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+)
+assert(hiveResultTable.storage.serde
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check table has correct partitions
+assert(
+  catalog.listPartitions(TableIdentifier(partitionedTable,
+Some("default"))).map(_.spec).toSet == 
partitions.map(_.toSpecAsMap).toSet
+)
+
+// Check first table partition storage type is PARQUET
+val parquetPartition = catalog.getPartition(
+  TableIdentifier(partitionedTable, Some("default")),
+  partitions.head.toSpecAsMap
+)
+assert(
+  parquetPartition.storage.serde
+
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check second table partition storage type is AVR

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206184183
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
+
+def toSpec: String = {
+  s"($column='$value')"
+}
+def toSpecAsMap: Map[String, String] = {
+  Map(column -> value)
+}
+  }
+
+  test("create hive table with multi format partitions") {
+val catalog = spark.sessionState.catalog
+withTempDir { baseDir =>
+
+  val partitionedTable = "ext_multiformat_partition_table"
+  withTable(partitionedTable) {
+assert(baseDir.listFiles.isEmpty)
+
+val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+// Check table storage type is PARQUET
+val hiveResultTable =
+  catalog.getTableMetadata(TableIdentifier(partitionedTable, 
Some("default")))
+assert(DDLUtils.isHiveTable(hiveResultTable))
+assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+assert(hiveResultTable.storage.inputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+)
+assert(hiveResultTable.storage.outputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+)
+assert(hiveResultTable.storage.serde
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check table has correct partitions
+assert(
+  catalog.listPartitions(TableIdentifier(partitionedTable,
+Some("default"))).map(_.spec).toSet == 
partitions.map(_.toSpecAsMap).toSet
+)
+
+// Check first table partition storage type is PARQUET
+val parquetPartition = catalog.getPartition(
+  TableIdentifier(partitionedTable, Some("default")),
+  partitions.head.toSpecAsMap
+)
+assert(
+  parquetPartition.storage.serde
+
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check second table partition storage type is AVR

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206188650
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+
+  private case class PartitionDefinition(
+  column: String,
+  value: String,
+  location: URI,
+  format: Option[String] = None
+  ) {
+
+def toSpec: String = {
+  s"($column='$value')"
+}
+def toSpecAsMap: Map[String, String] = {
+  Map(column -> value)
+}
+  }
+
+  test("create hive table with multi format partitions") {
+val catalog = spark.sessionState.catalog
+withTempDir { baseDir =>
+
+  val partitionedTable = "ext_multiformat_partition_table"
+  withTable(partitionedTable) {
+assert(baseDir.listFiles.isEmpty)
+
+val partitions = createMultiformatPartitionDefinitions(baseDir)
+
+createTableWithPartitions(partitionedTable, baseDir, partitions)
+
+// Check table storage type is PARQUET
+val hiveResultTable =
+  catalog.getTableMetadata(TableIdentifier(partitionedTable, 
Some("default")))
+assert(DDLUtils.isHiveTable(hiveResultTable))
+assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
+assert(hiveResultTable.storage.inputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
+)
+assert(hiveResultTable.storage.outputFormat
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+)
+assert(hiveResultTable.storage.serde
+  
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check table has correct partitions
+assert(
+  catalog.listPartitions(TableIdentifier(partitionedTable,
+Some("default"))).map(_.spec).toSet == 
partitions.map(_.toSpecAsMap).toSet
+)
+
+// Check first table partition storage type is PARQUET
+val parquetPartition = catalog.getPartition(
+  TableIdentifier(partitionedTable, Some("default")),
+  partitions.head.toSpecAsMap
+)
+assert(
+  parquetPartition.storage.serde
+
.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+)
+
+// Check second table partition storage type is AVR

[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...

2018-07-30 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r206178526
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
 ---
@@ -96,6 +96,9 @@ object ParserUtils {
 }
   }
 
+  def extraMethod(s: String): String = {
--- End diff --

what's this used for?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: Support selecting from partitioned tables with pa...

2018-07-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r205945617
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -857,6 +857,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  /**
+   * Create an [[AlterTableFormatCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
+   * }}}
+   */
+  override def visitSetTableFormat(ctx: SetTableFormatContext): 
LogicalPlan = withOrigin(ctx) {
+val format = (ctx.fileFormat) match {
+  // Expected format: INPUTFORMAT input_format OUTPUTFORMAT 
output_format
+  case (c: TableFileFormatContext) =>
+visitTableFileFormat(c)
+  // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET 
| AVRO
+  case (c: GenericFileFormatContext) =>
+visitGenericFileFormat(c)
+  case _ =>
+throw new ParseException("Expected STORED AS ", ctx)
+}
+AlterTableFormatCommand(
+  visitTableIdentifier(ctx.tableIdentifier),
+  format,
+  // TODO a partition spec is allowed to have optional values. This is 
currently violated.
+  Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
--- End diff --

I'm confused by this TODO. With the current implementation, when the partition 
spec is empty, do we change the table-level catalog entry?
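For reference, here is a hedged sketch (not the PR's actual implementation) of how 
the two cases could be handled; `tableName`, `partitionSpec`, `format` and `catalog` 
(a SessionCatalog) are assumed from the surrounding diff:

```
partitionSpec match {
  case Some(spec) =>
    // only the matching partition's storage entry is rewritten
    val part = catalog.getPartition(tableName, spec)
    catalog.alterPartitions(tableName, Seq(part.copy(storage = part.storage.copy(
      inputFormat = format.inputFormat,
      outputFormat = format.outputFormat,
      serde = format.serde))))
  case None =>
    // no partition spec: the table-level storage entry in the catalog is rewritten
    val table = catalog.getTableMetadata(tableName)
    catalog.alterTable(table.copy(storage = table.storage.copy(
      inputFormat = format.inputFormat,
      outputFormat = format.outputFormat,
      serde = format.serde)))
}
```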


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: Support selecting from partitioned tables with pa...

2018-07-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r205945564
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala
 ---
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+import java.net.URI
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class MultiFormatTableSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with 
BeforeAndAfterEach with Matchers {
+  import testImplicits._
+
+  val parser = new SparkSqlParser(new SQLConf())
+
+  override def afterEach(): Unit = {
+try {
+  // drop all databases, tables and functions after each test
+  spark.sessionState.catalog.reset()
+} finally {
+  super.afterEach()
+}
+  }
+
+  val partitionCol = "dt"
+  val partitionVal1 = "2018-01-26"
+  val partitionVal2 = "2018-01-27"
+  private case class PartitionDefinition(
+  column: String,
--- End diff --

ditto.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: Support selecting from partitioned tables with pa...

2018-07-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r205945559
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -415,6 +415,51 @@ case class AlterTableSerDePropertiesCommand(
 
 }
 
+/**
+ * A command that sets the format of a table/view/partition .
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
+ * }}}
+ */
+case class AlterTableFormatCommand(
+tableName: TableIdentifier,
--- End diff --

indent nit: use 4-space indentation for the function parameters.
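For example, a minimal sketch of the 4-space parameter indentation; the parameter 
names and types below are placeholders, not the PR's actual signature:

```
case class AlterTableFormatCommand(
    tableName: String,                          // parameters indented by 4 spaces
    format: Option[String],
    partitionSpec: Option[Map[String, String]])
```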


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21893: Support selecting from partitioned tables with pa...

2018-07-28 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21893#discussion_r205945523
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -857,6 +857,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  /**
+   * Create an [[AlterTableFormatCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table [PARTITION spec] SET FILEFORMAT format;
+   * }}}
+   */
+  override def visitSetTableFormat(ctx: SetTableFormatContext): 
LogicalPlan = withOrigin(ctx) {
+val format = (ctx.fileFormat) match {
+  // Expected format: INPUTFORMAT input_format OUTPUTFORMAT 
output_format
+  case (c: TableFileFormatContext) =>
+visitTableFileFormat(c)
+  // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET 
| AVRO
+  case (c: GenericFileFormatContext) =>
+visitGenericFileFormat(c)
+  case _ =>
+throw new ParseException("Expected STORED AS ", ctx)
--- End diff --

I think we need a more detailed ParseException message here.
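For example, something along these lines; the wording is only a suggestion, and 
`other` assumes the last case binds the unexpected context instead of using `_`:

```
case other =>
  throw new ParseException(
    "Expected STORED AS INPUTFORMAT ... OUTPUTFORMAT ... or one of " +
      "SEQUENCEFILE, TEXTFILE, RCFILE, ORC, PARQUET, AVRO, " +
      s"but got: ${other.getText}", ctx)
```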


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-07-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r205478496
  
--- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java ---
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * This is based on hadoop-common-2.7.2
+ * {@link org.apache.hadoop.fs.Globber}.
+ * This class exposes globWithThreshold, which can be used to glob paths in parallel.
+ */
+public class SparkGlobber {
+  public static final Log LOG = 
LogFactory.getLog(SparkGlobber.class.getName());
+
+  private final FileSystem fs;
+  private final FileContext fc;
+  private final Path pathPattern;
+
+  public SparkGlobber(FileSystem fs, Path pathPattern) {
+this.fs = fs;
+this.fc = null;
+this.pathPattern = pathPattern;
+  }
+
+  public SparkGlobber(FileContext fc, Path pathPattern) {
+this.fs = null;
+this.fc = fc;
+this.pathPattern = pathPattern;
+  }
+
+  private FileStatus getFileStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.getFileStatus(path);
+  } else {
+return fc.getFileStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return null;
+}
+  }
+
+  private FileStatus[] listStatus(Path path) throws IOException {
+try {
+  if (fs != null) {
+return fs.listStatus(path);
+  } else {
+return fc.util().listStatus(path);
+  }
+} catch (FileNotFoundException e) {
+  return new FileStatus[0];
+}
+  }
+
+  private Path fixRelativePart(Path path) {
+if (fs != null) {
+  return fs.fixRelativePart(path);
+} else {
+  return fc.fixRelativePart(path);
+}
+  }
+
+  /**
+   * Convert a path component that contains backslash escape sequences to a
+   * literal string.  This is necessary when you want to explicitly refer 
to a
+   * path that contains globber metacharacters.
+   */
+  private static String unescapePathComponent(String name) {
+return name.replaceAll("\\\\(.)", "$1");
+  }
+
+  /**
+   * Translate an absolute path into a list of path components.
+   * We merge double slashes into a single slash here.
+   * POSIX root path, i.e. '/', does not get an entry in the list.
+   */
+  private static List<String> getPathComponents(String path)
+  throws IOException {
+ArrayList<String> ret = new ArrayList<String>();
+for (String component : path.split(Path.SEPARATOR)) {
+  if (!component.isEmpty()) {
+ret.add(component);
+  }
+}
+return ret;
+  }
+
+  private String schemeFromPath(Path path) throws IOException {
+String scheme = path.toUri().getScheme();
+if (scheme == null) {
+  if (fs != null) {
+scheme = fs.getUri().getScheme();
+  } else {
+scheme = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme();
+  }
+}
+return scheme;
+  }
+
+  private String authorityFromPath(Path path) throws IOException {
+String authority = path.toUri().getAuthority();
+if (authority == null) {
+  if (fs != null) {
+authority = fs.getUri().getAuthority();
+  } else {
+authority = 
fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority();
+  }
+}
+return authority ;
+  }
+
+  public FileStatus[] globWithThreshold(int threshold) throws IOException {
+ 

[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...

2018-07-24 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/19773#discussion_r204805474
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand(
 
 // Find the origin column from dataSchema by column name.
 val originColumn = findColumnByName(table.dataSchema, columnName, 
resolver)
-// Throw an AnalysisException if the column name/dataType is changed.
+// Throw an AnalysisException if the column name is changed.
 if (!columnEqual(originColumn, newColumn, resolver)) {
   throw new AnalysisException(
 "ALTER TABLE CHANGE COLUMN is not supported for changing column " +
   s"'${originColumn.name}' with type '${originColumn.dataType}' to 
" +
   s"'${newColumn.name}' with type '${newColumn.dataType}'")
 }
 
+val typeChanged = originColumn.dataType != newColumn.dataType
+val partitionColumnChanged = 
table.partitionColumnNames.contains(originColumn.name)
+
+// Throw an AnalysisException if the type of partition column is 
changed.
+if (typeChanged && partitionColumnChanged) {
--- End diff --

Just adding a check here for the case where the user changes the type of a partition column.
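Since the diff above is cut off, here is a hedged sketch of what the guard being 
described might look like; the actual PR may word the error differently:

```
if (typeChanged && partitionColumnChanged) {
  throw new AnalysisException(
    s"ALTER TABLE CHANGE COLUMN is not supported for changing the type of " +
      s"partition column '${originColumn.name}' from '${originColumn.dataType}' " +
      s"to '${newColumn.dataType}'")
}
```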


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-07-24 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
@gatorsmile @maropu Please have a look at this; resolving the conflicts took me some time.
Also cc @jiangxb1987 because the conflicts are mainly with #20696. Thanks as well for 
the work in #20696; the latest PR no longer needs the extra handling for partition 
column comment changes that was required before.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21839
  
Thanks for reviewing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19773
  
I'll resolve the conflicts today; thanks for pinging me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19745: [SPARK-2926][Core][Follow Up] Sort shuffle reader...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking closed the pull request at:

https://github.com/apache/spark/pull/19745


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19745: [SPARK-2926][Core][Follow Up] Sort shuffle reader for Sp...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/19745
  
No problem.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21839
  
@gatorsmile Thanks for your advice; I added a UT in ScriptTransformationSuite.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21839: [SPARK-24339][SQL] Prunes the unused columns from...

2018-07-23 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21839#discussion_r204447671
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -450,13 +450,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
 case d @ DeserializeToObject(_, _, child) if (child.outputSet -- 
d.references).nonEmpty =>
   d.copy(child = prunedChild(child, d.references))
 
-// Prunes the unused columns from child of Aggregate/Expand/Generate
+// Prunes the unused columns from child of 
Aggregate/Expand/Generate/ScriptTransformation
 case a @ Aggregate(_, _, child) if (child.outputSet -- 
a.references).nonEmpty =>
   a.copy(child = prunedChild(child, a.references))
 case f @ FlatMapGroupsInPandas(_, _, _, child) if (child.outputSet -- 
f.references).nonEmpty =>
   f.copy(child = prunedChild(child, f.references))
 case e @ Expand(_, _, child) if (child.outputSet -- 
e.references).nonEmpty =>
   e.copy(child = prunedChild(child, e.references))
+case s @ ScriptTransformation(_, _, _, child, _)
+  if (child.outputSet -- s.references).nonEmpty =>
--- End diff --

Thanks, fixed in 2cf131f.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...

2018-07-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21839
  
@gatorsmile @maropu This is the follow-up PR for #21447; please have a look 
when you have time, thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21839: [SPARK-24339][SQL] Prunes the unused columns from...

2018-07-22 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/21839

[SPARK-24339][SQL] Prunes the unused columns from child of 
ScriptTransformation

## What changes were proposed in this pull request?

Modify the ColumnPruning rule to add a Project between ScriptTransformation and 
its child. This can reduce the scan time, especially when the table has many columns.
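As an illustration, assuming a Hive-enabled session and a hypothetical wide table 
named `many_column_table`, the effect can be observed in the optimized plan:

```
val df = spark.sql(
  "SELECT TRANSFORM(a) USING 'cat' AS (a STRING) FROM many_column_table")
// With the new rule, the optimized plan should contain a Project that keeps only
// column `a` under the ScriptTransformation node, instead of scanning all columns.
df.explain(true)
```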

## How was this patch tested?

Add UT in ColumnPruningSuite.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-24339

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21839.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21839


commit 68869d9fb8cc0e2686fb1e01f4d4c3e7ac8a52fe
Author: Yuanjian Li 
Date:   2018-07-22T14:46:31Z

Prunes the unused columns from child of ScriptTransformation




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21447: [SPARK-24339][SQL]Add project for transform/map/reduce s...

2018-07-22 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21447
  
I'd like to open a follow-up PR; cc @gatorsmile @maropu for a review.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21533: [SPARK-24195][Core] Ignore the files with "local" scheme...

2018-07-19 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21533
  
Thanks everyone for your help!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21533: [SPARK-24195][Core] Ignore the files with "local" scheme...

2018-07-18 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21533
  
@jiangxb1987 Thanks for the reminder, rephrasing done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21789: [SPARK-24829][SQL]CAST AS FLOAT inconsistent with...

2018-07-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21789#discussion_r203037295
  
--- Diff: 
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 ---
@@ -766,6 +774,14 @@ class HiveThriftHttpServerSuite extends 
HiveThriftJdbcTest {
   assert(resultSet.getString(2) === HiveUtils.builtinHiveVersion)
 }
   }
+
+  test("Checks cast as float") {
--- End diff --

Duplicated code?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202701770
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2248,4 +2249,20 @@ class HiveDDLSuite
   checkAnswer(spark.table("t4"), Row(0, 0))
 }
   }
+
+  test("desc formatted table for last access verification") {
+withTable("t1") {
+  sql(s"create table" +
+s" if not exists t1 (c1_int int, c2_string string, c3_float 
float)")
+  val desc = sql("DESC FORMATTED t1").collect().toSeq
+  val lastAcessField = desc.filter((r: Row) => 
r.getValuesMap(Seq("col_name"))
+.get("col_name").getOrElse("").equals("Last Access"))
+  // Check whether lastAcessField key is exist
+  assert(!lastAcessField.isEmpty)
--- End diff --

lastAccessField.nonEmpty


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202703129
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -114,7 +114,10 @@ case class CatalogTablePartition(
   map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" 
+ p._2).mkString(", ")}}")
 }
 map.put("Created Time", new Date(createTime).toString)
-map.put("Last Access", new Date(lastAccessTime).toString)
+val lastAccess = {
+  if (-1 == lastAccessTime) "UNKNOWN" else new 
Date(lastAccessTime).toString
+}
+map.put("Last Access", lastAccess)
--- End diff --

No need for the val lastAccess?
```
map.put("Last Access",
  if (-1 == lastAccessTime) "UNKNOWN" else new 
Date(lastAccessTime).toString)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202704259
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2248,4 +2249,20 @@ class HiveDDLSuite
   checkAnswer(spark.table("t4"), Row(0, 0))
 }
   }
+
+  test("desc formatted table for last access verification") {
+withTable("t1") {
+  sql(s"create table" +
+s" if not exists t1 (c1_int int, c2_string string, c3_float 
float)")
+  val desc = sql("DESC FORMATTED t1").collect().toSeq
+  val lastAcessField = desc.filter((r: Row) => 
r.getValuesMap(Seq("col_name"))
+.get("col_name").getOrElse("").equals("Last Access"))
+  // Check whether lastAcessField key is exist
+  assert(!lastAcessField.isEmpty)
+  val validLastAcessFieldValue = lastAcessField.filterNot((r: Row) => 
((r
+.getValuesMap(Seq("data_type"))
+.get("data_type").contains(new Date(-1).toString
+  assert(lastAcessField.size!=0)
--- End diff --

code style nit: add a space before and after '!='


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202703948
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2248,4 +2249,20 @@ class HiveDDLSuite
   checkAnswer(spark.table("t4"), Row(0, 0))
 }
   }
+
+  test("desc formatted table for last access verification") {
+withTable("t1") {
+  sql(s"create table" +
+s" if not exists t1 (c1_int int, c2_string string, c3_float 
float)")
+  val desc = sql("DESC FORMATTED t1").collect().toSeq
+  val lastAcessField = desc.filter((r: Row) => 
r.getValuesMap(Seq("col_name"))
+.get("col_name").getOrElse("").equals("Last Access"))
+  // Check whether lastAcessField key is exist
+  assert(!lastAcessField.isEmpty)
+  val validLastAcessFieldValue = lastAcessField.filterNot((r: Row) => 
((r
--- End diff --

where is the val `validLastAcessFieldValue` used? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...

2018-07-16 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21775#discussion_r202701870
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2248,4 +2249,20 @@ class HiveDDLSuite
   checkAnswer(spark.table("t4"), Row(0, 0))
 }
   }
+
+  test("desc formatted table for last access verification") {
+withTable("t1") {
+  sql(s"create table" +
+s" if not exists t1 (c1_int int, c2_string string, c3_float 
float)")
+  val desc = sql("DESC FORMATTED t1").collect().toSeq
+  val lastAcessField = desc.filter((r: Row) => 
r.getValuesMap(Seq("col_name"))
--- End diff --

nit: lastAccessField


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...

2018-07-12 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21618
  
gentle ping @cloud-fan @gatorsmile @kiszk 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21729: SPARK-24755 Executor loss can cause task to not be resub...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on the issue:

https://github.com/apache/spark/pull/21729
  
Please change the title to '[SPARK-24755][Core] Executor loss can cause 
task to not be resubmitted'


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r201007556
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -656,6 +656,25 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val PARALLEL_GET_GLOBBED_PATH_THRESHOLD =
+buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold")
+  .doc("The maximum number of subfiles or directories allowed after a 
globbed path " +
+"expansion. If the number of paths exceeds this value during 
expansion, it tries to " +
+"expand the globbed in parallel with multi-thread.")
+  .intConf
+  .checkValue(threshold => threshold >= 0, "The maximum number of 
subfiles or directories " +
+"must not be negative")
+  .createWithDefault(32)
+
+  val PARALLEL_GET_GLOBBED_PATH_NUM_THREADS =
+buildConf("spark.sql.sources.parallelGetGlobbedPath.numThreads")
+  .doc("The number of threads to get a collection of path in parallel. 
Set the " +
+"number to avoid generating too many threads.")
+  .intConf
+  .checkValue(parallel => parallel >= 0, "The maximum number of 
threads allowed for getting " +
--- End diff --

Thanks for the catch. When this value is set to 0 we get an IllegalArgumentException 
while constructing the ThreadPoolExecutor, so I use 0 here as the default value that 
keeps the feature disabled, as we discussed in 
https://github.com/apache/spark/pull/21618#discussion_r200465855
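A minimal illustration (not Spark code) of why 0 has to be treated as the disabled 
value instead of being passed to the pool constructor:

```
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

val numThreads = 0
if (numThreads > 0) {
  // a positive maximum pool size is accepted by the constructor
  val pool = new ThreadPoolExecutor(
    numThreads, numThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())
  pool.shutdown()
} else {
  // take the sequential code path; new ThreadPoolExecutor(0, 0, ...) would throw
  // IllegalArgumentException because maximumPoolSize must be greater than 0
}
```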


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r201006447
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -656,6 +656,25 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val PARALLEL_GET_GLOBBED_PATH_THRESHOLD =
+buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold")
+  .doc("The maximum number of subfiles or directories allowed after a 
globbed path " +
+"expansion. If the number of paths exceeds this value during 
expansion, it tries to " +
+"expand the globbed in parallel with multi-thread.")
+  .intConf
+  .checkValue(threshold => threshold >= 0, "The maximum number of 
subfiles or directories " +
--- End diff --

Thanks, done in the next commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21618#discussion_r201006275
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -724,4 +726,35 @@ object DataSource extends Logging {
  """.stripMargin)
 }
   }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * Use a local thread pool to do this while there's too many paths.
+   */
+  private def getGlobbedPaths(
+  sparkSession: SparkSession,
+  fs: FileSystem,
+  hadoopConf: Configuration,
+  qualified: Path): Seq[Path] = {
+val getGlobbedPathThreshold = 
sparkSession.sessionState.conf.parallelGetGlobbedPathThreshold
+val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified, 
getGlobbedPathThreshold)
--- End diff --

Thanks for your advice; I'll reuse the value of 
spark.sql.sources.parallelGetGlobbedPath.numThreads to control this.
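For context, a minimal sketch of expanding a set of already-enumerated path prefixes 
in parallel with a bounded pool; this assumes Hadoop's FileSystem API and is not the 
PR's actual code:

```
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration
import org.apache.hadoop.fs.{FileSystem, Path}

def globInParallel(fs: FileSystem, prefixes: Seq[Path], numThreads: Int): Seq[Path] = {
  val pool = Executors.newFixedThreadPool(numThreads)
  implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)
  try {
    val futures = prefixes.map { p =>
      // globStatus may return null when nothing matches, hence the Option wrapper
      Future(Option(fs.globStatus(p)).toSeq.flatten.map(_.getPath))
    }
    Await.result(Future.sequence(futures), Duration.Inf).flatten
  } finally {
    pool.shutdown()
  }
}
```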


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21729#discussion_r200990424
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite 
with LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
+val conf = new SparkConf().set("spark.speculation", "true")
+sc = new SparkContext("local", "test", conf)
+// Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
+sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation.quantile", "0.5")
+sc.conf.set("spark.speculation", "true")
+
+var killTaskCalled = false
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+  ("exec2", "host2"), ("exec3", "host3"))
+sched.initialize(new FakeSchedulerBackend() {
+  override def killTask(taskId: Long,
+executorId: String,
+interruptThread: Boolean,
+reason: String): Unit = {
+// Check the only one killTask event in this case, which triggered 
by
+// task 2.1 completed.
+assert(taskId === 2)
+assert(executorId === "exec3")
+assert(interruptThread)
+assert(reason === "another attempt succeeded")
+killTaskCalled = true
+  }
+})
+
+// Keep track of the index of tasks that are resubmitted,
+// so that the test can check that task is resubmitted correctly
+var resubmittedTasks = new mutable.HashSet[Int]
+val dagScheduler = new FakeDAGScheduler(sc, sched) {
+  override def taskEnded(task: Task[_],
+ reason: TaskEndReason,
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21729#discussion_r200990279
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite 
with LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
+val conf = new SparkConf().set("spark.speculation", "true")
+sc = new SparkContext("local", "test", conf)
+// Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
+sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation.quantile", "0.5")
+sc.conf.set("spark.speculation", "true")
+
+var killTaskCalled = false
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+  ("exec2", "host2"), ("exec3", "host3"))
+sched.initialize(new FakeSchedulerBackend() {
+  override def killTask(taskId: Long,
+executorId: String,
--- End diff --

nit: indent
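
For reference, a small sketch of the continuation style the nit refers to (a 
hypothetical backend, not the suite's FakeSchedulerBackend): when a declaration does 
not fit on one line, each parameter goes on its own line with a four-space indent, 
rather than being aligned with the opening parenthesis.

```scala
trait KillTaskBackend {
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
}

class LoggingKillTaskBackend extends KillTaskBackend {
  // The declaration is too long for one line, so each parameter gets its own line,
  // indented four spaces past the start of `override`.
  override def killTask(
      taskId: Long,
      executorId: String,
      interruptThread: Boolean,
      reason: String): Unit = {
    println(s"killTask($taskId) on $executorId, interrupt=$interruptThread, reason=$reason")
  }
}
```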


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...

2018-07-09 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21729#discussion_r200989413
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
   // Set the coresponding index of Boolean var when the task killed by 
other attempt tasks,
--- End diff --

A typo I made before: coresponding -> corresponding.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...

2018-07-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21642#discussion_r200160518
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -73,6 +74,10 @@ private[spark] class AppStatusListener(
   // around liveExecutors.
   @volatile private var activeExecutorCount = 0
 
+  private val inputDataSetId = new AtomicLong(0)
+  private val outputDataSetId = new AtomicLong(0)
+  private val maxRecords = conf.getInt("spark.data.maxRecords", 1000)
--- End diff --

What is `spark.data.maxRecords` for? It would be better to declare it following the 
config pattern in core/src/main/scala/org/apache/spark/status/config.scala.
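
For illustration, a sketch of what that could look like — the object name, doc text 
and default are made up, not the PR's code — declaring the option next to the other 
status-store configs instead of a raw conf.getInt in the listener:

```scala
package org.apache.spark.status

import org.apache.spark.internal.config.ConfigBuilder

private[spark] object DataRecordConfig {
  // Declaring the option through ConfigBuilder keeps the key, type and default in one
  // place, matching the pattern used by the existing entries in status/config.scala.
  val MAX_DATA_RECORDS = ConfigBuilder("spark.data.maxRecords")
    .doc("Maximum number of input/output update events retained by the status listener.")
    .intConf
    .createWithDefault(1000)
}
```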


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...

2018-07-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21642#discussion_r200159852
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) 
extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends 
SparkListenerEvent
 
+/**
+ * An internal class that describes the input data of an event log.
+ */
+@DeveloperApi
+case class SparkListenerInputUpdate(format: String,
+options: Map[String, String],
--- End diff --

Wrong indentation; see 
https://github.com/apache/spark/pull/21642/files#diff-fbe8f967070627c8dc155237e77c7314R172 
for the expected layout (a sketch also follows below).
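
A layout-only sketch of what is being asked for — the fields are the PR's proposed 
ones, reproduced just to show the four-space indent for constructor parameters and 
the two-space indent for the extends clause:

```scala
import org.apache.spark.scheduler.SparkListenerEvent

case class SparkListenerInputUpdate(
    format: String,
    options: Map[String, String],
    locations: Seq[String] = Seq.empty[String])
  extends SparkListenerEvent
```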


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...

2018-07-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21642#discussion_r200160022
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -19,6 +19,7 @@ package org.apache.spark.status
 
 import java.util.Date
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
--- End diff --

import order error here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...

2018-07-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21642#discussion_r200159949
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) 
extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends 
SparkListenerEvent
 
+/**
+ * An internal class that describes the input data of an event log.
+ */
+@DeveloperApi
+case class SparkListenerInputUpdate(format: String,
+options: Map[String, String],
+locations: Seq[String] = 
Seq.empty[String])
+  extends SparkListenerEvent
+
+/**
+ * An internal class that describes the non-table output of an event log.
+ */
+@DeveloperApi
+case class SparkListenerOutputUpdate(format: String,
+ mode: String,
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


