[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/1#discussion_r214508181 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl( accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) -val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { +val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) -taskIdToTaskSetManager.get(id).map { taskSetMgr => +Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => --- End diff -- Just leaving a small concern here: the original code locked the whole scope of ids in `accumUpdates`. After this change, some ids that could be found originally may not be found now, because `taskIdToTaskSetManager` can be changed concurrently by `removeExecutor` or `statusUpdate`. It's not a big problem if the executor has been removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
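The concern above is a classic check-then-act race: once the outer `synchronized` is gone, each `get` sees whatever the map holds at that instant. A minimal sketch, assuming a plain `java.util.concurrent.ConcurrentHashMap` stands in for `taskIdToTaskSetManager` (the names `UnlockedLookupSketch`, `lookupAll` and the string values in place of `TaskSetManager` are hypothetical), shows why `Option(...)` is needed and how an entry removed mid-iteration simply drops out of the result:

```scala
import java.util.concurrent.ConcurrentHashMap

object UnlockedLookupSketch {
  // Hypothetical stand-in for taskIdToTaskSetManager: task id -> task set name.
  val taskIdToManager = new ConcurrentHashMap[Long, String]()

  // ConcurrentHashMap.get returns null for a missing key, hence the Option(...) wrapper.
  // `onEach` lets a caller simulate a concurrent removal between two lookups.
  def lookupAll(ids: Seq[Long], onEach: Long => Unit = _ => ()): Seq[(Long, String)] =
    ids.flatMap { id =>
      onEach(id)
      Option(taskIdToManager.get(id)).map(mgr => (id, mgr))
    }
}
```

Without a lock spanning the whole `flatMap`, the result reflects the map at each individual lookup rather than a single consistent snapshot, which is presumably acceptable when the missing ids belong to an executor that was already removed.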
[GitHub] spark pull request #22205: [SPARK-25212][SQL] Support Filter in ConvertToLoc...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22205#discussion_r214505595 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1349,6 +1357,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] { case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) => LocalRelation(output, data.take(limit), isStreaming) + +case Filter(condition, LocalRelation(output, data, isStreaming)) --- End diff -- Super nit: the comment at https://github.com/apache/spark/pull/22205/files#diff-a636a87d8843eeccca90140be91d4fafR1348 was not updated.
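The new `Filter` case evaluates the predicate over the in-memory rows at optimization time, the same way the existing `Limit` case applies `take`. Below is a toy sketch of that rewrite. It uses simplified, hypothetical plan types (`Plan`, `LocalRel`, `FilterNode`, `LimitNode`, with rows as `Map[String, Int]`) rather than Catalyst's real classes, and it ignores any restrictions the real rule may place on the condition:

```scala
object ConvertToLocalRelationSketch {
  type Row = Map[String, Int]

  // Hypothetical, heavily simplified logical plan nodes.
  sealed trait Plan
  final case class LocalRel(data: Seq[Row]) extends Plan
  final case class FilterNode(condition: Row => Boolean, child: Plan) extends Plan
  final case class LimitNode(limit: Int, child: Plan) extends Plan

  // One rewrite step: when the child is already local data, evaluate the operator eagerly.
  def rewrite(plan: Plan): Plan = plan match {
    case LimitNode(n, LocalRel(data))     => LocalRel(data.take(n))
    case FilterNode(cond, LocalRel(data)) => LocalRel(data.filter(cond))
    case other                            => other
  }
}
```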
[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22282#discussion_r214084903 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java --- @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray( return result; } - public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) { -return fromPrimitiveArray(null, offset, length, elementSize); - } - - public static boolean shouldUseGenericArrayData(int elementSize, int length) { --- End diff -- Yep, the failed UT log proves this: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95468/testReport/org.apache.spark.sql.catalyst.expressions/CollectionExpressionsSuite/Array_Union/
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22282 retest this please.
[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22282#discussion_r214075761 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java --- @@ -473,17 +474,6 @@ public static UnsafeArrayData fromPrimitiveArray( return result; } - public static UnsafeArrayData forPrimitiveArray(int offset, int length, int elementSize) { -return fromPrimitiveArray(null, offset, length, elementSize); - } - - public static boolean shouldUseGenericArrayData(int elementSize, int length) { --- End diff -- I think `shouldUseGenericArrayData` is still used in generated code; see the code here: https://github.com/apache/spark/blob/b459cf3f391d6e4ee9cb77a7b5ed510d027d9ddd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L3633
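For context on why removing the helper breaks things: as far as I understand it, `shouldUseGenericArrayData` is a size guard. It tells callers, including generated code, whether an array of `length` elements of `elementSize` bytes would be too large for the unsafe layout, in which case a generic array representation is used instead. The Scala sketch below reproduces that kind of check under two stated assumptions. The layout is assumed to be an 8-byte element count followed by a null bitset rounded up to 64-bit words, and the limit is assumed to be `Int.MaxValue / 8` longs. Both are assumptions, and Spark's actual constants may differ:

```scala
object GenericArrayDataCheckSketch {
  // Assumed header: 8-byte element count + null bitset rounded up to whole 64-bit words.
  def headerBytes(numElements: Long): Long = 8L + ((numElements + 63) / 64) * 8

  // Assumed limit: fall back to a generic array when the unsafe layout
  // would need more than Int.MaxValue / 8 longs of backing storage.
  def shouldUseGenericArrayData(elementSize: Int, length: Long): Boolean = {
    val valueRegionBytes = elementSize.toLong * length
    val totalSizeInLongs = (headerBytes(length) + valueRegionBytes + 7) / 8
    totalSizeInLongs > Int.MaxValue / 8
  }
}
```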
[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/2 @cloud-fan @rdblue I want to leave some comments and thoughts from looking into this again; I hope they help us decide on the next step. Currently all plans assume the input is `RDD[InternalRow]`, and the whole framework treats columnar reads as a special case. Also, the `inputRDDs` function is called not only in `WholeStageCodegenExec` but also in all the parent physical nodes, so it's very easy to get into a mess with nested plans while debugging this fix. So we may have these 3 choices. The first two can remove the cast entirely but may need many changes to `CodegenSupport`; the last one limits the changes but still has the cast problem: 1. Erase the type of `inputRDDs`, because we need to allow both RDD[InternalRow] and RDD[ColumnarBatch] to be passed, mainly when a parent physical plan calls its child. This is implemented in the last commit in this PR: https://github.com/apache/spark/pull/2/files 2. Refactor the framework so that all plans deal with columnar batches. 3. Limit the changes to `ColumnarBatchScan` without changing `CodegenSupport`, but still leave the cast problem. This is implemented in the first two commits in this PR: https://github.com/apache/spark/pull/2/files/7e88599dfc2caf177d12e890d588be68bdd3bc8e If none of these makes sense, I'll just close this. Thanks.
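To make choice 1 concrete, here is a toy illustration of what an erased input type means for a consumer. It uses plain iterators and hypothetical stand-ins (`FakeRow` for `InternalRow`, `FakeBatch` for `ColumnarBatch`), not Spark's classes. Once the element type is `AnyRef`, every parent has to recover rows or batches by a runtime type test:

```scala
object ErasedInputSketch {
  // Hypothetical stand-ins for InternalRow and ColumnarBatch.
  final case class FakeRow(values: Seq[Int])
  final case class FakeBatch(rows: Seq[FakeRow])

  // With the element type erased to AnyRef, the consumer must inspect each element at runtime.
  def rowsOf(input: Iterator[AnyRef]): Iterator[FakeRow] = input.flatMap {
    case r: FakeRow   => Iterator.single(r)
    case b: FakeBatch => b.rows.iterator
    case other        => throw new IllegalArgumentException(s"Unexpected input element: $other")
  }
}
```

The trade-off described above shows up directly here: the type checker no longer helps, and every consumer repeats the same runtime dispatch.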
[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/2 Got it, I'll revert the file source changes in this commit. Thanks for your reply.
[GitHub] spark pull request #22252: [SPARK-25261][MINOR][DOC] correct the default uni...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22252#discussion_r213708177 --- Diff: docs/configuration.md --- @@ -152,7 +152,7 @@ of the most common options to set are: spark.driver.memory 1g -Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in MiB +Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in bytes --- End diff -- I think I got the point you want to report, @ivoson. IIUC, this is a bug in the code, not in the doc: we should also make `spark.driver.memory=1024` be interpreted in MiB. Maybe this changes the original behavior, and we can announce it in the migration guide? cc @srowen @HyukjinKwon.
[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/2 @cloud-fan Thanks for your reply, Wenchen. I'm trying to achieve this in this commit; please take a look. Thanks.
[GitHub] spark pull request #22252: [SPARK-25261][MINOR][DOC] correct the default uni...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22252#discussion_r213343952 --- Diff: docs/configuration.md --- @@ -152,7 +152,7 @@ of the most common options to set are: spark.driver.memory 1g -Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in MiB +Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in bytes --- End diff -- Check the config code here: https://github.com/apache/spark/blob/99d2e4e00711cffbfaee8cb3da9b6b3feab8ff18/core/src/main/scala/org/apache/spark/internal/config/package.scala#L40-L43
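The unit question comes down to how a size string without a suffix is read. The sketch below assumes, as the discussion suggests, that a bare number for `spark.driver.memory` is interpreted as MiB while `1g` means 1024 MiB. It uses a hypothetical `toMiB` helper rather than Spark's own utilities, and it supports only the single-letter suffixes k, m, g and t:

```scala
object MemoryStringSketch {
  // Digits optionally followed by a single unit letter, case-insensitive.
  private val SizePattern = """(?i)(\d+)([kmgt]?)""".r

  // Converts a size string to MiB; a bare number is assumed to already be in MiB.
  def toMiB(s: String): Long = s.trim match {
    case SizePattern(num, unit) =>
      val n = num.toLong
      unit.toLowerCase match {
        case ""  => n               // no suffix: default unit assumed to be MiB
        case "k" => n / 1024        // KiB -> MiB, truncating
        case "m" => n
        case "g" => n * 1024
        case "t" => n * 1024 * 1024
      }
    case _ => throw new NumberFormatException(s"Invalid size string: $s")
  }
}
```

Under this assumption `1024` and `1g` describe the same amount of memory. A doc line saying "in bytes" would instead make `1024` mean 1 KiB, which is why the wording matters.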
[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22149 ``` Is that possible to add a test case? ``` Thanks for your reply, Xiao. We ran into some difficulties writing the test case, because it needs to mock speculative behavior. We will keep looking into this.
[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22149 retest this please.
[GitHub] spark issue #22024: [SPARK-25034][CORE] Remove allocations in onBlockFetchSu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22024 retest this please.
[GitHub] spark pull request #22024: [SPARK-25034][CORE] Remove allocations in onBlock...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22024#discussion_r213015113 --- Diff: core/src/main/scala/org/apache/spark/network/BlockTransferService.scala --- @@ -101,15 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo result.failure(exception) } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { - data match { -case f: FileSegmentManagedBuffer => - result.success(f) -case _ => - val ret = ByteBuffer.allocate(data.size.toInt) --- End diff -- The copy behavior was introduced by https://github.com/apache/spark/pull/2330/commits/69f5d0a2434396abbbd98886e047bc08a9e65565. How can you make sure it can be replaced by increasing the reference count?
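The question is whether passing the fetched buffer along with an extra reference is as safe as copying it. Here is a minimal sketch of the reference-counting contract, using a hypothetical `RefCountedBuffer` class that is not Netty's or Spark's `ManagedBuffer` API. Retaining keeps the memory alive for a new owner without copying it, and the memory is freed only after every owner has released it:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical ref-counted buffer: starts with one reference held by its creator.
final class RefCountedBuffer(val bytes: Array[Byte]) {
  private val refs = new AtomicInteger(1)
  @volatile private var freed = false

  def refCount: Int = refs.get()
  def isFreed: Boolean = freed

  // A new owner takes a reference instead of copying the data (CAS loop avoids reviving a freed buffer).
  def retain(): RefCountedBuffer = {
    var done = false
    while (!done) {
      val current = refs.get()
      if (current <= 0) throw new IllegalStateException("retain() after the buffer was freed")
      done = refs.compareAndSet(current, current + 1)
    }
    this
  }

  // Drop one reference; the memory is considered freed when the last one goes away.
  def release(): Unit = {
    val remaining = refs.decrementAndGet()
    if (remaining == 0) freed = true
    else if (remaining < 0) throw new IllegalStateException("release() called too many times")
  }
}
```

A defensive copy sidesteps this bookkeeping. With retain/release, every consumer must release exactly once: otherwise the buffer either leaks or is freed while someone is still reading it, which is presumably what the question above is probing.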
[GitHub] spark pull request #22024: [SPARK-25034][CORE] Remove allocations in onBlock...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22024#discussion_r213015245 --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala --- @@ -160,7 +160,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) releaseLock(pieceId) case None => bm.getRemoteBytes(pieceId) match { -case Some(b) => +case Some(splitB) => + + // Checksum computation and further computations require the data + // from the ChunkedByteBuffer to be merged, so we we merge it now. --- End diff -- Nit: typo in the comment ("we we").
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212822215 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala --- @@ -191,6 +195,48 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1) } + test("SPARK-25121 Supports multi-part names for broadcast hint resolution") { +val (table1Name, table2Name) = ("t1", "t2") +withTempDatabase { dbName => + withTable(table1Name, table2Name) { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { + spark.range(50).write.saveAsTable(s"$dbName.$table1Name") + spark.range(100).write.saveAsTable(s"$dbName.$table2Name") + // First, makes sure a join is not broadcastable + val plan = sql(s"SELECT * FROM $dbName.$table1Name, $dbName.$table2Name " + + s"WHERE $table1Name.id = $table2Name.id") +.queryExecution.executedPlan + assert(plan.collect { case p: BroadcastHashJoinExec => p }.size == 0) + + // Uses multi-part table names for broadcast hints + def checkIfHintApplied(tableName: String, hintTableName: String): Unit = { --- End diff -- Is `hintTableName` never used in this function?
[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/2#discussion_r212820814 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -307,7 +308,7 @@ case class FileSourceScanExec( withSelectedBucketsCount } - private lazy val inputRDD: RDD[InternalRow] = { + private lazy val inputRDD: RDD[Object] = { --- End diff -- Thanks Sean! Addressed in 7e88599.
[GitHub] spark issue #22222: [SPARK-25083][SQL] Remove the type erasure hack in data ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/2 cc @cloud-fan and @rdblue, please have a look when you have time. If this PR doesn't match what you expect, I'll close it soon. Thanks!
[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/2#discussion_r212784374 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala --- @@ -40,6 +42,29 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + /** + * Returns all the RDDs of ColumnarBatch which generates the input rows. + */ + def inputBatchRDDs(): Seq[RDD[ColumnarBatch]] + + /** + * Returns all the RDDs of InternalRow which generates the input rows. + */ + def inputRowRDDs(): Seq[RDD[InternalRow]] + + /** + * Get input RDD depends on supportsBatch. + */ + final def getInputRDDs(): Seq[RDD[InternalRow]] = { +if (supportsBatch) { + inputBatchRDDs().asInstanceOf[Seq[RDD[InternalRow]]] --- End diff -- This is probably the last explicit erasure hack left; please check whether it is acceptable.
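To see why a cast like `asInstanceOf[Seq[RDD[InternalRow]]]` is an "erasure hack", here is a self-contained illustration that uses plain Scala collections as an analogy for `Seq[RDD[ColumnarBatch]]` and `Seq[RDD[InternalRow]]`. It is not Spark code. Generic type arguments are erased at runtime, so the cast itself always succeeds, and a mismatch surfaces only later, when an element is used at the wrong type:

```scala
object ErasureCastSketch {
  val batches: Seq[Seq[String]] = Seq(Seq("batch-0"))

  // Compiles and "succeeds": the type arguments are erased, so nothing is checked here.
  val rows: Seq[Seq[Int]] = batches.asInstanceOf[Seq[Seq[Int]]]

  // The failure only appears when an element is actually used as an Int.
  def firstAsInt(): Int = rows.head.head
}
```

In the Spark case the cast is presumably safe only because consumers check `supportsBatch` and never read those elements as rows. That invariant is enforced by convention rather than by the type checker.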
[GitHub] spark pull request #22222: [SPARK-25083][SQL] Remove the type erasure hack i...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/2 [SPARK-25083][SQL] Remove the type erasure hack in data source scan ## What changes were proposed in this pull request? 1. Add the `inputBatchRDDs` and `inputRowRDDs` interface functions to `ColumnarBatchScan`. 2. Implement them in the physical nodes that extend `ColumnarBatchScan`. ## How was this patch tested? Refactoring work; tested with existing UTs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-25083 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2 commit 992a08b1d77d59daeac95c67d07e5b8efe20ce20 Author: Yuanjian Li Date: 2018-08-24T15:54:27Z [SPARK-25083][SQL] Remove the type erasure hack in data source scan
[GitHub] spark pull request #22202: [SPARK-25211][Core] speculation and fetch failed ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22202#discussion_r212365264 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2246,58 +2247,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assertDataStructuresEmpty() } - test("Trigger mapstage's job listener in submitMissingTasks") { --- End diff -- Could you explain why this test is deleted?
[GitHub] spark issue #22149: [SPARK-25158][SQL]Executor accidentally exit because Scr...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22149 Gentle ping @gatorsmile.
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 @jiangxb1987 Many thanks for your comment! ``` One general idea is that we don't need to rely on the RPC framework to test ContextBarrierState, just mock RpcCallContexts should be enough. ``` Actually, I wanted to implement it like this at first too, as you asked in JIRA, but `ContextBarrierState` is a private inner class of `BarrierCoordinator`. Could I refactor by moving `ContextBarrierState` out of `BarrierCoordinator`? If that is permitted, I think we can just mock RpcCallContext to achieve this. ``` We shall cover the following scenarios: ``` The list is great; the first 5 scenarios are already covered by the current implementation. I'll add the last check, `Make sure we clear all the internal data under each case.`, after we reach an agreement.
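The mocking idea can be sketched without Spark. If the barrier state depends only on a small reply interface, a test can pass in a recording fake instead of a real `RpcCallContext`. Everything below (`ReplyContext`, `FakeContext`, `BarrierState`) is hypothetical and much simpler than the real `ContextBarrierState`, which also has to deal with things like barrier epochs and timeouts:

```scala
import scala.collection.mutable.ArrayBuffer

object BarrierStateSketch {
  // Minimal stand-in for RpcCallContext: only the reply path the state needs.
  trait ReplyContext { def reply(msg: Any): Unit }

  // Records replies so a test can assert on them without any RPC framework.
  final class FakeContext extends ReplyContext {
    val replies = ArrayBuffer.empty[Any]
    def reply(msg: Any): Unit = { replies += msg; () }
  }

  // Hypothetical barrier state: replies to every waiting task once all tasks have arrived.
  final class BarrierState(numTasks: Int) {
    private val waiting = ArrayBuffer.empty[ReplyContext]
    def pending: Int = waiting.size

    def handleRequest(ctx: ReplyContext): Unit = {
      waiting += ctx
      if (waiting.size == numTasks) {
        waiting.foreach(_.reply(()))
        waiting.clear() // internal data is cleared once the barrier completes
      }
    }
  }
}
```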
[GitHub] spark issue #22177: stages in wrong order within job page DAG chart
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22177 Please change the title to "[SPARK-25199][Web UI] XXX" as described in http://spark.apache.org/contributing.html. ``` check the DAG chart in job page. ``` Could you also post a screenshot of the DAG chart after your fix?
[GitHub] spark pull request #22177: stages in wrong order within job page DAG chart
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22177#discussion_r212003441 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala --- @@ -337,7 +337,9 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP store.executorList(false), appStartTime) val operationGraphContent = store.asOption(store.operationGraphForJob(jobId)) match { - case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, operationGraph) + case Some(operationGraph) => UIUtils.showDagVizForJob(jobId, operationGraph.sortWith( + _.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt --- End diff -- It would be better to add a `getStageId` function in `RDDOperationGraph` to do this.
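Here is a sketch of such a helper. It assumes the stage cluster id has the form `stage_<id>` (that is, `STAGE_CLUSTER_PREFIX` is `"stage_"`), and it uses a simplified, hypothetical `Graph` type in place of `RDDOperationGraph`. The parsing lives in one named function, and sorting uses `sortBy` with a numeric key:

```scala
object StageIdSketch {
  // Assumed to mirror RDDOperationGraph.STAGE_CLUSTER_PREFIX.
  val StageClusterPrefix = "stage_"

  // Hypothetical stand-in for an operation graph with its root cluster id.
  final case class Graph(rootClusterId: String)

  // Hypothetical helper: "stage_12" -> 12.
  def getStageId(g: Graph): Int = g.rootClusterId.stripPrefix(StageClusterPrefix).toInt

  // Numeric sort, so stage_10 comes after stage_2 (a string sort would not do that).
  def sortByStage(graphs: Seq[Graph]): Seq[Graph] = graphs.sortBy(g => getStageId(g))
}
```

`stripPrefix` removes only a leading literal prefix, while `replaceAll` interprets its first argument as a regular expression and replaces every match. For a literal prefix, the former states the intent more directly.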
[GitHub] spark pull request #22177: stages in wrong order within job page DAG chart
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22177#discussion_r212002571 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala --- @@ -18,18 +18,18 @@ package org.apache.spark.ui.jobs import java.util.Locale + import javax.servlet.http.HttpServletRequest import scala.collection.mutable.{Buffer, ListBuffer} import scala.xml.{Node, NodeSeq, Unparsed, Utility} - --- End diff -- Please revert these import changes.
[GitHub] spark pull request #22180: [SPARK-25174][YARN]Limit the size of diagnostic m...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22180#discussion_r211996461 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala --- @@ -368,7 +369,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) -finalMsg = msg +finalMsg = if (msg == null || msg.length <= finalMsgLimitSize) { + msg +} else { + msg.substring(0, finalMsgLimitSize) --- End diff -- Maybe the last `finalMsgLimitSize` characters of the message are more useful.
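The suggested alternative keeps the tail of the message instead of the head. Here is a minimal sketch with a hypothetical `truncateKeepTail` helper, assuming, as the review comment suggests, that the end of a long diagnostic message is the more useful part:

```scala
object FinalMsgSketch {
  // Keep the last `limit` characters; null and short messages pass through unchanged.
  def truncateKeepTail(msg: String, limit: Int): String =
    if (msg == null || msg.length <= limit) msg
    else msg.substring(msg.length - limit)
}
```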
[GitHub] spark pull request #22180: [SPARK-25174][YARN]Limit the size of diagnostic m...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22180#discussion_r211996874 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala --- @@ -143,6 +143,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends @volatile private var finished = false @volatile private var finalStatus = getDefaultFinalStatus @volatile private var finalMsg: String = "" + private val finalMsgLimitSize = sparkConf.get(AM_FINAL_MSG_LIMIT).toInt --- End diff -- Nit: move this to L165? Just for code cleanliness.
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22140 AFAIC, the fix should only forbid passing illegal extra values. If there are fewer values than fields, shouldn't it raise an `AttributeError` on access, as the current implementation does, rather than being banned here? What do you think? :) @HyukjinKwon @BryanCutler Thanks.
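The behavior argued for here rejects extra values eagerly when the row is built, but defers the error for missing values until a field is accessed. It can be sketched in Scala with a hypothetical `SimpleRow` class. This is only an analogy: in PySpark the eager check would live in `_create_row`, and the deferred failure would surface as the `AttributeError` mentioned above rather than the exceptions used here:

```scala
object RowArityCheckSketch {
  final class SimpleRow(fields: Seq[String], values: Seq[Any]) {
    // Extra values have no field to bind to, so reject them at construction time.
    require(values.length <= fields.length,
      s"Too many values: ${values.length} values for ${fields.length} fields")

    // Missing values are only detected when the field is actually accessed.
    def get(field: String): Any = {
      val i = fields.indexOf(field)
      if (i < 0 || i >= values.length) throw new NoSuchElementException(field)
      values(i)
    }
  }
}
```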
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 My pleasure, I just found this while glancing over JIRA in recent days. :)
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 cc @gatorsmile @cloud-fan
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 retest this please.
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22165 cc @jiangxb1987
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22165 [SPARK-25017][Core] Add test suite for BarrierCoordinator and ContextBarrierState ## What changes were proposed in this pull request? Currently `ContextBarrierState` and `BarrierCoordinator` are only covered by the end-to-end test in `BarrierTaskContextSuite`; this PR adds BarrierCoordinatorSuite to test both classes. ## How was this patch tested? UT in BarrierCoordinatorSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-25017 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22165.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22165 commit 21bd1c37f4af6480adfc07130a15f70acdeda378 Author: liyuanjian Date: 2018-08-21T05:24:07Z [SPARK-25017][Core] Add test suite for BarrierCoordinator and ContextBarrierState
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22140 cc @HyukjinKwon
[GitHub] spark pull request #22140: [SPARK-25072][PySpark] Forbid extra value for cus...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22140 [SPARK-25072][PySpark] Forbid extra value for custom Row ## What changes were proposed in this pull request? Add a value length check in `_create_row` to forbid extra values for custom Row in PySpark. ## How was this patch tested? New UT in pyspark-sql You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-25072 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22140.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22140 commit b8c6522bccde51584e9878144924fd7b92f8785f Author: liyuanjian Date: 2018-08-18T08:36:53Z Forbidden extra value for custom Row
[GitHub] spark pull request #22105: [SPARK-25115] [Core] Eliminate extra memory copy ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22105#discussion_r210842394 --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java --- @@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOExcept // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance // for the case that the passed-in buffer has too many components. int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT); --- End diff -- ``` IIRC socket buffers are 32k by default on Linux, so it seems unlikely you'd be able to write 256k in one call (ignoring what IOUtil does internally). But maybe in practice it works ok. ``` After reading the context in #12083 and this discussion, I want to point out one way that writing 256k in one call can work in practice. In our scenario, users change `/proc/sys/net/core/wmem_default` based on their online workload, and generally we set this value larger than its default. ![image](https://user-images.githubusercontent.com/4833765/44256457-cebc0980-a23b-11e8-9b70-c7ad66fcfe1c.png) So maybe 256k for NIO_BUFFER_LIMIT is OK here? We just need to add more comments noting which parameters this value is related to.
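The interaction between the cap and the socket buffer can be simulated without a network. This sketch assumes a `WritableByteChannel` that accepts only part of each write, as a socket with a limited send buffer would. `ThrottledChannel` and `writeCapped` are hypothetical: they mirror the `Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT)` idea on a plain `ByteBuffer`, not Netty's `ByteBuf`. Each call offers the channel at most 256k, and how much it accepts per call depends on the channel, which is presumably where a larger send buffer helps:

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object CappedWriteSketch {
  val NioBufferLimit: Int = 256 * 1024 // the 256k cap under discussion

  // Fake channel accepting at most `maxPerWrite` bytes per call, like a socket with a small send buffer.
  final class ThrottledChannel(maxPerWrite: Int) extends WritableByteChannel {
    var written = 0L
    var calls = 0
    def write(src: ByteBuffer): Int = {
      calls += 1
      val n = math.min(src.remaining(), maxPerWrite)
      src.position(src.position() + n)
      written += n
      n
    }
    def isOpen: Boolean = true
    def close(): Unit = ()
  }

  // Offer the channel at most NioBufferLimit bytes per call until `buf` is drained.
  def writeCapped(buf: ByteBuffer, target: WritableByteChannel): Long = {
    var total = 0L
    var stalled = false
    while (buf.hasRemaining && !stalled) {
      val chunk = buf.duplicate() // shares content, independent position/limit
      chunk.limit(chunk.position() + math.min(buf.remaining(), NioBufferLimit))
      val n = target.write(chunk)
      buf.position(buf.position() + n)
      total += n
      // A real non-blocking writer would wait for writability instead of stopping here.
      if (n == 0) stalled = true
    }
    total
  }
}
```

With a channel that accepts 32k per call, draining 1 MiB takes 32 calls regardless of the 256k cap. Raising the channel's per-call capacity reduces the number of calls until the cap becomes the binding limit.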
[GitHub] spark issue #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22122 Thanks.
[GitHub] spark issue #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22122 ``` Are they all instances to fix? ``` @HyukjinKwon Yep, I grepped for all `conf.get("spark.sql.xxx")` calls to make sure. The only remaining hard-coded config is the StaticSQLConf `spark.sql.catalogImplementation` in session.py, which can't be managed by SQLConf.
[GitHub] spark pull request #22122: [SPARK-24665][PySpark][FollowUp] Use SQLConf in P...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22122 [SPARK-24665][PySpark][FollowUp] Use SQLConf in PySpark to manage all sql configs ## What changes were proposed in this pull request? Follow-up for SPARK-24665: fix some other hard-coded configs found during code review. ## How was this patch tested? Existing UT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-24665-follow Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22122.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22122 commit 8a32e60a7af4f574176366eb057b219cb4511bb6 Author: Yuanjian Li Date: 2018-07-02T07:04:40Z Use SQLConf in session.py and catalog.py
[GitHub] spark issue #22117: [SPARK-23654][BUILD] remove jets3t as a dependency of sp...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22117 retest this please
[GitHub] spark issue #22093: [SPARK-25100][CORE] Fix no registering TaskCommitMessage...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22093 `Should I delete current UT from FileSuit?` I think the current UT in `FileSuite` is unnecessary; you can leave it and wait for other reviewers' opinions.
[GitHub] spark issue #22093: [SPARK-25100][CORE] Fix no registering TaskCommitMessage...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/22093 Why should we create our own SparkContext here? Could we just add a UT like `registration of HighlyCompressedMapStatus` to check that `TaskCommitMessage` works?
[GitHub] spark pull request #22093: [SPARK-25100][CORE] Fix no registering TaskCommit...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22093#discussion_r209650955 --- Diff: core/src/test/scala/org/apache/spark/FileSuite.scala --- @@ -424,6 +425,39 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { randomRDD.saveAsNewAPIHadoopDataset(jobConfig) assert(new File(tempDir.getPath + "/outputDataset_new/part-r-0").exists() === true) } + + test("SPARK-25100: Using KryoSerializer and" + + " setting registrationRequired true can lead job failed") { +val tempDir = Utils.createTempDir() +val inputDir = tempDir.getAbsolutePath + "/input" +val outputDir = tempDir.getAbsolutePath + "/tmp" + +val writer = new PrintWriter(new File(inputDir)) + +for(i <- 1 to 100) { + writer.print(i) + writer.write('\n') +} + +writer.close() + +val conf = new SparkConf(false).setMaster("local"). +set("spark.kryo.registrationRequired", "true").setAppName("test") +conf.set("spark.serializer", classOf[KryoSerializer].getName) +conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") --- End diff -- Why do we need to set `spark.serializer` twice?
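The failure mode behind SPARK-25100 can be shown without Spark: with `spark.kryo.registrationRequired=true`, serializing any class that was not registered up front throws, so an unregistered `TaskCommitMessage` breaks the job. The sketch below models that rule with a hypothetical `StrictRegistry` (not Kryo's API) and stand-in case classes; a focused check of this shape is closer to the narrow unit test suggested in the review than a full job with its own `SparkContext`.

```scala
// Hypothetical model of "registration required" serialization (not Kryo's API):
// only classes registered up front may be serialized; anything else fails fast.
final class StrictRegistry(registered: Set[Class[_]]) {
  def serialize(obj: AnyRef): String = {
    val cls = obj.getClass
    if (!registered.contains(cls)) {
      throw new IllegalArgumentException(s"Class is not registered: ${cls.getName}")
    }
    s"${cls.getName}|$obj"
  }
}

// Stand-ins for objects shipped between executors and the driver.
final case class MapStatusStandIn(sizeInBytes: Long)
final case class CommitMessageStandIn(payload: String)

val registry = new StrictRegistry(Set[Class[_]](classOf[MapStatusStandIn]))
println(registry.serialize(MapStatusStandIn(42L)))                     // registered: succeeds
println(scala.util.Try(registry.serialize(CommitMessageStandIn("x")))) // not registered: Failure
```

Adding the missing class to the registered set, which is what the PR title suggests for `TaskCommitMessage`, makes the same call succeed.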
[GitHub] spark pull request #22057: [SPARK-25077][SQL] Delete unused variable in Wind...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22057 [SPARK-25077][SQL] Delete unused variable in WindowExec ## What changes were proposed in this pull request? Delete the unused variable `inputFields` in WindowExec so that it does not confuse readers of the code. ## How was this patch tested? Existing UTs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-25077 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22057.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22057 commit 90513587ed1d48437818e90f58612c344009f563 Author: liyuanjian Date: 2018-08-09T15:57:29Z [SPARK-25077][SQL] Delete unused variable in WindowExec
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208260664 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +364,101 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left) + + @transient lazy val MapType(_, rightValueType, _) = getMapType(right) + + @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable --- End diff -- `left.nullable && right.nullable`? Because if one side is an empty map, NULL will be passed as the value for each key on the other side.
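The usage text of `map_zip_with` pins down three behaviours: a key present in only one map gets NULL for the missing side, only the first entry of a duplicated key is used, and the documented example yields `{1:"ax",2:"by"}`. A plain-Scala model of those rules follows, written as a hypothetical `MapZipWithModel` (not the Catalyst implementation), with `None` standing in for SQL NULL. Treating a NULL input map as a NULL result is an assumption of the sketch; keeping it separate from the missing-key case makes explicit the distinction behind the `nullable` question.

```scala
// Hypothetical model of map_zip_with on plain Scala collections. Maps are
// Seq[(K, V)] so duplicated keys can be represented; None plays SQL NULL.
object MapZipWithModel {
  // Keep only the first entry seen for each key.
  private def firstWins[K, V](m: Seq[(K, V)]): Map[K, V] =
    m.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      if (acc.contains(k)) acc else acc + (k -> v)
    }

  def zipWith[K, V1, V2, R](left: Seq[(K, V1)], right: Seq[(K, V2)])(
      f: (K, Option[V1], Option[V2]) => R): Seq[(K, R)] = {
    val l = firstWins(left)
    val r = firstWins(right)
    // Union of keys: left keys first, then keys present only on the right.
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map(k => k -> f(k, l.get(k), r.get(k)))
  }

  // Assumption of this model: a NULL map on either side makes the result NULL.
  def zipWithNullable[K, V1, V2, R](left: Option[Seq[(K, V1)]], right: Option[Seq[(K, V2)]])(
      f: (K, Option[V1], Option[V2]) => R): Option[Seq[(K, R)]] =
    for (l <- left; r <- right) yield zipWith(l, r)(f)
}

val concat = (k: Int, a: Option[String], b: Option[String]) => a.getOrElse("") + b.getOrElse("")
println(MapZipWithModel.zipWith(Seq(1 -> "a", 2 -> "b"), Seq(1 -> "x", 2 -> "y"))(concat))
```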
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208257687 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala --- @@ -267,22 +267,23 @@ case class GetArrayItem(child: Expression, ordinal: Expression) } } -/** - * Common base class for [[GetMapValue]] and [[ElementAt]]. - */ - -abstract class GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes { +object GetMapValueUtil +{ --- End diff -- nit: the brace should be on the previous line.
[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21986#discussion_r207924294 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -205,29 +230,82 @@ case class ArrayTransform( (elementVar, indexVar) } - override def eval(input: InternalRow): Any = { -val arr = this.input.eval(input).asInstanceOf[ArrayData] -if (arr == null) { - null -} else { - val f = functionForEval - val result = new GenericArrayData(new Array[Any](arr.numElements)) - var i = 0 - while (i < arr.numElements) { -elementVar.value.set(arr.get(i, elementVar.dataType)) -if (indexVar.isDefined) { - indexVar.get.value.set(i) -} -result.update(i, f.eval(input)) -i += 1 + override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = { +val arr = inputValue.asInstanceOf[ArrayData] +val f = functionForEval +val result = new GenericArrayData(new Array[Any](arr.numElements)) +var i = 0 +while (i < arr.numElements) { + elementVar.value.set(arr.get(i, elementVar.dataType)) + if (indexVar.isDefined) { +indexVar.get.value.set(i) } - result + result.update(i, f.eval(inputRow)) + i += 1 } +result } override def prettyName: String = "transform" } +/** + * Filters entries in a map using the provided function. + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Filters entries in a map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v); + [1 -> 0, 3 -> -1] + """, +since = "2.4.0") +case class MapFilter( +input: Expression, +function: Expression) + extends MapBasedUnaryHigherOrderFunction with CodegenFallback { + + @transient val (keyType, valueType, valueContainsNull) = input.dataType match { --- End diff -- Maybe this should be a function in object `MapBasedUnaryHigherOrderFunction`; then we could use it in other map-based higher-order functions, just as we use `ArrayBasedHigherOrderFunction.elementArgumentType`.
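The suggestion is to move the `(keyType, valueType, valueContainsNull)` extraction into a shared helper, the way `ArrayBasedHigherOrderFunction.elementArgumentType` serves the array functions. Below is a dependency-free sketch of that shape. It uses hypothetical stand-in types (`DType`, `MapT`) and a hypothetical helper object rather than Catalyst's classes, and adds the `map_filter` semantics from the diff's example, with a map modelled as an insertion-ordered `Seq[(K, V)]`.

```scala
// Hypothetical stand-ins for Catalyst data types (not Spark's classes).
sealed trait DType
case object IntT extends DType
final case class MapT(keyType: DType, valueType: DType, valueContainsNull: Boolean) extends DType

// Shared helper: every map-based higher-order function extracts its
// argument types the same way instead of repeating the pattern match.
object MapBasedHOFHelpers {
  def mapArgumentTypes(dt: DType): (DType, DType, Boolean) = dt match {
    case MapT(k, v, containsNull) => (k, v, containsNull)
    case other => throw new IllegalArgumentException(s"Expected a map type, got $other")
  }
}

// map_filter semantics: keep the entries for which the predicate holds,
// preserving the original entry order.
def mapFilter[K, V](entries: Seq[(K, V)])(pred: (K, V) => Boolean): Seq[(K, V)] =
  entries.filter { case (k, v) => pred(k, v) }

println(mapFilter(Seq(1 -> 0, 2 -> 2, 3 -> -1))((k, v) => k > v)) // mirrors the SQL example
```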
[GitHub] spark pull request #21985: [SPARK-24884][SQL] add regexp_extract_all support
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21985#discussion_r207712639 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala --- @@ -446,3 +448,88 @@ case class RegExpExtract(subject: Expression, regexp: Expression, idx: Expressio }) } } + +/** + * Extract all specific(idx) groups identified by a Java regex. + * + * NOTE: this expression is not THREAD-SAFE, as it has some internal mutable status. + */ +@ExpressionDescription( + usage = "_FUNC_(str, regexp[, idx]) - Extracts all groups that matches `regexp`.", + examples = """ +Examples: + > SELECT _FUNC_('100-200,300-400', '(\\d+)-(\\d+)', 1); + [100, 300] + """) +case class RegExpExtractAll(subject: Expression, regexp: Expression, idx: Expression) --- End diff -- Add an abstract class to reduce duplicated code between `RegExpExtractAll` and `RegExpExtract`?
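One possible shape for the shared abstract class: the base owns the compiled-pattern cache, recompiling only when the regex string changes (as the `lastRegex`/`pattern` fields of `RegExpExtractAll` do), plus the null-group handling. The single-match and all-matches variants then differ only in how often they call `find()`. This is a hypothetical plain-Scala sketch over `java.util.regex`, not the Catalyst classes. Like the expression in the PR, it keeps mutable state and is not thread-safe.

```scala
import java.util.regex.{Matcher, Pattern}

// Hypothetical shared base: caches the compiled pattern and recompiles it only
// when the regex string changes. Mutable state, so not thread-safe.
abstract class RegexExtractBase {
  private var lastRegex: String = null
  private var pattern: Pattern = null

  protected def matcherFor(subject: String, regex: String): Matcher = {
    if (regex != lastRegex) {
      lastRegex = regex
      pattern = Pattern.compile(regex)
    }
    pattern.matcher(subject)
  }

  // A group that did not take part in the match becomes "" instead of null.
  protected def groupOrEmpty(m: Matcher, idx: Int): String =
    Option(m.group(idx)).getOrElse("")
}

// Single-match variant: the group from the first match, or "" if nothing matches.
final class ExtractFirst extends RegexExtractBase {
  def apply(subject: String, regex: String, idx: Int): String = {
    val m = matcherFor(subject, regex)
    if (m.find()) groupOrEmpty(m, idx) else ""
  }
}

// All-matches variant: the group from every match, in order.
final class ExtractAll extends RegexExtractBase {
  def apply(subject: String, regex: String, idx: Int): Seq[String] = {
    val m = matcherFor(subject, regex)
    val out = Vector.newBuilder[String]
    while (m.find()) out += groupOrEmpty(m, idx)
    out.result()
  }
}

val extractAll = new ExtractAll
println(extractAll("100-200,300-400", "(\\d+)-(\\d+)", 1)) // the SQL example's [100, 300]
```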
[GitHub] spark pull request #21985: [SPARK-24884][SQL] add regexp_extract_all support
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21985#discussion_r207712323 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala --- @@ -446,3 +448,88 @@ case class RegExpExtract(subject: Expression, regexp: Expression, idx: Expressio }) } } + +/** + * Extract all specific(idx) groups identified by a Java regex. + * + * NOTE: this expression is not THREAD-SAFE, as it has some internal mutable status. + */ +@ExpressionDescription( + usage = "_FUNC_(str, regexp[, idx]) - Extracts all groups that matches `regexp`.", + examples = """ +Examples: + > SELECT _FUNC_('100-200,300-400', '(\\d+)-(\\d+)', 1); + [100, 300] + """) +case class RegExpExtractAll(subject: Expression, regexp: Expression, idx: Expression) + extends TernaryExpression with ImplicitCastInputTypes { + def this(s: Expression, r: Expression) = this(s, r, Literal(1)) + + // last regex in string, we will update the pattern iff regexp value changed. 
+ @transient private var lastRegex: UTF8String = _ + // last regex pattern, we cache it for performance concern + @transient private var pattern: Pattern = _ + + override def nullSafeEval(s: Any, p: Any, r: Any): Any = { +if (!p.equals(lastRegex)) { + // regex value changed + lastRegex = p.asInstanceOf[UTF8String].clone() + pattern = Pattern.compile(lastRegex.toString) +} +val m = pattern.matcher(s.toString) +var groupArrayBuffer = new ArrayBuffer[UTF8String](); + +while (m.find) { + val mr: MatchResult = m.toMatchResult + val group = mr.group(r.asInstanceOf[Int]) + if (group == null) { // Pattern matched, but not optional group +groupArrayBuffer += UTF8String.EMPTY_UTF8 + } else { +groupArrayBuffer += UTF8String.fromString(group) + } +} + +new GenericArrayData(groupArrayBuffer.toArray.asInstanceOf[Array[Any]]) + } + + override def dataType: DataType = ArrayType(StringType) + override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType, IntegerType) + override def children: Seq[Expression] = subject :: regexp :: idx :: Nil + override def prettyName: String = "regexp_extract_all" + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val classNamePattern = classOf[Pattern].getCanonicalName +val matcher = ctx.freshName("matcher") +val matchResult = ctx.freshName("matchResult") +val groupArray = ctx.freshName("groupArray") + +val termLastRegex = ctx.addMutableState("UTF8String", "lastRegex") +val termPattern = ctx.addMutableState(classNamePattern, "pattern") + +val arrayClass = classOf[GenericArrayData].getName + +nullSafeCodeGen(ctx, ev, (subject, regexp, idx) => { + s""" + if (!$regexp.equals($termLastRegex)) { +// regex value changed +$termLastRegex = $regexp.clone(); +$termPattern = $classNamePattern.compile($termLastRegex.toString()); + } + java.util.regex.Matcher $matcher = +$termPattern.matcher($subject.toString()); + java.util.ArrayList $groupArray = +new java.util.ArrayList(); + + while ($matcher.find()) { 
+java.util.regex.MatchResult $matchResult = $matcher.toMatchResult(); +if ($matchResult.group($idx) == null) { + $groupArray.add(UTF8String.EMPTY_UTF8); +} else { + $groupArray.add(UTF8String.fromString($matchResult.group($idx))); +} + } + ${ev.value} = new $arrayClass($groupArray.toArray(new UTF8String[$groupArray.size()])); --- End diff -- Do we need to consider setting `ev.isNull` here?
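Whether `ev.isNull` needs explicit handling depends on whether the body itself can produce NULL. Below is a hypothetical model of a null-intolerant ternary expression, with `None` for SQL NULL. NULL-ness is decided entirely by the three inputs, and for non-NULL inputs the extract-all body always returns an array, empty when nothing matches. If `nullSafeCodeGen` already marks the result NULL for NULL inputs (worth confirming against `TernaryExpression`), the generated body would only need to assign `ev.value`.

```scala
import java.util.regex.Pattern

// Hypothetical model of a null-intolerant ternary expression: any NULL (None)
// input short-circuits to a NULL result; otherwise the body runs.
def nullSafeEval3[A, B, C, R](a: Option[A], b: Option[B], c: Option[C])(
    body: (A, B, C) => R): Option[R] =
  for (x <- a; y <- b; z <- c) yield body(x, y, z)

// Extract-all body: never returns null, only a possibly empty sequence.
// A group that did not participate in a match is mapped to "".
def extractAllBody(subject: String, regex: String, idx: Int): Seq[String] = {
  val m = Pattern.compile(regex).matcher(subject)
  val out = Seq.newBuilder[String]
  while (m.find()) out += Option(m.group(idx)).getOrElse("")
  out.result()
}

println(nullSafeEval3(Some("100-200,300-400"), Some("(\\d+)-(\\d+)"), Some(1))(extractAllBody))
println(nullSafeEval3(Option.empty[String], Some("(\\d+)"), Some(1))(extractAllBody))
```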
[GitHub] spark issue #21945: [SPARK-24989][Core] Add retrying support for OutOfDirect...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21945 Closing this: the `spark.reducer.maxBlocksInFlightPerAddress` parameter, added after version 2.2, solves my problem.
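Per the Spark configuration docs, `spark.reducer.maxBlocksInFlightPerAddress` limits how many remote blocks a reduce task fetches at the same time from a single host and port, to avoid overwhelming the serving side. Below is a hypothetical sketch of that throttling idea (not Spark's `ShuffleBlockFetcherIterator`): requests beyond the per-address cap wait in a queue until an earlier fetch to the same address completes.

```scala
import scala.collection.mutable

// Hypothetical per-address throttle: at most `maxInFlightPerAddress` requests
// may be outstanding for one address; the rest wait in FIFO order.
final class PerAddressThrottle(maxInFlightPerAddress: Int) {
  require(maxInFlightPerAddress > 0, "the cap must be positive")

  private val inFlight = mutable.Map.empty[String, Int].withDefaultValue(0)
  private val pending = mutable.Queue.empty[(String, String)] // (address, blockId)

  def submit(address: String, blockId: String): Unit =
    pending.enqueue((address, blockId))

  // Issue every pending request whose address is still under the cap;
  // the others stay queued in their original order.
  def issueReady(): Seq[(String, String)] = {
    val issued = Vector.newBuilder[(String, String)]
    val deferred = mutable.Queue.empty[(String, String)]
    while (pending.nonEmpty) {
      val request = pending.dequeue()
      val address = request._1
      if (inFlight(address) < maxInFlightPerAddress) {
        inFlight(address) += 1
        issued += request
      } else {
        deferred.enqueue(request)
      }
    }
    pending ++= deferred
    issued.result()
  }

  // Called when a fetch to `address` finishes, freeing one slot.
  def complete(address: String): Unit =
    inFlight(address) -= 1
}

val throttle = new PerAddressThrottle(2)
Seq("b1", "b2", "b3").foreach(b => throttle.submit("host-a:7337", b))
throttle.submit("host-b:7337", "b4")
println(throttle.issueReady().map(_._2)) // b3 waits: host-a already has 2 in flight
```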
[GitHub] spark pull request #21945: [SPARK-24989][Core] Add retrying support for OutO...
Github user xuanyuanking closed the pull request at: https://github.com/apache/spark/pull/21945
[GitHub] spark issue #21945: [SPARK-24989][Core] Add retrying support for OutOfDirect...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21945 retest this please.
[GitHub] spark pull request #21945: [SPARK-24989][Core] Add retrying support for OutO...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/21945 [SPARK-24989][Core] Add retrying support for OutOfDirectMemoryError ## What changes were proposed in this pull request? As described in detail in [SPARK-24989](https://issues.apache.org/jira/browse/SPARK-24989), add retrying support in RetryingBlockFetcher for when a fetch fails with OutOfDirectMemoryError after hitting the io.netty.maxDirectMemory limit. Details of the failed stages are attached below: ![image](https://user-images.githubusercontent.com/4833765/43534362-c3a934a4-95e9-11e8-9ec1-5f868e04bc07.png) ## How was this patch tested? Added a UT in RetryingBlockFetcherSuite.java and tested with the job mentioned above. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-24989 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21945.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21945 commit bb6841b3a7a160e252fe35dab82f4ddeb0032591 Author: Yuanjian Li Date: 2018-08-01T16:15:09Z [SPARK-24989][Core] Add retry support for OutOfDirectMemoryError
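The retry behaviour proposed here can be sketched generically: re-run an operation when it fails with a retryable error, a bounded number of times, pausing between attempts. Below is a hypothetical helper, not `RetryingBlockFetcher` itself. `FakeOutOfDirectMemoryError` stands in for Netty's `OutOfDirectMemoryError`, which extends `OutOfMemoryError`, so the sketch needs no Netty dependency. The explicit `try`/`catch` is deliberate: `scala.util.Try` only catches non-fatal throwables, and `NonFatal` treats `VirtualMachineError` subclasses such as `OutOfMemoryError` as fatal.

```scala
// Stand-in for io.netty.util.internal.OutOfDirectMemoryError, which also
// extends OutOfMemoryError; Netty itself is not referenced here.
final class FakeOutOfDirectMemoryError(msg: String) extends OutOfMemoryError(msg)

// Hypothetical retry helper: runs `op`, and if it throws something that
// `isRetryable` accepts, retries up to `maxRetries` more times, sleeping
// `waitMs` between attempts. Anything else, or the last failure, propagates.
def withRetries[T](maxRetries: Int, waitMs: Long)(isRetryable: Throwable => Boolean)(op: () => T): T = {
  var retriesLeft = maxRetries
  var result: Option[T] = None
  while (result.isEmpty) {
    try {
      result = Some(op())
    } catch {
      // Explicit catch: scala.util.Try would not intercept an OutOfMemoryError.
      case e: Throwable if retriesLeft > 0 && isRetryable(e) =>
        retriesLeft -= 1
        Thread.sleep(waitMs)
    }
  }
  result.get
}

var attempts = 0
val block = withRetries(maxRetries = 3, waitMs = 10L)(_.isInstanceOf[FakeOutOfDirectMemoryError]) { () =>
  attempts += 1
  if (attempts < 3) throw new FakeOutOfDirectMemoryError("direct memory exhausted")
  s"block fetched on attempt $attempts"
}
println(block)
```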
[GitHub] spark pull request #21881: [SPARK-24930][SQL] Improve exception information ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21881#discussion_r206203835 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -337,7 +337,11 @@ case class LoadDataCommand( new File(file.getAbsolutePath).exists() } if (!exists) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + // If user have no permission to access the given input path, `File.exists()` return false + // , `LOAD DATA input path does not exist` can confuse users. + throw new AnalysisException(s"LOAD DATA input path does not exist: `$path` or current " + --- End diff -- Nit: no need to print the $path twice.
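The diff's comment points at a real ambiguity: a `false` from `java.io.File.exists()` can mean the path is absent or that it could not be checked. `java.nio.file.Files` keeps those cases apart, since `Files.exists` and `Files.notExists` are documented as not being complements: when existence cannot be determined (for example, access is denied), both return `false`. Below is a hypothetical helper along those lines that also prints the path only once. The message wording is illustrative, not Spark's.

```scala
import java.nio.file.{Files, Path}

// Hypothetical helper: returns None when the path exists, otherwise an error
// message that distinguishes "missing" from "could not be checked".
def describeMissingInput(path: Path): Option[String] =
  if (Files.exists(path)) None
  else if (Files.notExists(path)) Some(s"LOAD DATA input path does not exist: $path")
  else Some(s"LOAD DATA input path could not be checked (is it readable by the current user?): $path")

val dir = Files.createTempDirectory("load-data-sketch")
println(describeMissingInput(dir))                        // exists: no message
println(describeMissingInput(dir.resolve("missing.csv"))) // missing: "does not exist"
```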
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206184350 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.File +import java.net.URI + +import org.scalatest.BeforeAndAfterEach +import org.scalatest.Matchers + +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +class MultiFormatTableSuite + extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach with Matchers { + import testImplicits._ + + val parser = new SparkSqlParser(new SQLConf()) + + override def afterEach(): Unit = { +try { + // drop all databases, tables and functions after each test + spark.sessionState.catalog.reset() +} finally { + super.afterEach() +} + } + + val partitionCol = "dt" + val partitionVal1 = "2018-01-26" + val partitionVal2 = "2018-01-27" + + private case class PartitionDefinition( + column: String, + value: String, + location: URI, + format: Option[String] = None + ) { + +def toSpec: String = { + s"($column='$value')" +} +def toSpecAsMap: Map[String, String] = { + Map(column -> value) +} + } + + test("create hive table with multi format partitions") { +val catalog = spark.sessionState.catalog +withTempDir { baseDir => + + val partitionedTable = "ext_multiformat_partition_table" + withTable(partitionedTable) { +assert(baseDir.listFiles.isEmpty) + +val partitions = createMultiformatPartitionDefinitions(baseDir) + +createTableWithPartitions(partitionedTable, baseDir, partitions) + +// Check table storage type is PARQUET +val hiveResultTable = + catalog.getTableMetadata(TableIdentifier(partitionedTable, Some("default"))) +assert(DDLUtils.isHiveTable(hiveResultTable)) +assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL) 
+assert(hiveResultTable.storage.inputFormat + .contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat") +) +assert(hiveResultTable.storage.outputFormat + .contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat") +) +assert(hiveResultTable.storage.serde + .contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") +) + +// Check table has correct partititons +assert( + catalog.listPartitions(TableIdentifier(partitionedTable, +Some("default"))).map(_.spec).toSet == partitions.map(_.toSpecAsMap).toSet +) + +// Check first table partition storage type is PARQUET +val parquetPartition = catalog.getPartition( + TableIdentifier(partitionedTable, Some("default")), + partitions.head.toSpecAsMap +) +assert( + parquetPartition.storage.serde + .contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") +) + +// Check second table partition storage type is AVR
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206190334 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution + +import java.io.File +import java.net.URI + +import org.scalatest.BeforeAndAfterEach +import org.scalatest.Matchers + +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +class MultiFormatTableSuite + extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach with Matchers { + import testImplicits._ + + val parser = new SparkSqlParser(new SQLConf()) + + override def afterEach(): Unit = { +try { + // drop all databases, tables and functions after each test + spark.sessionState.catalog.reset() +} finally { + super.afterEach() +} + } + + val partitionCol = "dt" + val partitionVal1 = "2018-01-26" + val partitionVal2 = "2018-01-27" + + private case class PartitionDefinition( + column: String, + value: String, + location: URI, + format: Option[String] = None + ) { --- End diff -- No need to start a new line here.
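Applied to the case class in the diff, the nit means keeping `) {` on the last parameter line. A sketch of that layout, with parameters at the 4-space continuation indent common in the Spark code base; the `private` modifier is dropped only so the snippet stands alone.

```scala
import java.net.URI

case class PartitionDefinition(
    column: String,
    value: String,
    location: URI,
    format: Option[String] = None) {

  // Partition spec as written in DDL, e.g. (dt='2018-01-26').
  def toSpec: String = s"($column='$value')"

  // The same spec as a column -> value map, the form passed to catalog lookups in the suite.
  def toSpecAsMap: Map[String, String] = Map(column -> value)
}

val p = PartitionDefinition("dt", "2018-01-26", new URI("file:/tmp/dt=2018-01-26"))
println(p.toSpec)
```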
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206188190 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206182473 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206188295 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206188012 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,514 @@
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206184183 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,514 @@
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206188650 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,514 @@
[GitHub] spark pull request #21893: [SPARK-24965][SQL] Support selecting from partiti...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r206178526 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala --- @@ -96,6 +96,9 @@ object ParserUtils { } } + def extraMethod(s: String): String = { --- End diff -- What is this used for?
[GitHub] spark pull request #21893: Support selecting from partitioned tabels with pa...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r205945617 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala --- @@ -857,6 +857,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) } + /** + * Create an [[AlterTableFormatPropertiesCommand]] command. + * + * For example: + * {{{ + * ALTER TABLE table [PARTITION spec] SET FILEFORMAT format; + * }}} + */ + override def visitSetTableFormat(ctx: SetTableFormatContext): LogicalPlan = withOrigin(ctx) { +val format = (ctx.fileFormat) match { + // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format + case (c: TableFileFormatContext) => +visitTableFileFormat(c) + // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO + case (c: GenericFileFormatContext) => +visitGenericFileFormat(c) + case _ => +throw new ParseException("Expected STORED AS ", ctx) +} +AlterTableFormatCommand( + visitTableIdentifier(ctx.tableIdentifier), + format, + // TODO a partition spec is allowed to have optional values. This is currently violated. + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) --- End diff -- I'm confused by this TODO. In the current implementation, when the partition spec is empty, do we change the table's catalog metadata instead?
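One reading of the TODO: the grammar lets a partition column appear without a value (for example `PARTITION (dt)`), while `visitNonOptionalPartitionSpec` only accepts fully specified specs; and, as the question above suggests, an absent `PARTITION` clause leaves the command at table level. The sketch below models that reading only. `PartitionSpecSketch`, `PartialSpec`, `toNonOptional` and `describeTarget` are hypothetical names, not Spark APIs.

```scala
// Hypothetical model of the parser output; not Spark's real types.
object PartitionSpecSketch {
  // A partition column may be listed without a value, e.g. PARTITION (dt).
  type PartialSpec = Map[String, Option[String]]

  // Analogue of a "non-optional" visitor: every column must carry a value.
  def toNonOptional(spec: PartialSpec): Map[String, String] =
    spec.map {
      case (col, Some(value)) => col -> value
      case (col, None) =>
        throw new IllegalArgumentException(s"Missing value for partition column '$col'")
    }

  // Assumption: no PARTITION clause means the table-level format is altered.
  def describeTarget(spec: Option[PartialSpec]): String =
    spec.map(toNonOptional) match {
      case Some(s) => "partition " + s.map { case (k, v) => s"$k=$v" }.mkString(",")
      case None => "table"
    }
}
```

Under this model, a spec with a missing value fails before the command is built, which is the restriction the TODO says is "currently violated".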
[GitHub] spark pull request #21893: Support selecting from partitioned tabels with pa...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r205945564 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala --- @@ -0,0 +1,512 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.File +import java.net.URI + +import org.scalatest.BeforeAndAfterEach +import org.scalatest.Matchers + +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +class MultiFormatTableSuite + extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach with Matchers { + import testImplicits._ + + val parser = new SparkSqlParser(new SQLConf()) + + override def afterEach(): Unit = { +try { + // drop all databases, tables and functions after each test + spark.sessionState.catalog.reset() +} finally { + super.afterEach() +} + } + + val partitionCol = "dt" + val partitionVal1 = "2018-01-26" + val partitionVal2 = "2018-01-27" + private case class PartitionDefinition( + column: String, --- End diff -- ditto.
[GitHub] spark pull request #21893: Support selecting from partitioned tabels with pa...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r205945559 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -415,6 +415,51 @@ case class AlterTableSerDePropertiesCommand( } +/** + * A command that sets the format of a table/view/partition . + * + * The syntax of this command is: + * {{{ + * ALTER TABLE table [PARTITION spec] SET FILEFORMAT format; + * }}} + */ +case class AlterTableFormatCommand( +tableName: TableIdentifier, --- End diff -- Indentation nit: use 4 spaces for function parameters.
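The nit refers to the convention in the Databricks Scala style guide, which Spark follows: when the parameters of a class or method declaration wrap, indent them 4 spaces, while the body keeps a 2-space indent. A hypothetical illustration (`AlterTableFormatSketch` and `IndentSketch` are not Spark classes):

```scala
// Wrapped declaration parameters: 4 spaces. Class and method bodies: 2 spaces.
case class AlterTableFormatSketch(
    tableName: String,
    format: String,
    partitionSpec: Option[Map[String, String]]) {

  def isPartitionLevel: Boolean = partitionSpec.isDefined
}

object IndentSketch {
  def describe(
      cmd: AlterTableFormatSketch,
      verbose: Boolean): String = {
    val scope = if (cmd.isPartitionLevel) "partition" else "table"
    if (verbose) s"${cmd.tableName} ($scope) -> ${cmd.format}" else cmd.format
  }
}
```

The extra indent keeps the parameter list visually distinct from the first line of the body.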
[GitHub] spark pull request #21893: Support selecting from partitioned tabels with pa...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21893#discussion_r205945523 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala --- @@ -857,6 +857,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) } + /** + * Create an [[AlterTableFormatPropertiesCommand]] command. + * + * For example: + * {{{ + * ALTER TABLE table [PARTITION spec] SET FILEFORMAT format; + * }}} + */ + override def visitSetTableFormat(ctx: SetTableFormatContext): LogicalPlan = withOrigin(ctx) { +val format = (ctx.fileFormat) match { + // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format + case (c: TableFileFormatContext) => +visitTableFileFormat(c) + // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO + case (c: GenericFileFormatContext) => +visitGenericFileFormat(c) + case _ => +throw new ParseException("Expected STORED AS ", ctx) --- End diff -- I think we need a more detailed ParseException message here.
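A more detailed message could name the statement and list what it accepts. This sketch models the two `fileFormat` shapes from the diff's comments with hypothetical case classes and uses `IllegalArgumentException` in place of Spark's `ParseException`. The message wording is only a suggestion.

```scala
// Hypothetical stand-ins for the two parser alternatives plus an unexpected one.
sealed trait FileFormatClause
final case class InputOutputFormat(input: String, output: String) extends FileFormatClause
final case class GenericFormat(name: String) extends FileFormatClause
final case class OtherClause(text: String) extends FileFormatClause

object FileFormatMessageSketch {
  // The generic formats listed in the diff's "Expected format" comment.
  val genericFormats = Seq("SEQUENCEFILE", "TEXTFILE", "RCFILE", "ORC", "PARQUET", "AVRO")

  // Say what was found and what would have been accepted.
  private def fail(found: String): Nothing =
    throw new IllegalArgumentException(
      s"Unsupported file format '$found' in ALTER TABLE ... SET FILEFORMAT. Expected " +
        s"'INPUTFORMAT <class> OUTPUTFORMAT <class>' or one of: ${genericFormats.mkString(", ")}")

  def resolve(clause: FileFormatClause): String = clause match {
    case InputOutputFormat(in, out) => s"INPUTFORMAT $in OUTPUTFORMAT $out"
    case GenericFormat(name) if genericFormats.contains(name.toUpperCase) => name.toUpperCase
    case GenericFormat(name) => fail(name)
    case OtherClause(text) => fail(text)
  }
}
```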
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r205478496 --- Diff: core/src/main/java/org/apache/hadoop/fs/SparkGlobber.java --- @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * This is based on hadoop-common-2.7.2 + * {@link org.apache.hadoop.fs.Globber}. + * This class exposes globWithThreshold which can be used glob path in parallel. 
+ */ +public class SparkGlobber { + public static final Log LOG = LogFactory.getLog(SparkGlobber.class.getName()); + + private final FileSystem fs; + private final FileContext fc; + private final Path pathPattern; + + public SparkGlobber(FileSystem fs, Path pathPattern) { +this.fs = fs; +this.fc = null; +this.pathPattern = pathPattern; + } + + public SparkGlobber(FileContext fc, Path pathPattern) { +this.fs = null; +this.fc = fc; +this.pathPattern = pathPattern; + } + + private FileStatus getFileStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.getFileStatus(path); + } else { +return fc.getFileStatus(path); + } +} catch (FileNotFoundException e) { + return null; +} + } + + private FileStatus[] listStatus(Path path) throws IOException { +try { + if (fs != null) { +return fs.listStatus(path); + } else { +return fc.util().listStatus(path); + } +} catch (FileNotFoundException e) { + return new FileStatus[0]; +} + } + + private Path fixRelativePart(Path path) { +if (fs != null) { + return fs.fixRelativePart(path); +} else { + return fc.fixRelativePart(path); +} + } + + /** + * Convert a path component that contains backslash ecape sequences to a + * literal string. This is necessary when you want to explicitly refer to a + * path that contains globber metacharacters. + */ + private static String unescapePathComponent(String name) { +return name.replaceAll("\\\\(.)", "$1"); + } + + /** + * Translate an absolute path into a list of path components. + * We merge double slashes into a single slash here. + * POSIX root path, i.e. '/', does not get an entry in the list.
+ */ + private static List<String> getPathComponents(String path) + throws IOException { +ArrayList<String> ret = new ArrayList<String>(); +for (String component : path.split(Path.SEPARATOR)) { + if (!component.isEmpty()) { +ret.add(component); + } +} +return ret; + } + + private String schemeFromPath(Path path) throws IOException { +String scheme = path.toUri().getScheme(); +if (scheme == null) { + if (fs != null) { +scheme = fs.getUri().getScheme(); + } else { +scheme = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getScheme(); + } +} +return scheme; + } + + private String authorityFromPath(Path path) throws IOException { +String authority = path.toUri().getAuthority(); +if (authority == null) { + if (fs != null) { +authority = fs.getUri().getAuthority(); + } else { +authority = fc.getFSofPath(fc.fixRelativePart(path)).getUri().getAuthority(); + } +} +return authority ; + } + + public FileStatus[] globWithThreshold(int threshold) throws IOException { +
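For reference, here is how the two static helpers quoted above behave. This is an illustrative Scala transliteration, not code from the PR (the object name is hypothetical), and it assumes `Path.SEPARATOR` is `"/"`:

```scala
object GlobHelpersSketch {
  // Regex \\(.) matches a literal backslash followed by any character;
  // "$1" keeps only that character, so "a\*b" becomes "a*b".
  def unescapePathComponent(name: String): String =
    name.replaceAll("\\\\(.)", "$1")

  // Split on "/" and drop empty pieces, so "//" collapses into one separator
  // and the root path "/" yields no components at all.
  def getPathComponents(path: String): List[String] =
    path.split("/").filter(_.nonEmpty).toList
}
```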
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r204805474 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -318,18 +318,34 @@ case class AlterTableChangeColumnCommand( // Find the origin column from dataSchema by column name. val originColumn = findColumnByName(table.dataSchema, columnName, resolver) -// Throw an AnalysisException if the column name/dataType is changed. +// Throw an AnalysisException if the column name is changed. if (!columnEqual(originColumn, newColumn, resolver)) { throw new AnalysisException( "ALTER TABLE CHANGE COLUMN is not supported for changing column " + s"'${originColumn.name}' with type '${originColumn.dataType}' to " + s"'${newColumn.name}' with type '${newColumn.dataType}'") } +val typeChanged = originColumn.dataType != newColumn.dataType +val partitionColumnChanged = table.partitionColumnNames.contains(originColumn.name) + +// Throw an AnalysisException if the type of partition column is changed. +if (typeChanged && partitionColumnChanged) { --- End diff -- This just adds a check for when the user changes the type of a partition column.
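The added check boils down to the logic below. This is a minimal sketch with a hypothetical `ColumnDef` type, plain string type names, and `IllegalArgumentException` standing in for `AnalysisException`. Like the diff, it matches partition column names exactly instead of going through a resolver.

```scala
// Hypothetical, simplified column description (the real code works on StructField).
final case class ColumnDef(name: String, dataType: String)

object PartitionTypeCheckSketch {
  def check(partitionColumnNames: Seq[String], origin: ColumnDef, updated: ColumnDef): Unit = {
    val typeChanged = origin.dataType != updated.dataType
    val isPartitionColumn = partitionColumnNames.contains(origin.name)
    // Only the combination is rejected: in this sketch a data column may still change type.
    if (typeChanged && isPartitionColumn) {
      throw new IllegalArgumentException(
        s"Changing the type of partition column '${origin.name}' from " +
          s"'${origin.dataType}' to '${updated.dataType}' is not supported")
    }
  }
}
```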
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 @gatorsmile @maropu Please take a look at this; resolving the conflicts took me some time. Also cc @jiangxb1987, since the conflicts were mainly with #20696. Thanks also for the work in #20696: with it, the latest PR no longer needs the extra handling for changing partition column comments that it needed before.
[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21839 Thanks for reviewing.
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 I'll resolve the conflicts today. Thanks for pinging me.
[GitHub] spark pull request #19745: [SPARK-2926][Core][Follow Up] Sort shuffle reader...
Github user xuanyuanking closed the pull request at: https://github.com/apache/spark/pull/19745
[GitHub] spark issue #19745: [SPARK-2926][Core][Follow Up] Sort shuffle reader for Sp...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19745 No problem.
[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21839 @gatorsmile Thanks for your advice; I added a UT in ScriptTransformationSuite.
[GitHub] spark pull request #21839: [SPARK-24339][SQL] Prunes the unused columns from...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21839#discussion_r204447671 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -450,13 +450,16 @@ object ColumnPruning extends Rule[LogicalPlan] { case d @ DeserializeToObject(_, _, child) if (child.outputSet -- d.references).nonEmpty => d.copy(child = prunedChild(child, d.references)) -// Prunes the unused columns from child of Aggregate/Expand/Generate +// Prunes the unused columns from child of Aggregate/Expand/Generate/ScriptTransformation case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => a.copy(child = prunedChild(child, a.references)) case f @ FlatMapGroupsInPandas(_, _, _, child) if (child.outputSet -- f.references).nonEmpty => f.copy(child = prunedChild(child, f.references)) case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty => e.copy(child = prunedChild(child, e.references)) +case s @ ScriptTransformation(_, _, _, child, _) + if (child.outputSet -- s.references).nonEmpty => --- End diff -- Thanks, fixed in 2cf131f.
[GitHub] spark issue #21839: [SPARK-24339][SQL] Prunes the unused columns from child ...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21839 @gatorsmile @maropu This is the follow-up PR for #21447. Please take a look when you have time, thanks.
[GitHub] spark pull request #21839: [SPARK-24339][SQL] Prunes the unused columns from...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/21839 [SPARK-24339][SQL] Prunes the unused columns from child of ScriptTransformation ## What changes were proposed in this pull request? Modify the ColumnPruning rule to add a Project between ScriptTransformation and its child. This can reduce scan time, especially when the table has many columns. ## How was this patch tested? Added a UT in ColumnPruningSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-24339 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21839.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21839 commit 68869d9fb8cc0e2686fb1e01f4d4c3e7ac8a52fe Author: Yuanjian Li Date: 2018-07-22T14:46:31Z Prunes the unused columns from child of ScriptTransformation
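The idea behind the rule change can be shown on a toy plan tree. Everything here is hypothetical and much simpler than Catalyst: columns are plain strings, and `input` stands in for the attributes `ScriptTransformation` references. When the child produces columns the script never reads, a `Project` keeping only the referenced columns is inserted between them.

```scala
// Toy logical plans; not Catalyst classes.
sealed trait Plan { def output: Seq[String] }
final case class Relation(output: Seq[String]) extends Plan
final case class Project(projectList: Seq[String], child: Plan) extends Plan {
  def output: Seq[String] = projectList
}
// `input`: columns piped to the script; `output`: columns the script emits.
final case class ScriptTransform(input: Seq[String], output: Seq[String], child: Plan)
  extends Plan

object PruneScriptInputSketch {
  def apply(plan: Plan): Plan = plan match {
    case s @ ScriptTransform(input, _, child) if child.output.exists(c => !input.contains(c)) =>
      // Keep the child's column order, dropping columns the script never reads.
      s.copy(child = Project(child.output.filter(input.contains), child))
    case other => other
  }
}
```

The guard makes the rewrite idempotent: once the child only outputs referenced columns, the rule stops firing, much like the `(child.outputSet -- s.references).nonEmpty` condition in the Optimizer diff.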
[GitHub] spark issue #21447: [SPARK-24339][SQL]Add project for transform/map/reduce s...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21447 I'd like to open a follow-up PR and cc @gatorsmile @maropu for review.
[GitHub] spark issue #21533: [SPARK-24195][Core] Ignore the files with "local" scheme...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21533 Thanks everyone for your help!
[GitHub] spark issue #21533: [SPARK-24195][Core] Ignore the files with "local" scheme...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21533 @jiangxb1987 Thanks for the reminder; I've rephrased it.
[GitHub] spark pull request #21789: [SPARK-24829][SQL]CAST AS FLOAT inconsistent with...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21789#discussion_r203037295 --- Diff: sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala --- @@ -766,6 +774,14 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { assert(resultSet.getString(2) === HiveUtils.builtinHiveVersion) } } + + test("Checks cast as float") { --- End diff -- Duplicated code?
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202701770 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2248,4 +2249,20 @@ class HiveDDLSuite checkAnswer(spark.table("t4"), Row(0, 0)) } } + + test("desc formatted table for last access verification") { +withTable("t1") { + sql(s"create table" + +s" if not exists t1 (c1_int int, c2_string string, c3_float float)") + val desc = sql("DESC FORMATTED t1").collect().toSeq + val lastAcessField = desc.filter((r: Row) => r.getValuesMap(Seq("col_name")) +.get("col_name").getOrElse("").equals("Last Access")) + // Check whether lastAcessField key is exist + assert(!lastAcessField.isEmpty) --- End diff -- nit: `assert(lastAccessField.nonEmpty)` instead of `!lastAcessField.isEmpty`.
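Here is a self-contained sketch of the suggested idiom. It uses plain `(col_name, data_type)` tuples as a hypothetical stand-in for the collected `DESC FORMATTED` rows, because a real `Row` needs Spark on the classpath. The sample values are illustrative only.

```scala
// Hypothetical stand-in for the collected DESC FORMATTED rows: (col_name, data_type).
val desc: Seq[(String, String)] = Seq(
  ("c1_int", "int"),
  ("c2_string", "string"),
  ("Last Access", "UNKNOWN"))

val lastAccessField = desc.filter { case (colName, _) => colName == "Last Access" }
// Equivalent to `!lastAccessField.isEmpty`, but reads more directly.
assert(lastAccessField.nonEmpty)
// When only existence matters, `exists` skips building the filtered collection.
assert(desc.exists { case (colName, _) => colName == "Last Access" })
```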
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202703129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala --- @@ -114,7 +114,10 @@ case class CatalogTablePartition( map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}") } map.put("Created Time", new Date(createTime).toString) -map.put("Last Access", new Date(lastAccessTime).toString) +val lastAccess = { + if (-1 == lastAccessTime) "UNKNOWN" else new Date(lastAccessTime).toString +} +map.put("Last Access", lastAccess) --- End diff -- Is the `val lastAccess` needed? It could be inlined: ``` map.put("Last Access", if (-1 == lastAccessTime) "UNKNOWN" else new Date(lastAccessTime).toString) ```
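If the same `-1` check is needed in more than one place, a small helper keeps each `map.put` call on one line. The diff treats `-1` as "no last-access time recorded". `LastAccessFormat` and `formatLastAccess` are hypothetical names and are not part of the diff.

```scala
import java.util.Date

object LastAccessFormat {
  // Hypothetical helper: -1 means no last-access time was recorded, so print "UNKNOWN".
  def formatLastAccess(lastAccessTime: Long): String =
    if (lastAccessTime == -1) "UNKNOWN" else new Date(lastAccessTime).toString
}
```

A call site would then read `map.put("Last Access", LastAccessFormat.formatLastAccess(lastAccessTime))`.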
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202704259 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2248,4 +2249,20 @@ class HiveDDLSuite checkAnswer(spark.table("t4"), Row(0, 0)) } } + + test("desc formatted table for last access verification") { +withTable("t1") { + sql(s"create table" + +s" if not exists t1 (c1_int int, c2_string string, c3_float float)") + val desc = sql("DESC FORMATTED t1").collect().toSeq + val lastAcessField = desc.filter((r: Row) => r.getValuesMap(Seq("col_name")) +.get("col_name").getOrElse("").equals("Last Access")) + // Check whether lastAcessField key is exist + assert(!lastAcessField.isEmpty) + val validLastAcessFieldValue = lastAcessField.filterNot((r: Row) => ((r +.getValuesMap(Seq("data_type")) +.get("data_type").contains(new Date(-1).toString + assert(lastAcessField.size!=0) --- End diff -- code style nit: add a space before and after `!=`.
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202703948 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2248,4 +2249,20 @@ class HiveDDLSuite checkAnswer(spark.table("t4"), Row(0, 0)) } } + + test("desc formatted table for last access verification") { +withTable("t1") { + sql(s"create table" + +s" if not exists t1 (c1_int int, c2_string string, c3_float float)") + val desc = sql("DESC FORMATTED t1").collect().toSeq + val lastAcessField = desc.filter((r: Row) => r.getValuesMap(Seq("col_name")) +.get("col_name").getOrElse("").equals("Last Access")) + // Check whether lastAcessField key is exist + assert(!lastAcessField.isEmpty) + val validLastAcessFieldValue = lastAcessField.filterNot((r: Row) => ((r --- End diff -- Where is `validLastAcessFieldValue` used?
[GitHub] spark pull request #21775: [SPARK-24812][SQL] Last Access Time in the table ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21775#discussion_r202701870 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -2248,4 +2249,20 @@ class HiveDDLSuite checkAnswer(spark.table("t4"), Row(0, 0)) } } + + test("desc formatted table for last access verification") { +withTable("t1") { + sql(s"create table" + +s" if not exists t1 (c1_int int, c2_string string, c3_float float)") + val desc = sql("DESC FORMATTED t1").collect().toSeq + val lastAcessField = desc.filter((r: Row) => r.getValuesMap(Seq("col_name")) --- End diff -- nit: typo, `lastAcessField` -> `lastAccessField`.
[GitHub] spark issue #21618: [SPARK-20408][SQL] Get the glob path in parallel to redu...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21618 gentle ping @cloud-fan @gatorsmile @kiszk
[GitHub] spark issue #21729: SPARK-24755 Executor loss can cause task to not be resub...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/21729 Please change the title to '[SPARK-24755][Core] Executor loss can cause task to not be resubmitted'
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r201007556 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -656,6 +656,25 @@ object SQLConf { .intConf .createWithDefault(1) + val PARALLEL_GET_GLOBBED_PATH_THRESHOLD = +buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold") + .doc("The maximum number of subfiles or directories allowed after a globbed path " + +"expansion. If the number of paths exceeds this value during expansion, it tries to " + +"expand the globbed in parallel with multi-thread.") + .intConf + .checkValue(threshlod => threshlod >= 0, "The maximum number of subfiles or directories " + +"must not be negative") + .createWithDefault(32) + + val PARALLEL_GET_GLOBBED_PATH_NUM_THREADS = +buildConf("spark.sql.sources.parallelGetGlobbedPath.numThreads") + .doc("The number of threads to get a collection of path in parallel. Set the " + +"number to avoid generating too many threads.") + .intConf + .checkValue(parallel => parallel >= 0, "The maximum number of threads allowed for getting " + --- End diff -- Thanks for the catch. When this value is set to 0, creating the new ThreadPoolExecutor throws an IllegalArgumentException. So, as we discussed in https://github.com/apache/spark/pull/21618#discussion_r200465855, I use 0 here as the default value that controls whether this feature is enabled.
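The JDK behavior described in this reply can be checked directly. `Executors.newFixedThreadPool(n)` builds a `ThreadPoolExecutor` with a maximum pool size of `n`, and the `ThreadPoolExecutor` constructor rejects a maximum pool size of 0 with an `IllegalArgumentException`. The sketch below shows one way to treat 0 as "disabled" before any pool is created. `GlobPoolSketch` and `makePool` are hypothetical names, not the PR's code.

```scala
import java.util.concurrent.{ExecutorService, Executors}

object GlobPoolSketch {
  // Hypothetical helper: 0 threads means "feature off", so no pool is built at all.
  def makePool(numThreads: Int): Option[ExecutorService] = {
    require(numThreads >= 0, "The number of threads must not be negative")
    if (numThreads == 0) None else Some(Executors.newFixedThreadPool(numThreads))
  }
}

// Without the guard, the JDK itself rejects a zero-sized pool.
val rejected =
  try { Executors.newFixedThreadPool(0).shutdown(); false }
  catch { case _: IllegalArgumentException => true }
```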
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r201006447 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -656,6 +656,25 @@ object SQLConf { .intConf .createWithDefault(1) + val PARALLEL_GET_GLOBBED_PATH_THRESHOLD = +buildConf("spark.sql.sources.parallelGetGlobbedPath.threshold") + .doc("The maximum number of subfiles or directories allowed after a globbed path " + +"expansion. If the number of paths exceeds this value during expansion, it tries to " + +"expand the globbed in parallel with multi-thread.") + .intConf + .checkValue(threshlod => threshlod >= 0, "The maximum number of subfiles or directories " + --- End diff -- Thanks, done in the next commit.
[GitHub] spark pull request #21618: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21618#discussion_r201006275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -724,4 +726,35 @@ object DataSource extends Logging { """.stripMargin) } } + + /** + * Return all paths represented by the wildcard string. + * Use a local thread pool to do this while there's too many paths. + */ + private def getGlobbedPaths( + sparkSession: SparkSession, + fs: FileSystem, + hadoopConf: Configuration, + qualified: Path): Seq[Path] = { +val getGlobbedPathThreshold = sparkSession.sessionState.conf.parallelGetGlobbedPathThreshold +val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified, getGlobbedPathThreshold) --- End diff -- Thanks for the advice. I'll reuse the value of `spark.sql.sources.parallelGetGlobbedPath.numThreads` to control this.
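Below is a minimal sketch of the dispatch described by the config docs. Expansion stays sequential when the number of paths is at or below the threshold, or when the thread count is 0. Otherwise the paths are expanded on a temporary local pool that is shut down afterwards. `ParallelGlobSketch`, `expandAll` and the `expand` callback are hypothetical. The diff itself goes through `SparkHadoopUtil.get.expandGlobPath` and Hadoop's `FileSystem`, and this sketch does not reproduce those signatures.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelGlobSketch {
  // `expand` stands in for listing one path (hypothetical; not Hadoop's API).
  def expandAll(paths: Seq[String], threshold: Int, numThreads: Int)(
      expand: String => Seq[String]): Seq[String] = {
    if (paths.size <= threshold || numThreads == 0) {
      paths.flatMap(expand) // few paths, or feature disabled: stay sequential
    } else {
      val pool = Executors.newFixedThreadPool(numThreads)
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
      try {
        val futures = paths.map(p => Future(expand(p)))
        // Future.sequence preserves input order, so results match the sequential path.
        Await.result(Future.sequence(futures), Duration.Inf).flatten
      } finally {
        pool.shutdown() // the pool is local to this call, as in the PR's description
      }
    }
  }
}
```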
[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21729#discussion_r200990424 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("SPARK-24755 Executor loss can cause task to not be resubmitted") { +val conf = new SparkConf().set("spark.speculation", "true") +sc = new SparkContext("local", "test", conf) +// Set the speculation multiplier to be 0 so speculative tasks are launched immediately +sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation.quantile", "0.5") +sc.conf.set("spark.speculation", "true") + +var killTaskCalled = false +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3")) +sched.initialize(new FakeSchedulerBackend() { + override def killTask(taskId: Long, +executorId: String, +interruptThread: Boolean, +reason: String): Unit = { +// Check the only one killTask event in this case, which triggered by +// task 2.1 completed. +assert(taskId === 2) +assert(executorId === "exec3") +assert(interruptThread) +assert(reason === "another attempt succeeded") +killTaskCalled = true + } +}) + +// Keep track of the index of tasks that are resubmitted, +// so that the test can check that task is resubmitted correctly +var resubmittedTasks = new mutable.HashSet[Int] +val dagScheduler = new FakeDAGScheduler(sc, sched) { + override def taskEnded(task: Task[_], + reason: TaskEndReason, --- End diff -- ditto
[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21729#discussion_r200990279 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("SPARK-24755 Executor loss can cause task to not be resubmitted") { +val conf = new SparkConf().set("spark.speculation", "true") +sc = new SparkContext("local", "test", conf) +// Set the speculation multiplier to be 0 so speculative tasks are launched immediately +sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation.quantile", "0.5") +sc.conf.set("spark.speculation", "true") + +var killTaskCalled = false +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3")) +sched.initialize(new FakeSchedulerBackend() { + override def killTask(taskId: Long, +executorId: String, --- End diff -- nit: indent
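For reference, here is the method-declaration indentation this nit points to. It follows the Databricks Scala style guide, which Spark follows. When the parameters do not fit on one line, each parameter goes on its own line, indented 4 spaces relative to `def`. `Backend` and `IndentStyleExample` are hypothetical. Only the `killTask` parameter list comes from the diff.

```scala
// Hypothetical stand-in for the scheduler backend being overridden in the test.
trait Backend {
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
}

object IndentStyleExample {
  var killTaskCalled = false

  val backend: Backend = new Backend {
    // Each parameter on its own line, indented 4 spaces from `override def`.
    override def killTask(
        taskId: Long,
        executorId: String,
        interruptThread: Boolean,
        reason: String): Unit = {
      killTaskCalled = true
    }
  }
}
```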
[GitHub] spark pull request #21729: SPARK-24755 Executor loss can cause task to not b...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21729#discussion_r200989413 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -87,7 +87,7 @@ private[spark] class TaskSetManager( // Set the coresponding index of Boolean var when the task killed by other attempt tasks, --- End diff -- This is a typo I made earlier: coresponding -> corresponding.
[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21642#discussion_r200160518 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -73,6 +74,10 @@ private[spark] class AppStatusListener( // around liveExecutors. @volatile private var activeExecutorCount = 0 + private val inputDataSetId = new AtomicLong(0) + private val outputDataSetId = new AtomicLong(0) + private val maxRecords = conf.getInt("spark.data.maxRecords", 1000) --- End diff -- What is `spark.data.maxRecords` for? If it is needed, consider defining it the same way as the other entries in core/src/main/scala/org/apache/spark/status/config.scala.
[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21642#discussion_r200159852 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent @DeveloperApi case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent +/** + * An internal class that describes the input data of an event log. + */ +@DeveloperApi +case class SparkListenerInputUpdate(format: String, +options: Map[String, String], --- End diff -- nit: indent. See: https://github.com/apache/spark/pull/21642/files#diff-fbe8f967070627c8dc155237e77c7314R172
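Case class declarations use a different layout from the method declarations above. Under the Databricks Scala style guide, each class parameter goes on its own line indented 4 spaces, and the `extends` clause goes on the next line indented 2 spaces. `ListenerEvent` and `InputUpdate` are hypothetical stand-ins for `SparkListenerEvent` and `SparkListenerInputUpdate`. The parameter list mirrors the diff.

```scala
// Hypothetical stand-in for SparkListenerEvent.
trait ListenerEvent

// Parameters indented 4 spaces, one per line; `extends` on its own line, indented 2 spaces.
case class InputUpdate(
    format: String,
    options: Map[String, String],
    locations: Seq[String] = Seq.empty[String])
  extends ListenerEvent
```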
[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21642#discussion_r200160022 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -19,6 +19,7 @@ package org.apache.spark.status import java.util.Date import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong --- End diff -- The import order is wrong here.
[GitHub] spark pull request #21642: [SPARK-22425][CORE][SQL] record inputs/outputs th...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21642#discussion_r200159949 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -185,6 +185,24 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent @DeveloperApi case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent +/** + * An internal class that describes the input data of an event log. + */ +@DeveloperApi +case class SparkListenerInputUpdate(format: String, +options: Map[String, String], +locations: Seq[String] = Seq.empty[String]) + extends SparkListenerEvent + +/** + * An internal class that describes the non-table output of an event log. + */ +@DeveloperApi +case class SparkListenerOutputUpdate(format: String, + mode: String, --- End diff -- ditto