[GitHub] [spark] SparkQA commented on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of array type

2020-06-18 Thread GitBox


SparkQA commented on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646451372


   **[Test build #124242 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124242/testReport)**
 for PR 28831 at commit 
[`e8f972d`](https://github.com/apache/spark/commit/e8f972db9baf05778339d01335e1422155672017).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #28835: [SPARK-31926][SQL][TESTS][FOLLOWUP][test-hive1.2][test-maven] Fix concurrency issue for ThriftCLIService to getPortNumber

2020-06-18 Thread GitBox


cloud-fan commented on pull request #28835:
URL: https://github.com/apache/spark/pull/28835#issuecomment-646451325


   thanks, merging to master! (don't backport as it's not a test only PR 
anymore)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #27428: [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT

2020-06-18 Thread GitBox


beliefer commented on pull request #27428:
URL: https://github.com/apache/spark/pull/27428#issuecomment-646451298


   cc @cloud-fan 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you opened a new pull request #28868: [SPARK-32029][SQL] Make activeSession null when application end

2020-06-18 Thread GitBox


ulysses-you opened a new pull request #28868:
URL: https://github.com/apache/spark/pull/28868


   
   
   ### What changes were proposed in this pull request?
   
   Make active session null when application is end. And add spark context 
check in `spark.withActive()`
   
   ### Why are the changes needed?
   
   Now default session is set null when application end. However user can call 
`sparkSession.sql()` and use active session as normal. We should forbid this.
   
   ### Does this PR introduce _any_ user-facing change?
   
   BUG fix.
   ### How was this patch tested?
   
   Add UT.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan closed pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-18 Thread GitBox


cloud-fan closed pull request #28707:
URL: https://github.com/apache/spark/pull/28707


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #28707: [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store

2020-06-18 Thread GitBox


cloud-fan commented on pull request #28707:
URL: https://github.com/apache/spark/pull/28707#issuecomment-646450708


   thanks, merging to master! (I think this patch is too big to backport)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28841: [SPARK-31962][SQL] Provide option to load files after a specified date when reading from a folder path

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-646447834


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/124247/
   Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28841: [SPARK-31962][SQL] Provide option to load files after a specified date when reading from a folder path

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-646447828


   Merged build finished. Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28841: [SPARK-31962][SQL] Provide option to load files after a specified date when reading from a folder path

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-646447828







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28835: [SPARK-31926][SQL][TESTS][FOLLOWUP][test-hive1.2][test-maven] Fix concurrency issue for ThriftCLIService to getPortNumber

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28835:
URL: https://github.com/apache/spark/pull/28835#issuecomment-646447496







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #28841: [SPARK-31962][SQL] Provide option to load files after a specified date when reading from a folder path

2020-06-18 Thread GitBox


SparkQA removed a comment on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-646396659


   **[Test build #124247 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124247/testReport)**
 for PR 28841 at commit 
[`256cb1b`](https://github.com/apache/spark/commit/256cb1bfe717c9cf0ce2e4204125295ae7569895).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28841: [SPARK-31962][SQL] Provide option to load files after a specified date when reading from a folder path

2020-06-18 Thread GitBox


SparkQA commented on pull request #28841:
URL: https://github.com/apache/spark/pull/28841#issuecomment-646447622


   **[Test build #124247 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124247/testReport)**
 for PR 28841 at commit 
[`256cb1b`](https://github.com/apache/spark/commit/256cb1bfe717c9cf0ce2e4204125295ae7569895).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28835: [SPARK-31926][SQL][TESTS][FOLLOWUP][test-hive1.2][test-maven] Fix concurrency issue for ThriftCLIService to getPortNumber

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28835:
URL: https://github.com/apache/spark/pull/28835#issuecomment-646447496







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #28835: [SPARK-31926][SQL][TESTS][FOLLOWUP][test-hive1.2][test-maven] Fix concurrency issue for ThriftCLIService to getPortNumber

2020-06-18 Thread GitBox


SparkQA removed a comment on pull request #28835:
URL: https://github.com/apache/spark/pull/28835#issuecomment-646400501


   **[Test build #124249 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124249/testReport)**
 for PR 28835 at commit 
[`e409f9a`](https://github.com/apache/spark/commit/e409f9ac1dd899e1d81f1e45000e4799ce3e3a2c).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28835: [SPARK-31926][SQL][TESTS][FOLLOWUP][test-hive1.2][test-maven] Fix concurrency issue for ThriftCLIService to getPortNumber

2020-06-18 Thread GitBox


SparkQA commented on pull request #28835:
URL: https://github.com/apache/spark/pull/28835#issuecomment-646446486


   **[Test build #124249 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124249/testReport)**
 for PR 28835 at commit 
[`e409f9a`](https://github.com/apache/spark/commit/e409f9ac1dd899e1d81f1e45000e4799ce3e3a2c).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan closed pull request #27617: [SPARK-30865][SQL][SS] Refactor DateTimeUtils

2020-06-18 Thread GitBox


cloud-fan closed pull request #27617:
URL: https://github.com/apache/spark/pull/27617


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #27617: [SPARK-30865][SQL][SS] Refactor DateTimeUtils

2020-06-18 Thread GitBox


cloud-fan commented on pull request #27617:
URL: https://github.com/apache/spark/pull/27617#issuecomment-646446303


   thanks, merging to master!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646445485







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on pull request #28805: [SPARK-28169][SQL] Convert scan predicate condition to CNF

2020-06-18 Thread GitBox


AngersZh commented on pull request #28805:
URL: https://github.com/apache/spark/pull/28805#issuecomment-646445716


   > The CNF process should break down `dt = 20190626 and id in (1,2,3)` to 
`Seq((dt = 20190626), (id in (1,2,3))`, and then these two sub-predicates will 
be processed in `groupExpressionsByQualifier`. What is the problem here?
   
   In current partition pruning,  ScanOperation get predicates by 
`splitConjunctivePredicates` , 
   if there is ```(dt = 1 or (dt = 2 and id = 3))```,  it won't be seperated, 
then since this expression is reference contains (id, dt), it won't be pushed 
down as a partition predicates.  Then it will scan all data in the partition 
table.
   ```
   object HiveTableScans extends Strategy {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
 case ScanOperation(projectList, predicates, relation: 
HiveTableRelation) =>
   // Filter out all predicates that only deal with partition keys, 
these are given to the
   // hive table scan operator to be used for partition pruning.
   val partitionKeyIds = AttributeSet(relation.partitionCols)
   val (pruningPredicates, otherPredicates) = predicates.partition { 
predicate =>
 !predicate.references.isEmpty &&
 predicate.references.subsetOf(partitionKeyIds)
   }
   
   pruneFilterProject(
 projectList,
 otherPredicates,
 identity[Seq[Expression]],
 HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) 
:: Nil
 case _ =>
   Nil
   }
 }
   ```
   
   With convert to CNF, ```(dt = 1 or (dt = 2 and id = 3))``` will be converted 
to ```(dt = 1 or dt = 2) and  (dt = 1 or id = 3))``` then this expression can 
be split by  `splitConjunctivePredicates` and split to two expression ```(dt = 
1 or dt = 2)``` and ``` (dt = 1 or id = 3))```, then  ```(dt = 1 or dt = 2)``` 
can be pushed down as partition pruning predicates.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of array

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646445485







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of arr

2020-06-18 Thread GitBox


SparkQA removed a comment on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646368574


   **[Test build #124243 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124243/testReport)**
 for PR 28831 at commit 
[`2477d11`](https://github.com/apache/spark/commit/2477d119fd1de18a07e02d0b0d5e19f268472d9b).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of array type

2020-06-18 Thread GitBox


SparkQA commented on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646444772


   **[Test build #124243 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124243/testReport)**
 for PR 28831 at commit 
[`2477d11`](https://github.com/apache/spark/commit/2477d119fd1de18a07e02d0b0d5e19f268472d9b).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on pull request #28805: [SPARK-28169][SQL] Convert scan predicate condition to CNF

2020-06-18 Thread GitBox


gengliangwang commented on pull request #28805:
URL: https://github.com/apache/spark/pull/28805#issuecomment-646443250


   > In this case, Spark will resolve Or condition as one expression, and since 
this expr has reference of "id", then it can't been push down.
   
   Sorry, could you explain more here? 
   The CNF process should break down `dt = 20190626 and id in (1,2,3)` to 
`Seq((dt = 20190626), (id in (1,2,3))`, and then these two sub-predicates will 
be processed in `groupExpressionsByQualifier`. What is the problem here?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


maropu commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r442637400



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` if the following 
conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective 
sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): 
Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), 
math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is 
divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total 
number of buckets.
+if (large % small == 0 &&
+  large / small <= 
conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: 
Int): SparkPlan = {
+plan.transformUp {
+  case f: FileSourceScanExec =>
+f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+}
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)) {
+  return plan
+}
+
+plan transform {
+  case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, 
numRightBuckets)
+if numLeftBuckets != numRightBuckets =>
+mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { 
numCoalescedBuckets =>
+  if (numCoalescedBuckets != numLeftBuckets) {
+smj.copy(left = updateNumCoalescedBuckets(smj.left, 
numCoalescedBuckets))
+  } else {
+smj.copy(right = updateNumCoalescedBuckets(smj.right, 
numCoalescedBuckets))
+  }
+}.getOrElse(smj)
+  case other => other
+}
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` where both sides of the join 
have the bucketed
+ * tables and are consisted of only the scan operation.
+ */
+object ExtractSortMergeJoinWithBuckets {
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+case f: FilterExec => isScanOperation(f.child)
+case p: ProjectExec => isScanOperation(p.child)
+case _: FileSourceScanExec => true
+case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+plan.collectFirst {
+  case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+  f.optionalNumCoalescedBuckets.isEmpty =>
+f.relation.bucketSpec.get
+}
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note 
that
+   * the ordering does not matter because it will be handled in 
`EnsureRequirements`.
+   */
+  private def satisfiesOutputPartitioning(
+  keys: Seq[Expression],
+  partitioning: Partitioning): Boolean = {
+partitioning match {
+  case HashPartitioning(exprs, _) if exprs.length == keys.length =>
+exprs.forall(e => 

[GitHub] [spark] maropu commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


maropu commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r442637311



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, 
SparkPlan}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
InMemoryFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StructType}
+
+class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with 
SharedSparkSession {
+  case class RelationSetting(
+  cols: Seq[Attribute],
+  numBuckets: Int,
+  expectedCoalescedNumBuckets: Option[Int])
+
+  object RelationSetting {
+def apply(numBuckets: Int, expectedCoalescedNumBuckets: Option[Int]): 
RelationSetting = {
+  val cols = Seq(AttributeReference("i", IntegerType)())
+  RelationSetting(cols, numBuckets, expectedCoalescedNumBuckets)
+}
+  }
+
+  case class JoinSetting(
+  leftKeys: Seq[Attribute],
+  rightKeys: Seq[Attribute],
+  leftRelation: RelationSetting,
+  rightRelation: RelationSetting,
+  isSortMergeJoin: Boolean)
+
+  object JoinSetting {
+def apply(l: RelationSetting, r: RelationSetting, isSortMergeJoin: 
Boolean): JoinSetting = {
+  JoinSetting(l.cols, r.cols, l, r, isSortMergeJoin)
+}
+  }
+
+  private def newFileSourceScanExec(setting: RelationSetting): 
FileSourceScanExec = {
+val relation = HadoopFsRelation(
+  location = new InMemoryFileIndex(spark, Nil, Map.empty, None),
+  partitionSchema = PartitionSpec.emptySpec.partitionColumns,
+  dataSchema = StructType.fromAttributes(setting.cols),
+  bucketSpec = Some(BucketSpec(setting.numBuckets, 
setting.cols.map(_.name), Nil)),
+  fileFormat = new ParquetFileFormat(),
+  options = Map.empty)(spark)
+FileSourceScanExec(relation, setting.cols, relation.dataSchema, Nil, None, 
None, Nil, None)
+  }
+
+  private def run(setting: JoinSetting): Unit = {
+val swappedSetting = setting.copy(
+  leftKeys = setting.rightKeys,
+  rightKeys = setting.leftKeys,
+  leftRelation = setting.rightRelation,
+  rightRelation = setting.leftRelation)
+
+Seq(setting, swappedSetting).foreach { case s =>
+  val lScan = newFileSourceScanExec(s.leftRelation)
+  val rScan = newFileSourceScanExec(s.rightRelation)
+  val join = if (s.isSortMergeJoin) {
+SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan)
+  } else {
+BroadcastHashJoinExec(
+  s.leftKeys, s.rightKeys, Inner, BuildLeft, None, lScan, rScan)
+  }
+
+  val plan = CoalesceBucketsInSortMergeJoin(spark.sessionState.conf)(join)
+
+  def verify(expected: Option[Int], subPlan: SparkPlan): Unit = {
+val coalesced = subPlan.collect {
+  case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.nonEmpty 
=>
+f.optionalNumCoalescedBuckets.get
+}
+if (expected.isDefined) {
+  assert(coalesced.size == 1 && coalesced(0) == expected.get)
+} else {
+  assert(coalesced.isEmpty)
+}
+  }
+
+  verify(s.leftRelation.expectedCoalescedNumBuckets, 
plan.asInstanceOf[BinaryExecNode].left)
+  verify(s.rightRelation.expectedCoalescedNumBuckets, 
plan.asInstanceOf[BinaryExecNode].right)
+}
+  }
+
+  test("bucket coalescing - basic") {
+withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> 
"true") {
+  

[GitHub] [spark] zhli1142015 commented on pull request #28867: [SPARK-32028][WEBUI] fix app id link for multi attempts app in history summary page

2020-06-18 Thread GitBox


zhli1142015 commented on pull request #28867:
URL: https://github.com/apache/spark/pull/28867#issuecomment-646440070


   @dongjoon-hyun , @vanzin, @jerryshao , could you please help review this? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28867: [SPARK-32028][WEBUI] fix app id link for multi attempts app in history summary page

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28867:
URL: https://github.com/apache/spark/pull/28867#issuecomment-646438358


   Can one of the admins verify this patch?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28867: [SPARK-32028][WEBUI] fix app id link for multi attempts app in history summary page

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28867:
URL: https://github.com/apache/spark/pull/28867#issuecomment-646438634


   Can one of the admins verify this patch?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28867: [SPARK-32028][WEBUI] fix app id link for multi attempts app in history summary page

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28867:
URL: https://github.com/apache/spark/pull/28867#issuecomment-646438358


   Can one of the admins verify this patch?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhli1142015 opened a new pull request #28867: [SPARK-32028][WEBUI] fix app id link for multi attempts app in history summary page

2020-06-18 Thread GitBox


zhli1142015 opened a new pull request #28867:
URL: https://github.com/apache/spark/pull/28867


   
   
   ### What changes were proposed in this pull request?
   
   Fix app id link for multi attempts application in history summary page
   If attempt id is available (applications run yarn), app id link url will 
contain correct attempt id.
   If attempt id is not available (standalone), app id link url will not 
contain fake attempt id.
   
   ### Why are the changes needed?
   
   This PR is for fixing 
[32028](https://issues.apache.org/jira/browse/SPARK-32028). App id link use 
application attempt count as attempt id. this would cause link url wrong for 
below cases:
   1. there are multi attempts, all links would point to last attempt
   
![multi_same](https://user-images.githubusercontent.com/10524738/85098505-c45c5500-b1af-11ea-8912-fa5fd72ce064.JPG)
   
   2. if there is one attempt, but attempt id is not 1 (before attempt maybe 
crash or fail to gerenerate event file). link url points to worng attempt (1) 
here.
   
![wrong_attemptJPG](https://user-images.githubusercontent.com/10524738/85098513-c9b99f80-b1af-11ea-8cbc-fd7f745c1080.JPG)
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Tested this manually.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28645: [SPARK-31826][SQL] Support composed type of case class for typed Scala UDF

2020-06-18 Thread GitBox


cloud-fan commented on a change in pull request #28645:
URL: https://github.com/apache/spark/pull/28645#discussion_r442631441



##
File path: sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
##
@@ -581,4 +581,99 @@ class UDFSuite extends QueryTest with SharedSparkSession {
   .toDF("col1", "col2")
 checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(2020) :: 
Nil)
   }
+
+  test("case class as element type of Seq/Array") {
+val f1 = (s: Seq[TestData]) => s.map(d => d.key * d.value.toInt).sum
+val myUdf1 = udf(f1)
+val df1 = Seq(("data", Seq(TestData(50, "2".toDF("col1", "col2")
+checkAnswer(df1.select(myUdf1(Column("col2"))), Row(100) :: Nil)
+
+val f2 = (s: Array[TestData]) => s.map(d => d.key * d.value.toInt).sum
+val myUdf2 = udf(f2)
+val df2 = Seq(("data", Array(TestData(50, "2".toDF("col1", "col2")
+checkAnswer(df2.select(myUdf2(Column("col2"))), Row(100) :: Nil)
+  }
+
+  test("case class as key/value type of Map") {
+val f1 = (s: Map[TestData, Int]) => s.keys.head.key * 
s.keys.head.value.toInt
+val myUdf1 = udf(f1)
+val df1 = Seq(("data", Map(TestData(50, "2") -> 502))).toDF("col1", "col2")
+checkAnswer(df1.select(myUdf1(Column("col2"))), Row(100) :: Nil)
+
+val f2 = (s: Map[Int, TestData]) => s.values.head.key * 
s.values.head.value.toInt
+val myUdf2 = udf(f2)
+val df2 = Seq(("data", Map(502 -> TestData(50, "2".toDF("col1", "col2")
+checkAnswer(df2.select(myUdf2(Column("col2"))), Row(100) :: Nil)
+
+val f3 = (s: Map[TestData, TestData]) => s.keys.head.key * 
s.values.head.value.toInt
+val myUdf3 = udf(f3)
+val df3 = Seq(("data", Map(TestData(50, "2") -> TestData(50, 
"2".toDF("col1", "col2")
+checkAnswer(df3.select(myUdf3(Column("col2"))), Row(100) :: Nil)
+  }
+
+  test("case class as element of tuple") {
+val f = (s: (TestData, Int)) => s._1.key * s._2
+val myUdf = udf(f)
+val df = Seq(("data", (TestData(50, "2"), 2))).toDF("col1", "col2")
+checkAnswer(df.select(myUdf(Column("col2"))), Row(100) :: Nil)
+  }
+
+  test("case class as generic type of Option") {
+val f = (o: Option[TestData]) => o.map(t => t.key * t.value.toInt)
+val myUdf = udf(f)
+val df1 = Seq(("data", Some(TestData(50, "2".toDF("col1", "col2")
+checkAnswer(df1.select(myUdf(Column("col2"))), Row(100) :: Nil)
+val df2 = Seq(("data", None: Option[TestData])).toDF("col1", "col2")
+checkAnswer(df2.select(myUdf(Column("col2"))), Row(null) :: Nil)
+  }
+
+  test("more input fields than expect for case class") {
+val f = (t: TestData2) => t.a * t.b
+val myUdf = udf(f)
+val df = spark.range(1)
+  .select(lit(50).as("a"), lit(2).as("b"), lit(2).as("c"))
+  .select(struct("a", "b", "c").as("col"))
+checkAnswer(df.select(myUdf(Column("col"))), Row(100) :: Nil)
+  }
+
+  test("less input fields than expect for case class") {
+val f = (t: TestData2) => t.a * t.b
+val myUdf = udf(f)
+val df = spark.range(1)
+  .select(lit(50).as("a"))
+  .select(struct("a").as("col"))
+val error = intercept[AnalysisException] (df.select(myUdf(Column("col"

Review comment:
   nit: no space between `intercept[AnalysisException]` and `(df.select...`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28645: [SPARK-31826][SQL] Support composed type of case class for typed Scala UDF

2020-06-18 Thread GitBox


cloud-fan commented on a change in pull request #28645:
URL: https://github.com/apache/spark/pull/28645#discussion_r442631217



##
File path: sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
##
@@ -581,4 +581,99 @@ class UDFSuite extends QueryTest with SharedSparkSession {
   .toDF("col1", "col2")
 checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(2020) :: 
Nil)
   }
+
+  test("case class as element type of Seq/Array") {
+val f1 = (s: Seq[TestData]) => s.map(d => d.key * d.value.toInt).sum
+val myUdf1 = udf(f1)
+val df1 = Seq(("data", Seq(TestData(50, "2".toDF("col1", "col2")
+checkAnswer(df1.select(myUdf1(Column("col2"))), Row(100) :: Nil)
+
+val f2 = (s: Array[TestData]) => s.map(d => d.key * d.value.toInt).sum
+val myUdf2 = udf(f2)
+val df2 = Seq(("data", Array(TestData(50, "2".toDF("col1", "col2")
+checkAnswer(df2.select(myUdf2(Column("col2"))), Row(100) :: Nil)
+  }
+
+  test("case class as key/value type of Map") {
+val f1 = (s: Map[TestData, Int]) => s.keys.head.key * 
s.keys.head.value.toInt
+val myUdf1 = udf(f1)
+val df1 = Seq(("data", Map(TestData(50, "2") -> 502))).toDF("col1", "col2")
+checkAnswer(df1.select(myUdf1(Column("col2"))), Row(100) :: Nil)
+
+val f2 = (s: Map[Int, TestData]) => s.values.head.key * 
s.values.head.value.toInt
+val myUdf2 = udf(f2)
+val df2 = Seq(("data", Map(502 -> TestData(50, "2".toDF("col1", "col2")
+checkAnswer(df2.select(myUdf2(Column("col2"))), Row(100) :: Nil)
+
+val f3 = (s: Map[TestData, TestData]) => s.keys.head.key * 
s.values.head.value.toInt
+val myUdf3 = udf(f3)
+val df3 = Seq(("data", Map(TestData(50, "2") -> TestData(50, 
"2".toDF("col1", "col2")
+checkAnswer(df3.select(myUdf3(Column("col2"))), Row(100) :: Nil)
+  }
+
+  test("case class as element of tuple") {
+val f = (s: (TestData, Int)) => s._1.key * s._2
+val myUdf = udf(f)
+val df = Seq(("data", (TestData(50, "2"), 2))).toDF("col1", "col2")
+checkAnswer(df.select(myUdf(Column("col2"))), Row(100) :: Nil)
+  }
+
+  test("case class as generic type of Option") {
+val f = (o: Option[TestData]) => o.map(t => t.key * t.value.toInt)
+val myUdf = udf(f)
+val df1 = Seq(("data", Some(TestData(50, "2".toDF("col1", "col2")
+checkAnswer(df1.select(myUdf(Column("col2"))), Row(100) :: Nil)
+val df2 = Seq(("data", None: Option[TestData])).toDF("col1", "col2")
+checkAnswer(df2.select(myUdf(Column("col2"))), Row(null) :: Nil)
+  }
+
+  test("more input fields than expect for case class") {
+val f = (t: TestData2) => t.a * t.b
+val myUdf = udf(f)
+val df = spark.range(1)
+  .select(lit(50).as("a"), lit(2).as("b"), lit(2).as("c"))
+  .select(struct("a", "b", "c").as("col"))
+checkAnswer(df.select(myUdf(Column("col"))), Row(100) :: Nil)
+  }
+
+  test("less input fields than expect for case class") {
+val f = (t: TestData2) => t.a * t.b
+val myUdf = udf(f)
+val df = spark.range(1)
+  .select(lit(50).as("a"))
+  .select(struct("a").as("col"))
+val error = intercept[AnalysisException] (df.select(myUdf(Column("col"
+assert(error.getMessage.contains("cannot resolve '`b`' given input 
columns: [a]"))
+  }
+
+  test("wrong order of input fields for case class") {
+val f = (t: TestData) => t.key * t.value.toInt
+val myUdf = udf(f)
+val df = spark.range(1)
+  .select(lit("2").as("value"), lit(50).as("key"))
+  .select(struct("value", "key").as("col"))
+checkAnswer(df.select(myUdf(Column("col"))), Row(100) :: Nil)
+  }
+
+  test("top level Option primitive type") {
+val f = (i: Option[Int]) => i.map(_ * 10)
+val myUdf = udf(f)
+val df = Seq(Some(10), None).toDF("col")
+checkAnswer(df.select(myUdf(Column("col"))), Row(100) :: Row(null) :: Nil)
+  }
+
+  test("top level Option case class") {

Review comment:
   This is already tested in `case class as generic type of Option`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28123:
URL: https://github.com/apache/spark/pull/28123#issuecomment-646435314







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28123:
URL: https://github.com/apache/spark/pull/28123#issuecomment-646435314







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


SparkQA commented on pull request #28123:
URL: https://github.com/apache/spark/pull/28123#issuecomment-646434778


   **[Test build #124258 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124258/testReport)**
 for PR 28123 at commit 
[`62a04a3`](https://github.com/apache/spark/commit/62a04a3e4d94e63b3787533f619e368a7e8d59f6).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r442629743



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` if the following 
conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective 
sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): 
Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), 
math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is 
divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total 
number of buckets.
+if (large % small == 0 &&
+  large / small <= 
conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: 
Int): SparkPlan = {
+plan.transformUp {
+  case f: FileSourceScanExec =>
+f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+}
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)) {
+  return plan
+}
+
+plan transform {
+  case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, 
numRightBuckets)
+if numLeftBuckets != numRightBuckets =>
+mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { 
numCoalescedBuckets =>
+  if (numCoalescedBuckets != numLeftBuckets) {
+smj.copy(left = updateNumCoalescedBuckets(smj.left, 
numCoalescedBuckets))
+  } else {
+smj.copy(right = updateNumCoalescedBuckets(smj.right, 
numCoalescedBuckets))
+  }
+}.getOrElse(smj)
+  case other => other
+}
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` where both sides of the join 
have the bucketed
+ * tables and are consisted of only the scan operation.
+ */
+object ExtractSortMergeJoinWithBuckets {
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+case f: FilterExec => isScanOperation(f.child)
+case p: ProjectExec => isScanOperation(p.child)
+case _: FileSourceScanExec => true
+case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+plan.collectFirst {
+  case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+  f.optionalNumCoalescedBuckets.isEmpty =>
+f.relation.bucketSpec.get
+}
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note 
that
+   * the ordering does not matter because it will be handled in 
`EnsureRequirements`.
+   */
+  private def satisfiesOutputPartitioning(
+  keys: Seq[Expression],
+  partitioning: Partitioning): Boolean = {
+partitioning match {
+  case HashPartitioning(exprs, _) if exprs.length == keys.length =>
+exprs.forall(e => 

[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r442629600



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, 
SparkPlan}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
InMemoryFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StructType}
+
+class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with 
SharedSparkSession {
+  case class RelationSetting(
+  cols: Seq[Attribute],
+  numBuckets: Int,
+  expectedCoalescedNumBuckets: Option[Int])
+
+  object RelationSetting {
+def apply(numBuckets: Int, expectedCoalescedNumBuckets: Option[Int]): 
RelationSetting = {
+  val cols = Seq(AttributeReference("i", IntegerType)())
+  RelationSetting(cols, numBuckets, expectedCoalescedNumBuckets)
+}
+  }
+
+  case class JoinSetting(
+  leftKeys: Seq[Attribute],
+  rightKeys: Seq[Attribute],
+  leftRelation: RelationSetting,
+  rightRelation: RelationSetting,
+  isSortMergeJoin: Boolean)
+
+  object JoinSetting {
+def apply(l: RelationSetting, r: RelationSetting, isSortMergeJoin: 
Boolean): JoinSetting = {
+  JoinSetting(l.cols, r.cols, l, r, isSortMergeJoin)
+}
+  }
+
+  private def newFileSourceScanExec(setting: RelationSetting): 
FileSourceScanExec = {
+val relation = HadoopFsRelation(
+  location = new InMemoryFileIndex(spark, Nil, Map.empty, None),
+  partitionSchema = PartitionSpec.emptySpec.partitionColumns,
+  dataSchema = StructType.fromAttributes(setting.cols),
+  bucketSpec = Some(BucketSpec(setting.numBuckets, 
setting.cols.map(_.name), Nil)),
+  fileFormat = new ParquetFileFormat(),
+  options = Map.empty)(spark)
+FileSourceScanExec(relation, setting.cols, relation.dataSchema, Nil, None, 
None, Nil, None)
+  }
+
+  private def run(setting: JoinSetting): Unit = {
+val swappedSetting = setting.copy(
+  leftKeys = setting.rightKeys,
+  rightKeys = setting.leftKeys,
+  leftRelation = setting.rightRelation,
+  rightRelation = setting.leftRelation)
+
+Seq(setting, swappedSetting).foreach { case s =>
+  val lScan = newFileSourceScanExec(s.leftRelation)
+  val rScan = newFileSourceScanExec(s.rightRelation)
+  val join = if (s.isSortMergeJoin) {
+SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan)
+  } else {
+BroadcastHashJoinExec(
+  s.leftKeys, s.rightKeys, Inner, BuildLeft, None, lScan, rScan)
+  }
+
+  val plan = CoalesceBucketsInSortMergeJoin(spark.sessionState.conf)(join)
+
+  def verify(expected: Option[Int], subPlan: SparkPlan): Unit = {
+val coalesced = subPlan.collect {
+  case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.nonEmpty 
=>
+f.optionalNumCoalescedBuckets.get
+}
+if (expected.isDefined) {
+  assert(coalesced.size == 1 && coalesced(0) == expected.get)
+} else {
+  assert(coalesced.isEmpty)
+}
+  }
+
+  verify(s.leftRelation.expectedCoalescedNumBuckets, 
plan.asInstanceOf[BinaryExecNode].left)
+  verify(s.rightRelation.expectedCoalescedNumBuckets, 
plan.asInstanceOf[BinaryExecNode].right)
+}
+  }
+
+  test("bucket coalescing - basic") {
+withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> 
"true") {
+  

[GitHub] [spark] viirya commented on a change in pull request #27690: [SPARK-21514][SQL] Added a new option to use non-blobstore storage when writing into blobstore storage

2020-06-18 Thread GitBox


viirya commented on a change in pull request #27690:
URL: https://github.com/apache/spark/pull/27690#discussion_r442627998



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -839,6 +839,17 @@ object SQLConf {
 .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
 .createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString)
 
+  val HIVE_SUPPORTED_SCHEMES_TO_USE_NONBLOBSTORE =
+buildConf("spark.sql.hive.supportedSchemesToUseNonBlobstore")
+  .doc("Comma-separated list of supported blobstore schemes (e.g. 
's3,s3a'). " +
+"If any blobstore schemes are specified, this feature is enabled. " +
+"When writing data out to a Hive table, " +
+"Spark writes the data first into non blobstore storage, and then 
moves it to blobstore. " +
+"By default, this option is set to empty. It means this feature is 
disabled.")
+  .version("3.1.0")
+  .stringConf
+  .createWithDefault("")

Review comment:
   Okay, it sounds making sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fqaiser94 commented on pull request #27066: [SPARK-31317][SQL] Add withField method to Column

2020-06-18 Thread GitBox


fqaiser94 commented on pull request #27066:
URL: https://github.com/apache/spark/pull/27066#issuecomment-646429505


   > Can you explain more about this? 
   
   Please see the comment from @dbtsai 
[here](https://github.com/apache/spark/pull/27066#issuecomment-609512144) for a 
more thorough explanation.  
   TBH I'm uncertain now if reusing `CreateNamedStruct` will give us access to 
any further optimizations beyond what the optimization rules I've added in this 
PR will do for `WithFields` anyway. 
   
   > My last concern is we convert WithField to CreateNamedStruct in a kind of 
random place in the optimizer. I'm thinking about something like 
   
   Interesting idea but I'm not sure if it's feasible. I tried code like this: 
   
   ```
   case class WithFields(
   structExpr: Expression,
   nameExprs: Seq[Expression],
   valExprs: Seq[Expression]) extends Expression {
   
 ...
   
 override def eval(input: InternalRow = null): Any = 
toCreateNamedStruct.eval(input)
   
 override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = 
toCreateNamedStruct.doGenCode(ctx, ev)
   }
   ```
   I get the following compile time error: 
   ```
   [error]  Access to protected method doGenCode not permitted because
   [error]  prefix type org.apache.spark.sql.catalyst.expressions.Expression 
does not conform to
   [error]  class WithFields in package expressions where the access takes place
   [error] toCreateNamedStruct.doGenCode(ctx, ev)
   ```
   Please let me know if there's a trick around this. 
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on pull request #28857: [SPARK-32023][Streaming]Generate spark streaming test jar with maven plugin maven-jar-plugin

2020-06-18 Thread GitBox


srowen commented on pull request #28857:
URL: https://github.com/apache/spark/pull/28857#issuecomment-646427158


   Hm, I'm a little uneasy about it, as that shade config was put in a long 
time ago to work around Guava issues. It may or may not be necessary, but, what 
if you leave it in?
   
   Also, do you actually need test jars?
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command

2020-06-18 Thread GitBox


ulysses-you commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r442622091



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##
@@ -1341,6 +1341,16 @@ class SessionCatalog(
 functionRegistry.registerFunction(func, info, builder)
   }
 
+  /**
+   * Unregister a temporary or permanent function from a session-specific 
[[FunctionRegistry]]
+   */
+  def unregisterFunction(name: FunctionIdentifier, ignoreIfNotExists: 
Boolean): Unit = {

Review comment:
   Not used. If the function not exists, `refresh function` will throw 
exception. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] TJX2014 commented on a change in pull request #28856: [SPARK-31982][SQL]Function sequence doesn't handle date increments that cross DST

2020-06-18 Thread GitBox


TJX2014 commented on a change in pull request #28856:
URL: https://github.com/apache/spark/pull/28856#discussion_r442621499



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
##
@@ -2698,7 +2721,11 @@ object Sequence {
  |  int $i = 0;
  |
  |  while ($t < $exclusiveItem ^ $stepSign < 0) {
- |$arr[$i] = ($elemType) ($t / ${scale}L);
+ |if (${scale}L == 1L) {
+ |  $arr[$i] = ($elemType) ($t / ${scale}L);

Review comment:
   May it is better change `$arr[$i] = ($elemType) ($t / ${scale}L)` to 
`$arr[$i] = ($elemType) $t`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command

2020-06-18 Thread GitBox


ulysses-you commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r442621427



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
##
@@ -155,4 +155,37 @@ private[sql] trait LookupCatalog extends Logging {
 None
 }
   }
+
+  /**
+   * Extract catalog and function identifier from a multi-part name with the 
current catalog if
+   * needed.
+   *
+   * Note that: function is only supported in v1 catalog.

Review comment:
   The comment just aims to make the exception clear `throw new 
AnalysisException("Function command is only supported in v1 catalog")`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] TJX2014 commented on a change in pull request #28856: [SPARK-31982][SQL]Function sequence doesn't handle date increments that cross DST

2020-06-18 Thread GitBox


TJX2014 commented on a change in pull request #28856:
URL: https://github.com/apache/spark/pull/28856#discussion_r442621499



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
##
@@ -2698,7 +2721,11 @@ object Sequence {
  |  int $i = 0;
  |
  |  while ($t < $exclusiveItem ^ $stepSign < 0) {
- |$arr[$i] = ($elemType) ($t / ${scale}L);
+ |if (${scale}L == 1L) {
+ |  $arr[$i] = ($elemType) ($t / ${scale}L);

Review comment:
   `$arr[$i] = ($elemType) ($t / ${scale}L)` -> `$arr[$i] = ($elemType) $t`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] TJX2014 commented on a change in pull request #28856: [SPARK-31982][SQL]Function sequence doesn't handle date increments that cross DST

2020-06-18 Thread GitBox


TJX2014 commented on a change in pull request #28856:
URL: https://github.com/apache/spark/pull/28856#discussion_r442621330



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
##
@@ -2635,7 +2643,11 @@ object Sequence {
 var i = 0
 
 while (t < exclusiveItem ^ stepSign < 0) {
-  arr(i) = fromLong(t / scale)
+  arr(i) = if (scale == 1) {
+fromLong(t / scale)

Review comment:
I will change to `fromLong(t)` because it is same as ` fromLong(t / 
scale)` in `if` condition





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command

2020-06-18 Thread GitBox


ulysses-you commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r442620715



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
##
@@ -155,4 +155,37 @@ private[sql] trait LookupCatalog extends Logging {
 None
 }
   }
+
+  /**
+   * Extract catalog and function identifier from a multi-part name with the 
current catalog if
+   * needed.
+   *
+   * Note that: function is only supported in v1 catalog.
+   */
+  object CatalogAndFunctionIdentifier {
+def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, 
FunctionIdentifier)] = {
+
+  if (nameParts.length == 1 && 
catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) {
+return Some(currentCatalog, FunctionIdentifier(nameParts.head))
+  }
+
+  nameParts match {
+case SessionCatalogAndIdentifier(catalog, ident) =>
+  if (nameParts.length == 1) {
+// If there is only one name part, it means the current catalog is 
the session catalog.
+// Here we don't fill the default database, to keep the error 
message unchanged for
+// v1 commands.
+Some(catalog, FunctionIdentifier(nameParts.head, None))
+  } else {
+ident.namespace match {
+  case Array(db) => Some(catalog, FunctionIdentifier(ident.name, 
Some(db)))
+  case _ =>
+throw new AnalysisException(s"Unsupported function name 
'$ident'")
+}
+  }
+
+case _ => throw new AnalysisException("Function command is only 
supported in v1 catalog")

Review comment:
   `Function command` means `CREATE FUNCTION`, `DROP FUNCTION`, `DESC 
FUNCTION` ...
   
   It seems confused, is it better we list all the function command here ? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command

2020-06-18 Thread GitBox


ulysses-you commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r442620118



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
##
@@ -155,4 +155,37 @@ private[sql] trait LookupCatalog extends Logging {
 None
 }
   }
+
+  /**
+   * Extract catalog and function identifier from a multi-part name with the 
current catalog if
+   * needed.
+   *
+   * Note that: function is only supported in v1 catalog.
+   */
+  object CatalogAndFunctionIdentifier {

Review comment:
   Thanks for remind, missed here.
   
   Because of the dependency that `ResolveSessionCatalog` is at sql-core 
package. Seems I have to do the refactor.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2020-06-18 Thread GitBox


HeartSaVioR commented on pull request #25135:
URL: https://github.com/apache/spark/pull/25135#issuecomment-646423684


   That said, it can be unblocked if we concern about infinite timeout 
seriously (see the behavior fairly often) and consider as critical one 
tolerating additional complexity brought in driver side.
   
   I'm even OK to go with partial address, fix executors usage first, and leave 
an issue for driver. This would greatly reduce the chance of infinite timeout - 
only Kafka consumer in driver can hang.
   
   @zsxwing @gaborgsomogyi WDYT?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28805: [SPARK-28169][SQL] Convert scan predicate condition to CNF

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28805:
URL: https://github.com/apache/spark/pull/28805#issuecomment-646422173







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28805: [SPARK-28169][SQL] Convert scan predicate condition to CNF

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28805:
URL: https://github.com/apache/spark/pull/28805#issuecomment-646422173


   Merged build finished. Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28805: [SPARK-28169][SQL] Convert scan predicate condition to CNF

2020-06-18 Thread GitBox


SparkQA commented on pull request #28805:
URL: https://github.com/apache/spark/pull/28805#issuecomment-646421987


   **[Test build #124256 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124256/testReport)**
 for PR 28805 at commit 
[`e71c45c`](https://github.com/apache/spark/commit/e71c45cfd9e1c55deb4c6f7dad6247d0a5027250).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #27066: [SPARK-31317][SQL] Add withField method to Column

2020-06-18 Thread GitBox


SparkQA commented on pull request #27066:
URL: https://github.com/apache/spark/pull/27066#issuecomment-646421998


   **[Test build #124257 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124257/testReport)**
 for PR 27066 at commit 
[`8ada917`](https://github.com/apache/spark/commit/8ada91713f4e989c3610cdc985b2848d07f2622d).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fqaiser94 commented on a change in pull request #27066: [SPARK-31317][SQL] Add withField method to Column

2020-06-18 Thread GitBox


fqaiser94 commented on a change in pull request #27066:
URL: https://github.com/apache/spark/pull/27066#discussion_r442617361



##
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
##
@@ -452,4 +453,91 @@ class ComplexTypesSuite extends PlanTest with 
ExpressionEvalHelper {
 checkEvaluation(GetMapValue(mb0, Literal(Array[Byte](2, 1), BinaryType)), 
"2")
 checkEvaluation(GetMapValue(mb0, Literal(Array[Byte](3, 4))), null)
   }
+
+  test("simplify GetStructField on WithFields that is not changing the 
attribute being extracted") {
+val query = relation.select(
+  GetStructField(
+WithFields('id, Seq("c"), Seq(Literal(1))),
+0,
+Some("a")) as "outerAtt")
+val expected = relation.select(GetStructField('id, 0, Some("a")) as 
"outerAtt")
+
+checkRule(query, expected)
+  }
+
+  test("simplify GetStructField on WithFields that is changing the attribute 
being extracted") {
+val query = relation.select(
+  GetStructField(
+WithFields('id, Seq("c"), Seq(Literal(1))),
+0,
+Some("c")) as "outerAtt")
+val expected = relation.select(Literal(1) as "outerAtt")
+
+checkRule(query, expected)
+  }
+
+  test(
+"simplify GetStructField on WithFields that is changing the attribute 
being extracted twice") {
+val query = relation.select(
+  GetStructField(
+WithFields('id, Seq("c", "c"), Seq(Literal(1), Literal(2))),
+0,
+Some("c")) as "outerAtt")
+val expected = relation.select(Literal(2) as "outerAtt")
+
+checkRule(query, expected)
+  }
+
+  test("collapse multiple GetStructField on the same WithFields") {
+val query = relation
+  .select(CreateNamedStruct(Seq("att1", 'id, "att2", 'id * 'id)) as 
"struct1")
+  .select(WithFields('struct1, Seq("att3"), Seq(Literal(3))) as "struct2")
+  .select(
+GetStructField('struct2, 0, Some("att1")) as "struct1Att1",
+GetStructField('struct2, 1, Some("att2")) as "struct1Att2",
+GetStructField('struct2, 2, Some("att3")) as "struct1Att3")
+
+val expected = relation
+  .select(
+'id as "struct1Att1",
+('id * 'id) as "struct1Att2",
+Literal(3) as "struct1Att3")
+
+checkRule(query, expected)
+  }
+
+  test("collapse multiple GetStructField on different WithFields") {
+val query = relation
+  .select(CreateNamedStruct(Seq("att1", 'id)) as "struct1")
+  .select(
+WithFields('struct1, Seq("att2"), Seq(Literal(2))) as "struct2",
+WithFields('struct1, Seq("att2"), Seq(Literal(3))) as "struct3")
+  .select(
+GetStructField('struct2, 0, Some("att1")) as "struct2Att1",
+GetStructField('struct2, 1, Some("att2")) as "struct2Att2",
+GetStructField('struct3, 0, Some("att1")) as "struct3Att1",
+GetStructField('struct3, 1, Some("att2")) as "struct3Att2")
+
+val expected = relation
+  .select(
+'id as "struct2Att1",
+Literal(2) as "struct2Att2",
+'id as "struct3Att1",
+Literal(3) as "struct3Att2")
+
+checkRule(query, expected)
+  }
+
+  test("WIP write tests for ensuring case sensitivity is respected") {

Review comment:
   fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] fqaiser94 commented on a change in pull request #27066: [SPARK-31317][SQL] Add withField method to Column

2020-06-18 Thread GitBox


fqaiser94 commented on a change in pull request #27066:
URL: https://github.com/apache/spark/pull/27066#discussion_r442617236



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -539,3 +541,82 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces field in struct by name.
+ */
+case class WithFields(
+  structExpr: Expression,
+  nameExprs: Seq[Expression],
+  valExprs: Seq[Expression]) extends Unevaluable {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val expectedStructType = StructType(Nil).typeName
+if (structExpr.dataType.typeName != expectedStructType) {

Review comment:
   fixed

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -539,3 +541,82 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces field in struct by name.
+ */
+case class WithFields(
+  structExpr: Expression,
+  nameExprs: Seq[Expression],
+  valExprs: Seq[Expression]) extends Unevaluable {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val expectedStructType = StructType(Nil).typeName
+if (structExpr.dataType.typeName != expectedStructType) {
+  TypeCheckResult.TypeCheckFailure(
+"struct argument should be struct type, got: " + 
structExpr.dataType.catalogString)
+} else if (!nameExprs.forall(e => e.foldable && e.dataType == StringType)) 
{
+  TypeCheckResult.TypeCheckFailure(
+s"nameExprs argument should contain only foldable 
${StringType.catalogString} expressions")
+} else if (nameExprs.length != valExprs.length) {

Review comment:
   fixed

##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##
@@ -539,3 +541,82 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces field in struct by name.
+ */
+case class WithFields(
+  structExpr: Expression,
+  nameExprs: Seq[Expression],

Review comment:
   fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] agrawaldevesh commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-18 Thread GitBox


agrawaldevesh commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r442615769



##
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##
@@ -178,6 +184,10 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def location: BlockManagerId = loc
 
+  override def updateLocation(bm: BlockManagerId): Unit = {

Review comment:
   Sounds good.

##
File path: 
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##
@@ -168,7 +168,8 @@ private[spark] class NettyBlockTransferService(
 // Everything else is encoded using our binary protocol.
 val metadata = 
JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
 
-val asStream = blockData.size() > 
conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+val asStream = (blockData.size() > 
conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) ||
+  blockId.isInternalShuffle || blockId.isShuffle)

Review comment:
   Cool. A comment in the code would be great :-)

##
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -55,6 +58,25 @@ private[spark] class IndexShuffleBlockResolver(
 
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, 
mapId, None)
 
+  /**
+   * Get the shuffle files that are stored locally. Used for block migrations.
+   */
+  override def getStoredShuffles(): Set[ShuffleBlockInfo] = {
+// Matches ShuffleIndexBlockId name
+val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
+val rootDirs = blockManager.diskBlockManager.localDirs
+// ExecutorDiskUtil puts things inside one level hashed sub directories
+val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) 
++ rootDirs
+val filenames = searchDirs.flatMap(_.list())
+logDebug(s"Got block files ${filenames.toList}")
+filenames.flatMap { fname =>
+  pattern.findAllIn(fname).matchData.map {
+matched => ShuffleBlockInfo(matched.group(1).toInt, 
matched.group(2).toLong)
+  }
+}.toSet

Review comment:
   Got it. Do you expect to get duplicates ... ie, should this be a 
`.toSet` ?

##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. 
This part of a
+   * producer/consumer where the main migration loop updates the queue of 
blocks to be migrated
+   * periodically. On migration failure, the current thread will reinsert the 
block for another
+   * thread to consume. Each thread migrates blocks to a different particular 
executor to avoid
+   * distribute the blocks as quickly as possible without overwhelming any 
particular executor.
+   *
+   * There is no preference for which peer a given block is migrated to.
+   * This is notable different than the RDD cache block migration (further 
down in this file)
+   * which uses the existing priority mechanism for determining where to 
replicate blocks to.
+   * Generally speaking cache blocks are less impactful as they normally 
represent narrow
+   * transformations and we 

[GitHub] [spark] HeartSaVioR commented on pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2020-06-18 Thread GitBox


HeartSaVioR commented on pull request #25135:
URL: https://github.com/apache/spark/pull/25135#issuecomment-646420436


   @Tagar 
   Please refer the comment in JIRA issue:
   
https://issues.apache.org/jira/browse/SPARK-28367?focusedCommentId=17088514=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17088514
   
   In short, it's OK to use the new API in executors (we may need to set longer 
timeout then), but still not be ideal to use it in driver. It requires Spark to 
implement the subscription modes by itself with AdminClient API, which is not 
impossible but brings complexity to match the behavior same as we expect from 
Kafka client.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of array

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646420162







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646420162







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of arr

2020-06-18 Thread GitBox


SparkQA removed a comment on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646349387


   **[Test build #124239 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124239/testReport)**
 for PR 28831 at commit 
[`22ab4ff`](https://github.com/apache/spark/commit/22ab4ff15cc8b2029d356d1be76e0e7e130438dd).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28831: [SPARK-31993][SQL] Build arrays for passing variables generated from children for 'concat_ws' with columns having at least one of array type

2020-06-18 Thread GitBox


SparkQA commented on pull request #28831:
URL: https://github.com/apache/spark/pull/28831#issuecomment-646419702


   **[Test build #124239 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124239/testReport)**
 for PR 28831 at commit 
[`22ab4ff`](https://github.com/apache/spark/commit/22ab4ff15cc8b2029d356d1be76e0e7e130438dd).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] agrawaldevesh commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-18 Thread GitBox


agrawaldevesh commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r442615585



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,30 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.enabled")
+  .doc("Whether to transfer shuffle blocks during block manager 
decommissioning. Requires " +
+"a migratable shuffle resolver (like sort based shuffe)")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads")
+  .doc("Maximum number of threads to use in migrating shuffle files.")
+  .version("3.1.0")
+  .intConf
+  .checkValue(_ > 0, "The maximum number of threads should be positive")
+  .createWithDefault(10)

Review comment:
   Yep





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xuanyuanking commented on pull request #28854: [MINOR][DOCS] Emphasize the Streaming tab is for DStream API

2020-06-18 Thread GitBox


xuanyuanking commented on pull request #28854:
URL: https://github.com/apache/spark/pull/28854#issuecomment-646418641


   Thanks for reviewing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646417972


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/124255/
   Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646417341


   **[Test build #124255 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124255/testReport)**
 for PR 28866 at commit 
[`fe42fd5`](https://github.com/apache/spark/commit/fe42fd5e5ddbdf6ce744fe47d125eacf87f169f6).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646417965


   Merged build finished. Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646417965







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646417952


   **[Test build #124255 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124255/testReport)**
 for PR 28866 at commit 
[`fe42fd5`](https://github.com/apache/spark/commit/fe42fd5e5ddbdf6ce744fe47d125eacf87f169f6).
* This patch **fails build dependency tests**.
* This patch merges cleanly.
* This patch adds no public classes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646417639







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646417639







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646417341


   **[Test build #124255 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124255/testReport)**
 for PR 28866 at commit 
[`fe42fd5`](https://github.com/apache/spark/commit/fe42fd5e5ddbdf6ce744fe47d125eacf87f169f6).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] TJX2014 commented on a change in pull request #28856: [SPARK-31982][SQL]Function sequence doesn't handle date increments that cross DST

2020-06-18 Thread GitBox


TJX2014 commented on a change in pull request #28856:
URL: https://github.com/apache/spark/pull/28856#discussion_r442613764



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
##
@@ -2635,7 +2643,11 @@ object Sequence {
 var i = 0
 
 while (t < exclusiveItem ^ stepSign < 0) {
-  arr(i) = fromLong(t / scale)
+  arr(i) = if (scale == 1) {
+fromLong(t / scale)
+  } else {
+fromLong(Math.round(t / scale.toFloat))

Review comment:
   We could get date from `Math.round(t / scale.toFloat)`, if the target is 
timestamp,  the former ` fromLong(t / scale)` could be used.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646416246


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/124253/
   Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646416242


   Merged build finished. Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646416242







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646416228


   **[Test build #124253 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124253/testReport)**
 for PR 28866 at commit 
[`7e18dcc`](https://github.com/apache/spark/commit/7e18dcc52b1a2c7e53bad265768fa35ff3c5d28e).
* This patch **fails Scala style tests**.
* This patch merges cleanly.
* This patch adds no public classes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646415577


   **[Test build #124253 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124253/testReport)**
 for PR 28866 at commit 
[`7e18dcc`](https://github.com/apache/spark/commit/7e18dcc52b1a2c7e53bad265768fa35ff3c5d28e).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28856: [SPARK-31982][SQL]Function sequence doesn't handle date increments that cross DST

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28856:
URL: https://github.com/apache/spark/pull/28856#issuecomment-646415868







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646415860







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28856: [SPARK-31982][SQL]Function sequence doesn't handle date increments that cross DST

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28856:
URL: https://github.com/apache/spark/pull/28856#issuecomment-646415868







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646415860







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28856: [SPARK-31982][SQL]Function sequence doesn't handle date increments that cross DST

2020-06-18 Thread GitBox


SparkQA commented on pull request #28856:
URL: https://github.com/apache/spark/pull/28856#issuecomment-646415612


   **[Test build #124254 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124254/testReport)**
 for PR 28856 at commit 
[`b33514f`](https://github.com/apache/spark/commit/b33514fcb3ff536887ca1b8824de7481e875911d).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646415577


   **[Test build #124253 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124253/testReport)**
 for PR 28866 at commit 
[`7e18dcc`](https://github.com/apache/spark/commit/7e18dcc52b1a2c7e53bad265768fa35ff3c5d28e).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wankunde commented on pull request #28857: [SPARK-32023][Streaming]Generate spark streaming test jar with maven plugin maven-jar-plugin

2020-06-18 Thread GitBox


wankunde commented on pull request #28857:
URL: https://github.com/apache/spark/pull/28857#issuecomment-646414088


   Hi @srowen, I have compared the compilation results with and without shade 
plugin. Their result classes are the same. So I don't think we need to keep 
shade plugin anymore.
   
   ```
   ➜  tmp ls -al shade
   total 11864
   drwxr-xr-x  21 wankun  staff  672  6 19 11:11 .
   drwxr-xr-x   9 wankun  staff  288  6 19 11:15 ..
   -rw-r--r--   1 wankun  staff   30  6 19 11:11 .plxarc
   drwxr-xr-x   4 wankun  staff  128  6 19 11:11 analysis
   drwxr-xr-x   3 wankun  staff   96  6 19 11:11 antrun
   -rw-r--r--   1 wankun  staff 2103  6 19 11:11 checkstyle-cachefile
   -rw-r--r--   1 wankun  staff 8225  6 19 11:11 checkstyle-checker.xml
   -rw-r--r--   1 wankun  staff 2133  6 19 11:11 checkstyle-output.xml
   drwxr-xr-x   3 wankun  staff   96  6 19 11:11 maven-archiver
   drwxr-xr-x   3 wankun  staff   96  6 19 11:11 
maven-shared-archive-resources
   -rw-r--r--   1 wankun  staff   849617  6 19 11:11 
original-spark-streaming_2.12-3.0.0-tests.jar
   -rw-r--r--   1 wankun  staff  1129042  6 19 11:11 
original-spark-streaming_2.12-3.0.0.jar
   drwxr-xr-x   4 wankun  staff  128  6 19 11:11 scala-2.12
   -rw-r--r--   1 wankun  staff   91  6 19 11:11 scalastyle-output.xml
   drwxr-xr-x   3 wankun  staff   96  6 19 11:11 site
   -rw-r--r--   1 wankun  staff  1640857  6 19 11:11 
spark-streaming_2.12-3.0.0-javadoc.jar
   -rw-r--r--   1 wankun  staff   252544  6 19 11:11 
spark-streaming_2.12-3.0.0-sources.jar
   -rw-r--r--   1 wankun  staff   170013  6 19 11:11 
spark-streaming_2.12-3.0.0-test-sources.jar
   -rw-r--r--   1 wankun  staff   854204  6 19 11:11 
spark-streaming_2.12-3.0.0-tests.jar
   -rw-r--r--   1 wankun  staff  1137942  6 19 11:11 
spark-streaming_2.12-3.0.0.jar
   drwxr-xr-x   2 wankun  staff   64  6 19 11:11 tmp
   ➜  tmp ls -al without_shade
   total 10192
   drwxr-xr-x  20 wankun  staff  640  6 19 11:13 .
   drwxr-xr-x   9 wankun  staff  288  6 19 11:15 ..
   -rw-r--r--   1 wankun  staff   30  6 19 11:13 .plxarc
   drwxr-xr-x   4 wankun  staff  128  6 19 11:13 analysis
   drwxr-xr-x   3 wankun  staff   96  6 19 11:13 antrun
   -rw-r--r--   1 wankun  staff 2103  6 19 11:13 checkstyle-cachefile
   -rw-r--r--   1 wankun  staff 8225  6 19 11:13 checkstyle-checker.xml
   -rw-r--r--   1 wankun  staff 2133  6 19 11:13 checkstyle-output.xml
   drwxr-xr-x   3 wankun  staff   96  6 19 11:13 maven-archiver
   drwxr-xr-x   3 wankun  staff   96  6 19 11:13 
maven-shared-archive-resources
   -rw-r--r--   1 wankun  staff  1129002  6 19 11:13 
original-spark-streaming_2.12-3.0.0.jar
   drwxr-xr-x   4 wankun  staff  128  6 19 11:13 scala-2.12
   -rw-r--r--   1 wankun  staff   91  6 19 11:13 scalastyle-output.xml
   drwxr-xr-x   3 wankun  staff   96  6 19 11:13 site
   -rw-r--r--   1 wankun  staff  1640857  6 19 11:13 
spark-streaming_2.12-3.0.0-javadoc.jar
   -rw-r--r--   1 wankun  staff   252504  6 19 11:13 
spark-streaming_2.12-3.0.0-sources.jar
   -rw-r--r--   1 wankun  staff   169973  6 19 11:13 
spark-streaming_2.12-3.0.0-test-sources.jar
   -rw-r--r--   1 wankun  staff   849577  6 19 11:13 
spark-streaming_2.12-3.0.0-tests.jar
   -rw-r--r--   1 wankun  staff  1137902  6 19 11:13 
spark-streaming_2.12-3.0.0.jar
   drwxr-xr-x   2 wankun  staff   64  6 19 11:13 tmp
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-18 Thread GitBox


holdenk commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r442604350



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -775,7 +802,12 @@ private[spark] class MapOutputTrackerMaster(
   override def stop(): Unit = {
 mapOutputRequests.offer(PoisonPill)
 threadpool.shutdown()
-sendTracker(StopMapOutputTracker)
+try {

Review comment:
   So this is only because we call shutdown during the tests that this is 
needed. It doesn't throw exceptions normally.

##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,30 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.enabled")
+  .doc("Whether to transfer shuffle blocks during block manager 
decommissioning. Requires " +
+"a migratable shuffle resolver (like sort based shuffe)")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads")
+  .doc("Maximum number of threads to use in migrating shuffle files.")
+  .version("3.1.0")
+  .intConf
+  .checkValue(_ > 0, "The maximum number of threads should be positive")
+  .createWithDefault(10)

Review comment:
   Sure, why not. I'll change this to default to 
`SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS` if it's not configured. Sound good?

##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,30 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =

Review comment:
   Sure sounds good, I'll rename it.

##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -650,6 +658,23 @@ private[spark] class BlockManager(
   blockId: BlockId,
   level: StorageLevel,
   classTag: ClassTag[_]): StreamCallbackWithID = {
+
+if (decommissioner.isDefined) {
+   throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
+}
+
+if (blockId.isShuffle || blockId.isInternalShuffle) {
+  logInfo(s"Putting shuffle block ${blockId}")
+  try {
+return migratableResolver.putShuffleBlockAsStream(blockId, 
serializerManager)
+  } catch {
+case e: ClassCastException => throw new SparkException(

Review comment:
   Yeah were logging which kind of block we received. We only really care 
about the type but it's easier to just print the blockId its self.

##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache and Shuffle blocks
+ */
+private[storage] class BlockManagerDecommissioner(
+  conf: SparkConf,
+  bm: BlockManager) extends Logging {
+
+  private val maxReplicationFailuresForDecommission =
+conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+  /**
+   * This runnable consumes any shuffle blocks in the queue for migration. 
This part of a
+   * producer/consumer where the main migration loop updates the queue of 
blocks to be migrated
+   * periodically. On migration failure, the current thread will reinsert the 
block for another
+   * thread to consume. Each 

[GitHub] [spark] Ngone51 commented on a change in pull request #28848: [SPARK-32003][CORE] Unregister outputs for executor on fetch failure …

2020-06-18 Thread GitBox


Ngone51 commented on a change in pull request #28848:
URL: https://github.com/apache/spark/pull/28848#discussion_r442610030



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1939,24 +1941,23 @@ private[spark] class DAGScheduler(
   hostToUnregisterOutputs: Option[String],
   maybeEpoch: Option[Long] = None): Unit = {
 val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
+logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: 
$currentEpoch")
 if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
   failedEpoch(execId) = currentEpoch
-  logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+  logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
   blockManagerMaster.removeExecutor(execId)
-  if (fileLost) {
-hostToUnregisterOutputs match {
-  case Some(host) =>
-logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, 
currentEpoch))
-mapOutputTracker.removeOutputsOnHost(host)
-  case None =>
-logInfo("Shuffle files lost for executor: %s (epoch 
%d)".format(execId, currentEpoch))
-mapOutputTracker.removeOutputsOnExecutor(execId)
-}
-clearCacheLocs()
-
-  } else {
-logDebug("Additional executor lost message for %s (epoch 
%d)".format(execId, currentEpoch))
+}
+if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) 
< currentEpoch)) {

Review comment:
   I see.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #28848: [SPARK-32003][CORE] Unregister outputs for executor on fetch failure …

2020-06-18 Thread GitBox


Ngone51 commented on a change in pull request #28848:
URL: https://github.com/apache/spark/pull/28848#discussion_r442610125



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1939,24 +1941,22 @@ private[spark] class DAGScheduler(
   hostToUnregisterOutputs: Option[String],
   maybeEpoch: Option[Long] = None): Unit = {
 val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
+logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: 
$currentEpoch")
 if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
   failedEpoch(execId) = currentEpoch
   logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
   blockManagerMaster.removeExecutor(execId)
-  if (fileLost) {
-hostToUnregisterOutputs match {
-  case Some(host) =>
-logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, 
currentEpoch))
-mapOutputTracker.removeOutputsOnHost(host)
-  case None =>
-logInfo("Shuffle files lost for executor: %s (epoch 
%d)".format(execId, currentEpoch))
-mapOutputTracker.removeOutputsOnExecutor(execId)
-}
-clearCacheLocs()
-
-  } else {
-logDebug("Additional executor lost message for %s (epoch 
%d)".format(execId, currentEpoch))
+}
+if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) 
< currentEpoch)) {

Review comment:
   I see.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #28854: [MINOR][DOCS] Emphasize the Streaming tab is for DStream API

2020-06-18 Thread GitBox


HyukjinKwon commented on pull request #28854:
URL: https://github.com/apache/spark/pull/28854#issuecomment-646412636


   Merged to master and branch-3.0



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #28854: [MINOR][DOCS] Emphasize the Streaming tab is for DStream API

2020-06-18 Thread GitBox


HyukjinKwon closed pull request #28854:
URL: https://github.com/apache/spark/pull/28854


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28860: [SPARK-32002][SQL]Support Extract valve from nested ArrayStruct

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28860:
URL: https://github.com/apache/spark/pull/28860#issuecomment-646410212







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28860: [SPARK-32002][SQL]Support Extract valve from nested ArrayStruct

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28860:
URL: https://github.com/apache/spark/pull/28860#issuecomment-646410212







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28860: [SPARK-32002][SQL]Support Extract valve from nested ArrayStruct

2020-06-18 Thread GitBox


SparkQA commented on pull request #28860:
URL: https://github.com/apache/spark/pull/28860#issuecomment-646409843


   **[Test build #124252 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124252/testReport)**
 for PR 28860 at commit 
[`400ad7d`](https://github.com/apache/spark/commit/400ad7d04d55af4a25d2b5c92061de3f0533eecf).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646404851


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/124251/
   Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646404216


   **[Test build #124251 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124251/testReport)**
 for PR 28866 at commit 
[`4a6f903`](https://github.com/apache/spark/commit/4a6f903897d28a3038918997e692410259a90ae3).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646404847


   Merged build finished. Test FAILed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646404847







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646404833


   **[Test build #124251 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124251/testReport)**
 for PR 28866 at commit 
[`4a6f903`](https://github.com/apache/spark/commit/4a6f903897d28a3038918997e692410259a90ae3).
* This patch **fails build dependency tests**.
* This patch merges cleanly.
* This patch adds no public classes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AmplabJenkins removed a comment on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins removed a comment on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646404507







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] agrawaldevesh commented on a change in pull request #28708: [SPARK-20629][CORE][K8S] Copy shuffle data when nodes are being shutdown

2020-06-18 Thread GitBox


agrawaldevesh commented on a change in pull request #28708:
URL: https://github.com/apache/spark/pull/28708#discussion_r442581482



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -775,7 +802,12 @@ private[spark] class MapOutputTrackerMaster(
   override def stop(): Unit = {
 mapOutputRequests.offer(PoisonPill)
 threadpool.shutdown()
-sendTracker(StopMapOutputTracker)
+try {

Review comment:
   I am curious on why sendTracker didn't throw an exception before and now 
it does ? Did the migration cause this ?

##
File path: core/src/main/scala/org/apache/spark/SparkEnv.scala
##
@@ -367,7 +367,8 @@ object SparkEnv extends Logging {
 externalShuffleClient
   } else {
 None
-  }, blockManagerInfo)),
+  }, blockManagerInfo,
+  mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])),

Review comment:
   Should there be a `isDriver` check here ? I believe the 
MapOutputTrackerMaster is only available on the driver ?

##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -420,6 +420,30 @@ package object config {
   .booleanConf
   .createWithDefault(false)
 
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_ENABLED =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.enabled")
+  .doc("Whether to transfer shuffle blocks during block manager 
decommissioning. Requires " +
+"a migratable shuffle resolver (like sort based shuffe)")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val STORAGE_SHUFFLE_DECOMMISSION_MAX_THREADS =
+ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxThreads")
+  .doc("Maximum number of threads to use in migrating shuffle files.")
+  .version("3.1.0")
+  .intConf
+  .checkValue(_ > 0, "The maximum number of threads should be positive")
+  .createWithDefault(10)

Review comment:
   There is precedence with SHUFFLE_MAPOUTPUT_DISPATCHER_NUM_THREADS to use 
8 threads. Should we keep the same magic constant for consistency ?

##
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##
@@ -33,9 +33,11 @@ import org.apache.spark.util.Utils
  * task ran on as well as the sizes of outputs for each reducer, for passing 
on to the reduce tasks.

Review comment:
   I think this comment above still refers to "block manager address that 
the task run on" ... it should be updated.

##
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -148,6 +170,82 @@ private[spark] class IndexShuffleBlockResolver(
 }
   }
 
+  /**
+   * Write a provided shuffle block as a stream. Used for block migrations.
+   * ShuffleBlockBatchIds must contain the full range represented in the 
ShuffleIndexBlock.
+   * Requires the caller to delete any shuffle index blocks where the shuffle 
block fails to
+   * put.
+   */
+  override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: 
SerializerManager):
+  StreamCallbackWithID = {
+val file = blockId match {
+  case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+getIndexFile(shuffleId, mapId)
+  case ShuffleDataBlockId(shuffleId, mapId, _) =>
+getDataFile(shuffleId, mapId)
+  case _ =>
+throw new Exception(s"Unexpected shuffle block transfer ${blockId} as 
" +

Review comment:
   How about a more informative unchecked exception type like 
IllegalStateException ?

##
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##
@@ -40,6 +40,9 @@ sealed abstract class BlockId {
   def isRDD: Boolean = isInstanceOf[RDDBlockId]
   def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] || 
isInstanceOf[ShuffleBlockBatchId]
   def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
+  def isInternalShuffle: Boolean = {

Review comment:
   I kind of think that the name isInternalShuffle is a bit funny :-). 
What's internal about it ? Its more about being a data or an index block ? And 
in general shuffles are always internal (since they are within the same job) 
unlike rdd blocks that can be shared across jobs. 
   
   One way to work around this might be to inline this method. Or I wonder what 
happen if you change isShuffle to also include shuffle data and index blocks ?

##
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may 

[GitHub] [spark] AmplabJenkins commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


AmplabJenkins commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646404507







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SparkQA commented on pull request #28866: [SPARK-31845][CORE][TESTS] DAGSchedulerSuite: Reuse completeNextStageWithFetchFailure

2020-06-18 Thread GitBox


SparkQA commented on pull request #28866:
URL: https://github.com/apache/spark/pull/28866#issuecomment-646404216


   **[Test build #124251 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/124251/testReport)**
 for PR 28866 at commit 
[`4a6f903`](https://github.com/apache/spark/commit/4a6f903897d28a3038918997e692410259a90ae3).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   4   5   6   7   8   >