[GitHub] spark issue #20825: add impurity stats in tree leaf node debug string

2018-03-15 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/20825
  
cc @zsxwing 


---




[GitHub] spark issue #20825: add impurity stats in tree leaf node debug string

2018-03-14 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/20825
  
LGTM


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-10 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/20561
  
lgtm


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167300330
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
 ---
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends 
SparkFunSuite with SharedSQLContext {
   spill = true
 )
   }
+
+  test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap 
having duplicated keys") {
+val memoryManager = new TestMemoryManager(new SparkConf())
+val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+val map = new BytesToBytesMap(taskMemoryManager, 64, 
taskMemoryManager.pageSizeBytes())
+
+// Key/value are a unsafe rows with a single int column
+val schema = new StructType().add("i", IntegerType)
+val key = new UnsafeRow(1)
+key.pointTo(new Array[Byte](32), 32)
+key.setInt(0, 1)
+val value = new UnsafeRow(1)
+value.pointTo(new Array[Byte](32), 32)
+value.setInt(0, 2)
+
+for (_ <- 1 to 65) {
+  val loc = map.lookup(key.getBaseObject, key.getBaseOffset, 
key.getSizeInBytes)
+  loc.append(
+key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+}
+
+// Make sure we can successfully create a UnsafeKVExternalSorter with 
a `BytesToBytesMap`
+// which has duplicated keys and the number of entries exceeds its 
capacity.
--- End diff --

For aggregation, there are never multiple entries for the same key; that only happens for hash join (I don't remember the details).
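
A minimal, purely illustrative sketch of that distinction (plain Python dicts, not Spark code, assuming nothing beyond the comment above): a hash aggregate keeps one entry per distinct key, while a hash-join build side has to keep every row, so duplicated keys only show up in the latter.

```
# Illustrative only: plain dicts standing in for the two usage patterns.
rows = [(1, "a"), (1, "b"), (2, "c")]

# Hash aggregation: one slot per distinct key (the running aggregate state).
agg = {}
for key, _ in rows:
    agg[key] = agg.get(key, 0) + 1

# Hash-join build side: every row is kept, so one key maps to many values.
build = {}
for key, value in rows:
    build.setdefault(key, []).append(value)

assert len(agg) == 2          # duplicates collapsed
assert len(build[1]) == 2     # duplicates retained
```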


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167299807
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
 ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
 numElementsForSpillThreshold,
 canUseRadixSort);
 } else {
-  // The array will be used to do in-place sort, which require half of 
the space to be empty.
-  // Note: each record in the map takes two entries in the array, one 
is record pointer,
-  // another is the key prefix.
-  assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+  // `BytesToBytesMap`'s point array is only guaranteed to hold all 
the distinct keys, but
+  // `UnsafeInMemorySorter`'s point array need to hold all the 
entries. Since `BytesToBytesMap`
+  // can have duplicated keys, here we need a check to make sure the 
point array can hold
+  // all the entries in `BytesToBytesMap`.
+  final LongArray pointArray;
+  // The point array will be used to do in-place sort, which require 
half of the space to be
+  // empty. Note: each record in the map takes two entries in the 
point array, one is record
+  // pointer, another is the key prefix.
+  if (map.numValues() > map.getArray().size() / 4) {
+pointArray = map.allocateArray(map.numValues() * 4);
--- End diff --

The allocation may fail.


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167299716
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
 ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
 numElementsForSpillThreshold,
 canUseRadixSort);
 } else {
-  // The array will be used to do in-place sort, which require half of 
the space to be empty.
-  // Note: each record in the map takes two entries in the array, one 
is record pointer,
-  // another is the key prefix.
-  assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+  // `BytesToBytesMap`'s point array is only guaranteed to hold all 
the distinct keys, but
+  // `UnsafeInMemorySorter`'s point array need to hold all the 
entries. Since `BytesToBytesMap`
--- End diff --

It's possible to change UnsafeInMemorySorter to have multiple entries with the same key.


---




[GitHub] spark issue #18052: [SPARK-20347][PYSPARK][WIP] Provide AsyncRDDActions in P...

2017-06-20 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/18052
  
Personally, I think less is more: don't add everything into every piece of software, or eventually every piece of software will be able to send email.

The RDD API is essentially frozen; we don't add more APIs to it unless they are necessary.


---



[GitHub] spark issue #18052: [SPARK-20347][PYSPARK][WIP] Provide AsyncRDDActions in P...

2017-06-20 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/18052
  
It seems it would also be easy for users themselves or third-party libraries to implement these outside of PySpark, right? If that's the case, I'd prefer not to add it to PySpark.


---



[GitHub] spark issue #18244: [SPARK-20211][SQL] Fix the Precision and Scale of Decima...

2017-06-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/18244
  
lgtm


---



[GitHub] spark pull request #18244: [SPARK-20211][SQL] Fix the Precision and Scale of...

2017-06-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/18244#discussion_r121050241
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala ---
@@ -126,7 +126,15 @@ final class Decimal extends Ordered[Decimal] with 
Serializable {
   def set(decimal: BigDecimal): Decimal = {
 this.decimalVal = decimal
 this.longVal = 0L
-this._precision = decimal.precision
+if (decimal.compare(BigDecimal(1.0)) == -1 && 
decimal.compare(BigDecimal(-1.0)) == 1) {
--- End diff --

if (decimal.precision <= decimal.scale) {
}
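
For intuition, a small sketch using Python's standard decimal module (not Spark's Decimal; the equivalence below is an assumption stated for nonzero values): when |d| < 1, the number of significant digits (precision) never exceeds the number of fractional digits (scale).

```
from decimal import Decimal

def precision_and_scale(d):
    # precision = number of significant digits, scale = digits after the point
    t = d.as_tuple()
    return len(t.digits), -t.exponent

for text in ["0.01", "0.5", "-0.123", "1.5", "12.3"]:
    p, s = precision_and_scale(Decimal(text))
    # For nonzero |d| < 1 the suggested check `precision <= scale` holds;
    # for |d| >= 1 it does not.
    print(text, p, s, p <= s)
```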


---



[GitHub] spark issue #17375: [SPARK-19019][PYTHON][BRANCH-1.6] Fix hijacked `collecti...

2017-03-21 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/17375
  
lgtm


---



[GitHub] spark issue #17374: [SPARK-19019][PYTHON][BRANCH-2.0] Fix hijacked `collecti...

2017-03-21 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/17374
  
lgtm


---



[GitHub] spark issue #17282: [SPARK-19872][PYTHON] Use the correct deserializer for R...

2017-03-15 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/17282
  
lgtm, merging into master, and 2.1, 2.0 branch.


---



[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105007318
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
 ---
@@ -341,25 +364,27 @@ private[window] final class 
UnboundedFollowingWindowFunctionFrame(
   override def write(index: Int, current: InternalRow): Unit = {
 var bufferUpdated = index == 0
 
-// Duplicate the input to have a new iterator
-val tmp = input.copy()
-
-// Drop all rows from the buffer for which the input row value is 
smaller than
+// Ignore all the rows from the buffer for which the input row value 
is smaller than
 // the output row lower bound.
-tmp.skip(inputIndex)
-var nextRow = tmp.next()
+val iterator = input.generateIterator(startIndex = inputIndex)
+
+def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = {
--- End diff --

If we expect a row somewhere, I'd rather call next(): it will throw an exception if something goes wrong; otherwise you will silently get a wrong result.
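
A tiny illustration of that preference in plain Python (not the Scala code under review): next() fails loudly at the exact spot where the expected row is missing, while a get-or-null helper defers the failure into a silently wrong result.

```
def get_next_or_none(it):
    # Permissive variant: a missing row becomes None and the bug
    # only surfaces later as a wrong result.
    return next(it, None)

rows = iter([])                      # pretend the expected row is missing
assert get_next_or_none(rows) is None

try:
    next(iter([]))                   # fail-fast variant raises immediately
except StopIteration:
    print("missing row detected at the point of the mistake")
```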


---



[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105006782
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
 ---
@@ -164,9 +176,12 @@ private[window] final class SlidingWindowFunctionFrame(
   private[this] var inputLowIndex = 0
 
   /** Prepare the frame for calculating a new partition. Reset all 
variables. */
-  override def prepare(rows: RowBuffer): Unit = {
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
-nextRow = rows.next()
+inputIterator = input.generateIterator()
+if (inputIterator.hasNext) {
--- End diff --

Should this be an assert? Otherwise you may end up with a wrong result.


---



[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-03-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r105006306
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -674,6 +675,24 @@ object SQLConf {
   .stringConf
   .createWithDefault(TimeZone.getDefault().getID())
 
+  val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
+buildConf("spark.sql.windowExec.buffer.spill.threshold")
+  .doc("Threshold for number of rows buffered in window operator")
+  .intConf
+  .createWithDefault(4096)
+
+  val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
+buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
+  .doc("Threshold for number of rows buffered in sort merge join 
operator")
+  .intConf
+  .createWithDefault(Int.MaxValue)
+
+  val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
+buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
+  .doc("Threshold for number of rows buffered in cartesian product 
operator")
+  .intConf
+  
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
--- End diff --

There are too many knobs; should we mark them as internal?


---



[GitHub] spark issue #16896: [SPARK-19561][Python] cast TimestampType.toInternal outp...

2017-03-07 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16896
  
My bad, I did not realize that, sorry.


---



[GitHub] spark issue #16896: [SPARK-19561][Python] cast TimestampType.toInternal outp...

2017-03-07 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16896
  
Merged into master and 2.1 branch.


---



[GitHub] spark issue #16896: [SPARK-19561][Python] cast TimestampType.toInternal outp...

2017-03-06 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16896
  
lgtm, will merge it when I get a chance.


---



[GitHub] spark pull request #16782: [SPARK-19348][PYTHON][WIP] PySpark keyword_only d...

2017-02-27 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16782#discussion_r103276349
  
--- Diff: python/pyspark/__init__.py ---
@@ -96,9 +96,11 @@ def keyword_only(func):
 """
 @wraps(func)
 def wrapper(*args, **kwargs):
+# NOTE - this assumes we are wrapping a method and args[0] will be 
'self'
 if len(args) > 1:
 raise TypeError("Method %s forces keyword arguments." % 
func.__name__)
 wrapper._input_kwargs = kwargs
--- End diff --

If the assumption is correct, should we always use 'self' to hold the kwargs (i.e., remove this line and update all the functions that use `keyword_only`)?
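
A hedged sketch of that suggestion (not the actual PySpark implementation): since the wrapped callable is always a method, stash the captured kwargs on `args[0]` (`self`) instead of on the shared wrapper function.

```
from functools import wraps

def keyword_only(func):
    """Force keyword arguments and record them per instance (sketch only)."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if args:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        # Stored on the instance rather than the wrapper, so different
        # instances (or threads) cannot clobber each other's kwargs.
        self._input_kwargs = kwargs
        return func(self, **kwargs)
    return wrapper

class Example(object):
    @keyword_only
    def setParams(self, a=1, b=2):
        return self._input_kwargs      # callers read the recorded kwargs here

assert Example().setParams(a=10) == {"a": 10}
```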


---



[GitHub] spark issue #17036: [SPARK-19706][pyspark] add Column.contains in pyspark

2017-02-23 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/17036
  
lgtm


---



[GitHub] spark issue #17036: [SPARK-19706][pyspark] add Column.contains in pyspark

2017-02-22 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/17036
  
lgtm


---



[GitHub] spark issue #16865: [SPARK-19530][SQL] Use guava weigher for code cache evic...

2017-02-19 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16865
  
I still think it's not worth it


---



[GitHub] spark issue #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMa...

2017-02-17 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16844
  
Merging into master, 2.1, 2.0 branch.


---



[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...

2017-02-15 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16844#discussion_r101360777
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -742,7 +742,7 @@ public boolean append(Object kbase, long koff, int 
klen, Object vbase, long voff
 longArray.set(pos * 2 + 1, keyHashcode);
 isDefined = true;
 
-if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) 
{
--- End diff --

Unfortunately, we can't grow at the beginning; otherwise the `pos` will be wrong.


---



[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...

2017-02-14 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16909#discussion_r101214678
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, 
UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk 
when there a predefined
+ * threshold of rows is reached.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy 
more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur 
unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case 
of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: 
Int) extends Logging {
--- End diff --

How does this compare to using UnsafeExternalSorter directly? There is a similar use case here:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala#L42


---



[GitHub] spark issue #16896: [SPARK-19561][Python] cast TimestampType.toInternal outp...

2017-02-14 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16896
  
Just one minor comment


---



[GitHub] spark pull request #16896: [SPARK-19561][Python] cast TimestampType.toIntern...

2017-02-14 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16896#discussion_r101204910
  
--- Diff: python/pyspark/sql/types.py ---
@@ -189,7 +189,7 @@ def toInternal(self, dt):
 if dt is not None:
 seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
else time.mktime(dt.timetuple()))
-return int(seconds) * 1000000 + dt.microsecond
+return long(int(seconds) * 1000000 + dt.microsecond)
--- End diff --

Could you just replace the `int` with `long`?


---



[GitHub] spark issue #16865: [SPARK-19530][SQL] Use guava weigher for code cache evic...

2017-02-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16865
  
@viirya This is a general OOM; it should not be caused by cached bytecode, which is much smaller compared to other things in the executor. I don't think this patch will help here either.


---



[GitHub] spark issue #16865: [SPARK-19530][SQL] Use guava weigher for code cache evic...

2017-02-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16865
  
I understand the motivation here; could you show the benefit of this change for a real use case?


---



[GitHub] spark pull request #16865: [SPARK-19530][SQL] Use guava weigher for code cac...

2017-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16865#discussion_r100428764
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -1004,7 +1016,8 @@ object CodeGenerator extends Logging {
* weak keys/values and thus does not respond to memory pressure.
*/
   private val cache = CacheBuilder.newBuilder()
-.maximumSize(100)
+.maximumWeight(10 * 1024 * 1024)
--- End diff --

@dongjoon-hyun That's a limit on a single Java method, not a whole class. A generated class could be far bigger than 64KB.


---



[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...

2017-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16844#discussion_r100387863
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -695,11 +690,16 @@ public boolean append(Object kbase, long koff, int 
klen, Object vbase, long voff
   assert (vlen % 8 == 0);
   assert (longArray != null);
 
-  if (numKeys == MAX_CAPACITY
-// The map could be reused from last spill (because of no enough 
memory to grow),
-// then we don't try to grow again if hit the `growthThreshold`.
-|| !canGrowArray && numKeys > growthThreshold) {
-return false;
+  if (numKeys >= growthThreshold) {
+if (longArray.size() / 2 == MAX_CAPACITY) {
--- End diff --

@mridulm Does that mean we should also rename `growAndRehash` to `tryGrowAndRehash`? I don't think that is necessary.


---



[GitHub] spark issue #16825: [SPARK-19481][REPL][maven]Avoid to leak SparkContext in ...

2017-02-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16825
  
lgtm, merging this into master and 2.1 branch, thanks


---



[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...

2017-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16844#discussion_r100383151
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -741,14 +741,6 @@ public boolean append(Object kbase, long koff, int 
klen, Object vbase, long voff
 numKeys++;
 longArray.set(pos * 2 + 1, keyHashcode);
 isDefined = true;
-
-if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
--- End diff --

longArray.size() is the next capacity under the current growth strategy, so this should be `longArray.size() <= MAX_CAPACITY`.


---



[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...

2017-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16844#discussion_r100381544
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -695,11 +690,16 @@ public boolean append(Object kbase, long koff, int 
klen, Object vbase, long voff
   assert (vlen % 8 == 0);
   assert (longArray != null);
 
-  if (numKeys == MAX_CAPACITY
-// The map could be reused from last spill (because of no enough 
memory to grow),
-// then we don't try to grow again if hit the `growthThreshold`.
-|| !canGrowArray && numKeys > growthThreshold) {
-return false;
+  if (numKeys >= growthThreshold) {
+if (longArray.size() / 2 == MAX_CAPACITY) {
--- End diff --

There are two reasons it can fail to grow: 1) the current capacity (longArray.size() / 2) has reached MAX_CAPACITY, or 2) it can't allocate a new array (OOM).

So I think the check here is correct.


---



[GitHub] spark pull request #16825: [SPARK-19481][REPL][maven]Avoid to leak SparkCont...

2017-02-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16825#discussion_r100206125
  
--- Diff: repl/src/main/scala/org/apache/spark/repl/Signaling.scala ---
@@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging {
* when no jobs are currently running.
* This makes it possible to interrupt a running shell job by pressing 
Ctrl+C.
*/
-  def cancelOnInterrupt(ctx: SparkContext): Unit = 
SignalUtils.register("INT") {
-if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
-  logWarning("Cancelling all active jobs, this can take a while. " +
-"Press Ctrl+C again to exit now.")
-  ctx.cancelAllJobs()
-  true
-} else {
-  false
-}
+  def cancelOnInterrupt(): Unit = SignalUtils.register("INT") {
--- End diff --

Who is using this one? Is this a breaking change?


---



[GitHub] spark issue #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMa...

2017-02-08 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16844
  
@viirya Addressed your comment, also fixed another bug (updated PR 
description).


---



[GitHub] spark issue #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMa...

2017-02-07 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16844
  
cc @joshrosen, @viirya 


---



[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...

2017-02-07 Thread davies
GitHub user davies opened a pull request:

https://github.com/apache/spark/pull/16844

[SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap

## What changes were proposed in this pull request?

Radix sort requires that half of the array be free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will never hold more items than 1/2 of its capacity. It turned out this is not true: the current implementation of append() could leave one more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1).

This PR fixes the off-by-one bug in BytesToBytesMap.
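
A small arithmetic sketch of the off-by-one (plain Python with illustrative numbers; the 0.5 factor and the radix-sort requirement are taken from the description above):

```
# Illustrative capacity math, not Spark code.
capacity = 64
growth_threshold = capacity // 2        # the 0.5 scale factor
radix_sort_limit = capacity // 2        # radix sort needs half the array free

# The old check ran *after* inserting a key: `if (numKeys > growthThreshold)`.
# If growing is then impossible (MAX_CAPACITY reached, or no memory), append()
# has already accepted growth_threshold + 1 keys: one more than the sorter allows.
num_keys_after_append = growth_threshold + 1
assert num_keys_after_append > radix_sort_limit   # the broken invariant
```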

## How was this patch tested?

Added regression test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davies/spark off_by_one

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16844






---



[GitHub] spark issue #13808: [SPARK-14480][SQL] Remove meaningless StringIteratorRead...

2017-01-26 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/13808
  
@HyukjinKwon @rxin This patch has a regression: a column that has an escaped newline can't be parsed correctly anymore. Should we revert this patch or figure out a way to fix it?


---



[GitHub] spark issue #16581: [SPARK-18589] [SQL] Fix Python UDF accessing attributes ...

2017-01-20 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16581
  
Cherry-picked into 2.1 branch.


---



[GitHub] spark issue #15467: [SPARK-17912][SQL] Refactor code generation to get data ...

2017-01-19 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15467
  
Merging this into master, thanks!


---



[GitHub] spark pull request #16581: [SPARK-18589] [SQL] Fix Python UDF accessing attr...

2017-01-17 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16581#discussion_r96469102
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -342,6 +342,15 @@ def test_udf_in_filter_on_top_of_outer_join(self):
 df = df.withColumn('b', udf(lambda x: 'x')(df.a))
 self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')])
 
+def test_udf_in_filter_on_top_of_join(self):
--- End diff --

done


---



[GitHub] spark issue #16429: [SPARK-19019][PYTHON] Fix hijacked `collections.namedtup...

2017-01-17 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16429
  
lgtm, merging into master and 2.1 branch.


---



[GitHub] spark pull request #16429: [SPARK-19019][PYTHON] Fix hijacked `collections.n...

2017-01-14 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16429#discussion_r96130191
  
--- Diff: python/pyspark/serializers.py ---
@@ -382,18 +382,30 @@ def _hijack_namedtuple():
 return
 
 global _old_namedtuple  # or it will put in closure
+global _old_namedtuple_kwdefaults  # or it will put in closure too
 
 def _copy_func(f):
 return types.FunctionType(f.__code__, f.__globals__, f.__name__,
   f.__defaults__, f.__closure__)
 
+def _kwdefaults(f):
+kargs = getattr(f, "__kwdefaults__", None)
--- End diff --

Could you put this comment into code?


---



[GitHub] spark issue #16555: [SPARK-19180][SQL] the offset of short should be 2 in Of...

2017-01-13 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16555
  
lgtm, merging it into master, 2.1 and 2.0 branch.


---



[GitHub] spark issue #16581: [SPARK-18589] [SQL] Fix Python UDF accessing attributes ...

2017-01-13 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16581
  
cc @hvanhovell 


---



[GitHub] spark pull request #16581: [SPARK-18589] [SQL] Fix Python UDF accessing attr...

2017-01-13 Thread davies
GitHub user davies opened a pull request:

https://github.com/apache/spark/pull/16581

[SPARK-18589] [SQL] Fix Python UDF accessing attributes from both side of 
join

## What changes were proposed in this pull request?

PythonUDF is unevaluable, so it cannot be used inside a join condition. Currently the optimizer will push a PythonUDF that accesses both sides of a join into the join condition, and then the query fails to plan.

This PR fixes the issue by checking whether the expression is evaluable before pushing it into the Join.
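
A hedged PySpark sketch of the failing pattern (column names and session setup are illustrative, roughly what the regression test exercises):

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.master("local[1]").appName("spark-18589").getOrCreate()

left = spark.createDataFrame([(1,)], ["a"])
right = spark.createDataFrame([(1,)], ["b"])

# A Python UDF that reads attributes from both sides of the join.
same = udf(lambda a, b: a == b, BooleanType())

# Before the fix, the optimizer could push `same(...)` into the join condition,
# where the unevaluable PythonUDF made the query fail to plan; with the fix the
# predicate stays in a Filter above the join and the query runs.
print(left.crossJoin(right).filter(same("a", "b")).collect())

spark.stop()
```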

## How was this patch tested?

Add a regression test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davies/spark pyudf_join

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16581.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16581


commit 95d73fcf911bfb25b20fea798d1f7b3f4b319e26
Author: Davies Liu <dav...@databricks.com>
Date:   2017-01-13T19:50:11Z

Fix Python UDF accessing attributes from both side of join




---



[GitHub] spark issue #14452: [SPARK-16849][SQL] Improve subquery execution by dedupli...

2016-12-24 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/14452
  
@viirya For duplicated CTEs, without some optimization (pushing different predicates down at different positions), the physical plans should be identical. So I'm wondering whether some aggressive push-down causes the problem for some queries (IsNotNull(xxx)). That is why I asked.


---



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-12-22 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r93671999
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -305,40 +316,84 @@ final class ShuffleBlockFetcherIterator(
*/
   override def next(): (BlockId, InputStream) = {
 numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
-  case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
-  shuffleMetrics.incRemoteBytesRead(buf.size)
-  shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
-  reqsInFlight -= 1
-  logDebug("Number of requests in flight " + reqsInFlight)
-}
-  case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
 
-result match {
-  case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect 
data corruption,
+// then fetch it one more time if it's corrupt, throw 
FailureFetchResult if the second fetch
--- End diff --

@fathersson The checksum in TCP is only 16 bits, which is not strong enough for large volumes of traffic; usually DFS or other systems with heavy TCP traffic add another application-level checksum. Adding to @rxin's point, we did see this retry help in production to work around temporary corruption.
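
A back-of-the-envelope illustration of why 16 bits is weak at scale (the segment count below is hypothetical):

```
# Roughly 1 in 2**16 randomly corrupted TCP segments passes the checksum.
p_undetected = 1.0 / 2 ** 16
corrupted_segments = 10 ** 7               # hypothetical count for a busy cluster
print(corrupted_segments * p_undetected)   # ~152 corruptions expected to slip through
```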


---



[GitHub] spark issue #16211: [SPARK-18576][PYTHON] Add basic TaskContext information ...

2016-12-20 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16211
  
Looks good to me in general, cc @rxin 


---



[GitHub] spark issue #16263: [SPARK-18281][SQL][PySpark] Remove timeout for reading d...

2016-12-20 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16263
  
Merging this into master and 2.1 branch, thanks!


---



[GitHub] spark issue #15980: [SPARK-18528][SQL] Fix a bug to initialise an iterator o...

2016-12-20 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15980
  
lgtm


---



[GitHub] spark issue #16232: [SPARK-18800][SQL] Correct the assert in UnsafeKVExterna...

2016-12-20 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16232
  
lgtm


---



[GitHub] spark issue #16232: [SPARK-18800][SQL] Fix UnsafeKVExternalSorter by correct...

2016-12-19 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16232
  
@viirya Without a repro, I don't think this is the root cause. There could be random corruption that causes the error.


---



[GitHub] spark issue #16232: [SPARK-18800][SQL] Fix UnsafeKVExternalSorter by correct...

2016-12-19 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16232
  
That makes sense; we should update the assert. But this is still not a bug, so the other changes are not needed.


---



[GitHub] spark issue #16232: [SPARK-18800][SQL] Fix UnsafeKVExternalSorter by correct...

2016-12-19 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16232
  
@viirya That's not correct; the values do not have entries in the array.


---



[GitHub] spark issue #16232: [SPARK-18800][SQL] Fix UnsafeKVExternalSorter by correct...

2016-12-19 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16232
  
@viirya A map with multiple values for the same key is only used for a hashed relation, not aggregation, and will also not be passed into UnsafeKVExternalSorter. So I think this is not actually a bug.


---



[GitHub] spark pull request #16263: [SPARK-18281][SQL][PySpark] Consumes the returned...

2016-12-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16263#discussion_r93127142
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -558,6 +558,18 @@ def test_create_dataframe_from_objects(self):
 self.assertEqual(df.dtypes, [("key", "bigint"), ("value", 
"string")])
 self.assertEqual(df.first(), Row(key=1, value="1"))
 
+def test_to_localiterator_for_dataframe(self):
--- End diff --

This test is duplicated, I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16263: [SPARK-18281][SQL][PySpark] Consumes the returned...

2016-12-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16263#discussion_r93126957
  
--- Diff: python/pyspark/rdd.py ---
@@ -135,12 +135,12 @@ def _load_from_socket(port, serializer):
 break
 if not sock:
 raise Exception("could not open socket")
-try:
-rf = sock.makefile("rb", 65536)
-for item in serializer.load_stream(rf):
-yield item
-finally:
-sock.close()
+# The RDD materialization time is unpredicable, if we set a timeout 
for socket reading
+# operation, it will very possibly fail. See SPARK-18281.
+sock.settimeout(None)
+# The socket will be automatically closed when garbage-collected.
+rf = sock.makefile("rb", 65536)
+return serializer.load_stream(sock.makefile("rb", 65536))
--- End diff --

+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16263: [SPARK-18281][SQL][PySpark] Consumes the returned...

2016-12-16 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16263#discussion_r92833679
  
--- Diff: python/pyspark/rdd.py ---
@@ -2349,7 +2352,12 @@ def toLocalIterator(self):
 """
 with SCCallSiteSync(self.context) as css:
 port = 
self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd())
-return _load_from_socket(port, self._jrdd_deserializer)
+# We set a timeout for connecting socket. The connection only 
begins when we start
+# to consume the first element. If we do not begin to consume the 
returned iterator
+# immediately, there will be a failure.
+iter = _load_from_socket(port, self._jrdd_deserializer)
--- End diff --

I think we could have a wrapper to load items, and return the iterator in 
_load_from_socket(): 
```
def _load_from_socket(port):
    ...
    return serializer.load_stream(sock.makefile("rb", 65536))
```

The `finally` may be optional; the GC could close the socket (should be 
confirmed).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16263: [SPARK-18281][SQL][PySpark] Consumes the returned...

2016-12-16 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16263#discussion_r92830957
  
--- Diff: python/pyspark/rdd.py ---
@@ -2349,7 +2352,12 @@ def toLocalIterator(self):
 """
 with SCCallSiteSync(self.context) as css:
 port = 
self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd())
-return _load_from_socket(port, self._jrdd_deserializer)
+# We set a timeout for connecting socket. The connection only 
begins when we start
+# to consume the first element. If we do not begin to consume the 
returned iterator
+# immediately, there will be a failure.
+iter = _load_from_socket(port, self._jrdd_deserializer)
--- End diff --

Since the timeout is removed, is this still needed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16274: [SPARK-18853][SQL] Project (UnaryNode) is way too aggres...

2016-12-13 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16274
  
lgtm


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...

2016-12-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16193
  
Pushing down predicates into a data source also happens during optimization in 
the planner; I think this is not the first rule that does optimization outside 
the Optimizer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...

2016-12-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16193
  
The reason we moved the PythonUDFEvaluator from the logical plan into the physical 
plan is that this one-off broke many things; many rules needed to treat it 
specially.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...

2016-12-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16193
  
@cloud-fan It's not trivial to do this in the optimizer; for example, we would have 
to split one Filter into two, which would conflict with another optimizer rule 
that combines two Filters into one.
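
As a rough illustration, here is a minimal, self-contained Scala sketch of the 
split this PR does on the physical plan instead (`Expr`, `deterministic`, and 
`hasPythonUDF` are stand-ins for illustration, not the real Catalyst types):
```
object FilterSplitSketch {
  // Stand-in for a conjunctive predicate; illustration only.
  final case class Expr(sql: String, deterministic: Boolean, hasPythonUDF: Boolean)

  // Split conjunctive predicates into (pushDown, stayUp), mirroring the
  // span/partition done by trySplitFilter in this PR.
  def splitForPushdown(predicates: Seq[Expr]): (Seq[Expr], Seq[Expr]) = {
    // Predicates after the first non-deterministic one must not be reordered.
    val (candidates, nonDeterministic) = predicates.span(_.deterministic)
    // Predicates referencing a Python UDF cannot run below BatchEvalPython.
    val (pushDown, stayUp) = candidates.partition(e => !e.hasPythonUDF)
    (pushDown, stayUp ++ nonDeterministic)
  }
}
```
For example, splitting `udf(a) AND b > 1` keeps `udf(a)` above the Python 
evaluation and pushes `b > 1` below it; because the split happens on the 
physical plan, the logical CombineFilters rule never gets a chance to merge 
the two filters back.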


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...

2016-12-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16193
  
If there is no objection in the next two hours, I will merge this one into master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...

2016-12-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16193
  
@cloud-fan There is no R UDF at this point. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16235: [SPARK-18745][SQL] Fix signed integer overflow due to to...

2016-12-09 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16235
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16193: [SPARK-18766] [SQL] Push Down Filter Through Batc...

2016-12-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16193#discussion_r91641264
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -165,4 +167,25 @@ object ExtractPythonUDFs extends Rule[SparkPlan] {
   }
 }
   }
+
+  // Split the original FilterExec to two FilterExecs. The upper 
FilterExec only contains
+  // Python UDF and non-deterministic predicates.
+  private def trySplitFilter(plan: SparkPlan): SparkPlan = {
+plan match {
+  case filter: FilterExec =>
+// Only push down the predicates that is deterministic and all the 
referenced attributes
+// come from child.
+val (candidates, containingNonDeterministic) =
+  
splitConjunctivePredicates(filter.condition).span(_.deterministic)
+// Python UDF is always deterministic
--- End diff --

Is this one useful? We just won't push down expressions that have Python 
UDFs here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16193: [SPARK-18766] [SQL] Push Down Filter Through Batc...

2016-12-08 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16193#discussion_r91609923
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -165,4 +167,31 @@ object ExtractPythonUDFs extends Rule[SparkPlan] {
   }
 }
   }
+
+  // Split the original FilterExec to two FilterExecs. The upper 
FilterExec only contains
+  // Python UDF and non-deterministic predicates.
+  private def trySplitFilter(plan: SparkPlan): SparkPlan = {
+plan match {
+  case filter: FilterExec =>
+// Only push down the predicates that is deterministic and all the 
referenced attributes
+// come from child.
+val (candidates, containingNonDeterministic) =
+  
splitConjunctivePredicates(filter.condition).span(_.deterministic)
+val (pushDown, rest) = candidates.partition(!hasPythonUDF(_))
+val stayUp = rest ++ containingNonDeterministic
+
+if (pushDown.nonEmpty) {
+  val newChild = FilterExec(pushDown.reduceLeft(And), filter.child)
+  if (stayUp.nonEmpty) {
--- End diff --

There should be some UDFs, so this will not be empty.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16121: [SPARK-16589][PYTHON] Chained cartesian produces incorre...

2016-12-08 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16121
  
LGTM, merging into master and 2.1 branch, thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15923: [SPARK-4105] retry the fetch or stage if shuffle block i...

2016-12-07 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15923
  
@JoshRosen Added a test for the case where `detectCorrupt` is false.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16193: [SPARK-18766] [SQL] Push Down Filter Through Batc...

2016-12-07 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/16193#discussion_r91394979
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -166,3 +174,40 @@ object ExtractPythonUDFs extends Rule[SparkPlan] {
 }
   }
 }
+
+// This rule is to push deterministic predicates through 
BatchEvalPythonExec
+object PushPredicateThroughBatchEvalPython extends Rule[SparkPlan] with 
PredicateHelper {
--- End diff --

Having a predicate-pushdown rule for SparkPlan sounds bad; can we try to do 
this in extract()? For example (sketch):
```
val splittedFilter = trySplitFilter(plan)
val newChildren = splittedFilter.children.map { child =>
  extract(child)  // recurse into each child (sketch)
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15923: [SPARK-4105] retry the fetch or stage if shuffle block i...

2016-12-06 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15923
  
ping @JoshRosen 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16151: [SPARK-18719] Add spark.ui.showConsoleProgress to config...

2016-12-05 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16151
  
lgtm, merging into master


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16121: [SPARK-16589][PYTHON] Chained cartesian produces incorre...

2016-12-02 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/16121
  
It's pretty tricky to make the chained CartesianDeserializer work; maybe 
it's easier to have a workaround in RDD.cartesian() that adds a 
_reserialize() between chained cartesian (or zipped) RDDs. It will be less 
performant, but will be easy to reason about.

The current patch may still be wrong in the case of a chained 
CartesianDeserializer and PairSerializer, for example a.cartesian(b.zip(c)) 
(have not verified yet).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-29 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r90085570
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -56,8 +59,10 @@ final class ShuffleBlockFetcherIterator(
 shuffleClient: ShuffleClient,
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-29 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r90085604
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator(
*/
   override def next(): (BlockId, InputStream) = {
 numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
-  case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
-  shuffleMetrics.incRemoteBytesRead(buf.size)
-  shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
-  reqsInFlight -= 1
-  logDebug("Number of requests in flight " + reqsInFlight)
-}
-  case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
 
-result match {
-  case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect 
data corruption,
+// then fetch it one more time if it's corrupt, throw 
FailureFetchResult if the second fetch
+// is also corrupt, so the previous stage could be retried.
+// For local shuffle block, throw FailureFetchResult for the first 
IOException.
+while (result == null) {
+  val startFetchWait = System.currentTimeMillis()
+  result = results.take()
+  val stopFetchWait = System.currentTimeMillis()
+  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
 
-  case SuccessFetchResult(blockId, address, _, buf, _) =>
-try {
-  (result.blockId, new 
BufferReleasingInputStream(buf.createInputStream(), this))
-} catch {
-  case NonFatal(t) =>
-throwFetchFailedException(blockId, address, t)
-}
+  result match {
+case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
+  if (address != blockManager.blockManagerId) {
+shuffleMetrics.incRemoteBytesRead(buf.size)
+shuffleMetrics.incRemoteBlocksFetched(1)
+  }
+  bytesInFlight -= size
+  if (isNetworkReqDone) {
+reqsInFlight -= 1
+logDebug("Number of requests in flight " + reqsInFlight)
+  }
+
+  val in = try {
+buf.createInputStream()
+  } catch {
+// The exception could only be throwed by local shuffle block
+case e: IOException =>
+  assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+  logError("Failed to create input stream from local block", e)
+  buf.release()
+  throwFetchFailedException(blockId, address, e)
+  }
+
+  input = streamWrapper(blockId, in)
+  // Only copy the stream if it's wrapped by compression or 
encryption, also the size of
+  // block is small (the decompressed block is smaller than 
maxBytesInFlight)
+  if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 
3) {
+val out = new ChunkedByteBufferOutputStream(64 * 1024, 
ByteBuffer.allocate)
+try {
+  // Decompress the whole block at once to detect any 
corruption, which could increase
+  // the memory usage tne potential increase the chance of OOM.
+  // TODO: manage the memory used here, and spill it into disk 
in case of OOM.
+  Utils.copyStream(input, out)
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15935: [SPARK-18188] add checksum for blocks of broadcast

2016-11-28 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15935
  
@zsxwing Added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15935: [SPARK-18188] add checksum for blocks of broadcast

2016-11-28 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15935
  
Manually tested this patch with a job that usually failed with corrupt 
streams:
```
136 26993   0   FAILED  PROCESS_LOCAL   0 / 10.1.108.161
2016/11/20 08:39:11 7 s 98 ms   0 ms30.1 MB / 14481129.2 MB 
java.io.IOException: org.apache.spark.SparkException: corrupt remote block 9: 
927759120 != 787315496 +details
java.io.IOException: org.apache.spark.SparkException: corrupt remote block 
9: 927759120 != 787315496
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1283)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:192)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:93)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.init(Unknown
 Source)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:367)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:364)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: corrupt remote block 9: 
927759120 != 787315496
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:154)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:139)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:139)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:139)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:204)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
... 25 more
136 27059   1   SUCCESS PROCESS_LOCAL   13 / 10.1.109.49
2016/11/20 08:39:23 19 s0.4 s   1 ms30.1 MB / 14481128.9 MB 
```

This patch did save a stage from a corrupt broadcast block.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-28 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r89889964
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator(
*/
   override def next(): (BlockId, InputStream) = {
 numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
-  case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
-  shuffleMetrics.incRemoteBytesRead(buf.size)
-  shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
-  reqsInFlight -= 1
-  logDebug("Number of requests in flight " + reqsInFlight)
-}
-  case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
 
-result match {
-  case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect 
data corruption,
+// then fetch it one more time if it's corrupt, throw 
FailureFetchResult if the second fetch
+// is also corrupt, so the previous stage could be retried.
+// For local shuffle block, throw FailureFetchResult for the first 
IOException.
+while (result == null) {
+  val startFetchWait = System.currentTimeMillis()
+  result = results.take()
+  val stopFetchWait = System.currentTimeMillis()
+  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
 
-  case SuccessFetchResult(blockId, address, _, buf, _) =>
-try {
-  (result.blockId, new 
BufferReleasingInputStream(buf.createInputStream(), this))
-} catch {
-  case NonFatal(t) =>
-throwFetchFailedException(blockId, address, t)
-}
+  result match {
+case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
+  if (address != blockManager.blockManagerId) {
+shuffleMetrics.incRemoteBytesRead(buf.size)
+shuffleMetrics.incRemoteBlocksFetched(1)
+  }
+  bytesInFlight -= size
+  if (isNetworkReqDone) {
+reqsInFlight -= 1
+logDebug("Number of requests in flight " + reqsInFlight)
+  }
+
+  val in = try {
+buf.createInputStream()
+  } catch {
+// The exception could only be throwed by local shuffle block
+case e: IOException =>
+  assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+  logError("Failed to create input stream from local block", e)
+  buf.release()
+  throwFetchFailedException(blockId, address, e)
+  }
+
+  input = streamWrapper(blockId, in)
+  // Only copy the stream if it's wrapped by compression or 
encryption, also the size of
+  // block is small (the decompressed block is smaller than 
maxBytesInFlight)
--- End diff --

@JoshRosen  I think so.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15923: [SPARK-4105] retry the fetch or stage if shuffle block i...

2016-11-28 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15923
  
Manually tested this patch with a job that usually failed because of a corrupt 
stream, as the log shows:
```
16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_613_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_688_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_2434_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_878_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_1042_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_2301_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_2546_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:27 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_3160_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:27 WARN ShuffleBlockFetcherIterator: got an corrupted block 
shuffle_5_3601_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
...
16/11/20 08:32:41 INFO Executor: Finished task 275.0 in stage 26.0 (TID 
22187). 5219 bytes result sent to driver
```
The shuffle fetcher got some corrupt blocks for partition 275; it retried 
once, and the task finally succeeded.

But the retry cannot protect all the tasks; some failed with FetchFailed, 
so the stage was retried:
```
26            2016/11/20 08:31:24   1.0min   403/1000 (2 failed)   205.6 GB   29.5 GB   org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
26 (retry 1)  2016/11/20 08:34:00   34s      200/629 (2 failed)    102.0 GB   14.6 GB   org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
26 (retry 2)  2016/11/20 08:35:25   1.8min   461/461               235.1 GB   33.7 GB
```

Stage 26 succeeded after being retried twice.

Another thing is that all the corruption happened on only 2 nodes out of 
26. A few broadcast blocks were also corrupt on them. It seems that the 
corruption happens on the receiving (fetcher) side of the network.

I will update the patch to address comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15980: [SPARK-18528][SQL] Fix a bug to initialise an ite...

2016-11-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15980#discussion_r89365636
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -67,23 +67,14 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
-val stopEarly = ctx.freshName("stopEarly")
-ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;")
-
-ctx.addNewFunction("shouldStop", s"""
-  @Override
-  protected boolean shouldStop() {
-return !currentRows.isEmpty() || $stopEarly;
-  }
-""")
 val countTerm = ctx.freshName("count")
 ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
 s"""
| if ($countTerm < $limit) {
|   $countTerm += 1;
|   ${consume(ctx, input)}
| } else {
-   |   $stopEarly = true;
+   |   // Do nothing
--- End diff --

This removes an optimization, so I don't think this is a good fix. Can we 
initialize it properly?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-19 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/13065
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-18 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r88759884
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator(
*/
   override def next(): (BlockId, InputStream) = {
 numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
-  case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
-  shuffleMetrics.incRemoteBytesRead(buf.size)
-  shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
-  reqsInFlight -= 1
-  logDebug("Number of requests in flight " + reqsInFlight)
-}
-  case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
 
-result match {
-  case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect 
data corruption,
+// then fetch it one more time if it's corrupt, throw 
FailureFetchResult if the second fetch
+// is also corrupt, so the previous stage could be retried.
+// For local shuffle block, throw FailureFetchResult for the first 
IOException.
+while (result == null) {
+  val startFetchWait = System.currentTimeMillis()
+  result = results.take()
+  val stopFetchWait = System.currentTimeMillis()
+  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
 
-  case SuccessFetchResult(blockId, address, _, buf, _) =>
-try {
-  (result.blockId, new 
BufferReleasingInputStream(buf.createInputStream(), this))
-} catch {
-  case NonFatal(t) =>
-throwFetchFailedException(blockId, address, t)
-}
+  result match {
+case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
+  if (address != blockManager.blockManagerId) {
+shuffleMetrics.incRemoteBytesRead(buf.size)
+shuffleMetrics.incRemoteBlocksFetched(1)
+  }
+  bytesInFlight -= size
+  if (isNetworkReqDone) {
+reqsInFlight -= 1
+logDebug("Number of requests in flight " + reqsInFlight)
+  }
+
+  val in = try {
+buf.createInputStream()
+  } catch {
+// The exception could only be throwed by local shuffle block
+case e: IOException =>
+  assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+  logError("Failed to create input stream from local block", e)
+  buf.release()
+  throwFetchFailedException(blockId, address, e)
+  }
+
+  input = streamWrapper(blockId, in)
+  // Only copy the stream if it's wrapped by compression or 
encryption, also the size of
+  // block is small (the decompressed block is smaller than 
maxBytesInFlight)
--- End diff --

I tried to add a checksum for shuffle blocks in 
https://github.com/apache/spark/pull/15894, but that has much more complexity 
and overhead, so I'm in favor of this lighter one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-18 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r88759763
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator(
*/
   override def next(): (BlockId, InputStream) = {
 numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
-  case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
-  shuffleMetrics.incRemoteBytesRead(buf.size)
-  shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
-  reqsInFlight -= 1
-  logDebug("Number of requests in flight " + reqsInFlight)
-}
-  case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
 
-result match {
-  case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect 
data corruption,
+// then fetch it one more time if it's corrupt, throw 
FailureFetchResult if the second fetch
+// is also corrupt, so the previous stage could be retried.
+// For local shuffle block, throw FailureFetchResult for the first 
IOException.
+while (result == null) {
+  val startFetchWait = System.currentTimeMillis()
+  result = results.take()
+  val stopFetchWait = System.currentTimeMillis()
+  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
 
-  case SuccessFetchResult(blockId, address, _, buf, _) =>
-try {
-  (result.blockId, new 
BufferReleasingInputStream(buf.createInputStream(), this))
-} catch {
-  case NonFatal(t) =>
-throwFetchFailedException(blockId, address, t)
-}
+  result match {
+case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
+  if (address != blockManager.blockManagerId) {
+shuffleMetrics.incRemoteBytesRead(buf.size)
+shuffleMetrics.incRemoteBlocksFetched(1)
+  }
+  bytesInFlight -= size
+  if (isNetworkReqDone) {
+reqsInFlight -= 1
+logDebug("Number of requests in flight " + reqsInFlight)
+  }
+
+  val in = try {
+buf.createInputStream()
+  } catch {
+// The exception could only be throwed by local shuffle block
+case e: IOException =>
+  assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+  logError("Failed to create input stream from local block", e)
+  buf.release()
+  throwFetchFailedException(blockId, address, e)
+  }
+
+  input = streamWrapper(blockId, in)
+  // Only copy the stream if it's wrapped by compression or 
encryption, also the size of
+  // block is small (the decompressed block is smaller than 
maxBytesInFlight)
--- End diff --

The purpose of this PR is to reduce the chance that a job fails because of 
network/disk corruption, without introducing other regressions (OOM). 
Typically, shuffle blocks are small, so we can still fetch in parallel even 
with this maxBytesInFlight limit. For the few large blocks (for example, data 
skew), we do not check for corruption for now (at least, it's not worse than before).
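
For reference, a minimal Scala sketch of the guard this implies (parameter 
names here are illustrative, not the exact ones in the patch): a fetched block 
is only decompressed eagerly when corruption detection is enabled, the stream 
is actually wrapped by compression/encryption, and the block is small relative 
to maxBytesInFlight, so the extra buffering stays bounded.
```
object CorruptionCheckSketch {
  // Decide whether to eagerly decompress a fetched shuffle block to detect
  // corruption before handing it to the task.
  def shouldEagerlyCheck(
      detectCorrupt: Boolean,
      streamWasWrapped: Boolean, // compression/encryption wrapper was applied
      blockSize: Long,
      maxBytesInFlight: Long): Boolean = {
    detectCorrupt && streamWasWrapped && blockSize < maxBytesInFlight / 3
  }
}
```
Large or unwrapped blocks skip the eager check and are handed to the task as before.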


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-18 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13065#discussion_r88755507
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -103,5 +109,192 @@ case class GenerateExec(
   }
 }
   }
-}
 
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  protected override def doProduce(ctx: CodegenContext): String = {
+child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
+ctx.currentVars = input
+ctx.copyResult = true
+
+// Add input rows to the values when we are joining
+val values = if (join) {
+  input
+} else {
+  Seq.empty
+}
+
+boundGenerator match {
+  case e: CollectionGenerator => codeGenCollection(ctx, e, values, row)
+  case g => codeGenTraversableOnce(ctx, g, values, row)
+}
+  }
+
+  /**
+   * Generate code for [[CollectionGenerator]] expressions.
+   */
+  private def codeGenCollection(
+  ctx: CodegenContext,
+  e: CollectionGenerator,
+  input: Seq[ExprCode],
+  row: ExprCode): String = {
+
+// Generate code for the generator.
+val data = e.genCode(ctx)
+
+// Generate looping variables.
+val index = ctx.freshName("index")
+
+// Add a check if the generate outer flag is true.
+val checks = optionalCode(outer, data.isNull)
+
+// Add position
+val position = if (e.position) {
+  Seq(ExprCode("", "false", index))
+} else {
+  Seq.empty
+}
+
+// Generate code for either ArrayData or MapData
+val (initMapData, updateRowData, values) = e.collectionType match {
+  case ArrayType(st: StructType, nullable) if e.inline =>
+val row = codeGenAccessor(ctx, data.value, "col", index, st, 
nullable, checks)
+val fieldChecks = checks ++ optionalCode(nullable, row.isNull)
+val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) =>
+  codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, 
f.nullable, fieldChecks)
+}
+("", row.code, columns)
+
+  case ArrayType(dataType, nullable) =>
+("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, 
dataType, nullable, checks)))
+
+  case MapType(keyType, valueType, valueContainsNull) =>
+// Materialize the key and the value arrays before we enter the 
loop.
+val keyArray = ctx.freshName("keyArray")
+val valueArray = ctx.freshName("valueArray")
+val initArrayData =
+  s"""
+ |ArrayData $keyArray = ${data.isNull} ? null : 
${data.value}.keyArray();
+ |ArrayData $valueArray = ${data.isNull} ? null : 
${data.value}.valueArray();
+   """.stripMargin
+val values = Seq(
+  codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = 
false, checks),
+  codeGenAccessor(ctx, valueArray, "value", index, valueType, 
valueContainsNull, checks))
+(initArrayData, "", values)
+}
+
+// In case of outer=true we need to make sure the loop is executed 
at-least once when the
+// array/map contains no input. We do this by setting the looping 
index to -1 if there is no
+// input, evaluation of the array is prevented by a check in the 
accessor code.
+val numElements = ctx.freshName("numElements")
+val init = if (outer) {
+  s"$numElements == 0 ? -1 : 0"
+} else {
+  "0"
+}
+val numOutput = metricTerm(ctx, "numOutputRows")
+s"""
+   |${data.code}
+   |$initMapData
+   |int $numElements = ${data.isNull} ? 0 : 
${data.value}.numElements();
+   |for (int $index = $init; $index < $numElements; $index++) {
+   |  $numOutput.add(1);
+   |  $updateRowData
+   |  ${consume(ctx, input ++ position ++ values)}
+   |}
+ """.stripMargin
+  }
+
+  /**
+   * Generate code for a regular [[TraversableOnce]] returning 
[[Generator]].
+   */
+  private def codeGenTraversableOnce(
+  ctx: CodegenContext,
+  e: Expression,
+  input: Seq[ExprCode],
+  row: ExprCode): String = {
+
+// Generate the code for the generator
 

[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-18 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13065#discussion_r88754155
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -113,4 +117,25 @@ class WholeStageCodegenSuite extends SparkPlanTest 
with SharedSQLContext {
 
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
 assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
   }
+
+  test("generate should be included in WholeStageCodegen") {
+import org.apache.spark.sql.functions._
+val ds = spark.range(2).select(
+  col("id"),
+  explode(array(col("id") + 1, col("id") + 2)).as("value"))
+val plan = ds.queryExecution.executedPlan
+assert(plan.find(p =>
+  p.isInstanceOf[WholeStageCodegenExec] &&
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[GenerateExec]).isDefined)
+assert(ds.collect() === Array(Row(0, 1), Row(0, 2), Row(1, 2), Row(1, 
3)))
+  }
+
+  test("large inline generate should fail in WholeStageCodegen") {
--- End diff --

I mean it should not rely on the fallback for failed compilation (the whole plan 
would fall back), but rather not use whole-stage codegen for this Generate at all.

This could be done by checking it in `Generate.supportCodegen()`.
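
A minimal sketch of that suggestion (the threshold and the way the width is 
measured are assumptions for illustration, not the actual change):
```
object GenerateCodegenSketch {
  // Hypothetical guard: opt out of whole-stage codegen when an inline
  // generator would emit too many columns for one generated method,
  // instead of relying on the whole-plan fallback after a failed compile.
  def supportCodegen(inlineFieldCount: Int, maxFields: Int = 100): Boolean =
    inlineFieldCount <= maxFields
}
```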


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15935: [SPARK-] add checksum for blocks of broadcast

2016-11-18 Thread davies
GitHub user davies opened a pull request:

https://github.com/apache/spark/pull/15935

[SPARK-] add checksum for blocks of broadcast

## What changes were proposed in this pull request?

A TorrentBroadcast is serialized and compressed first, then split into 
fixed-size blocks. If any block is corrupt when fetched from a remote executor, the 
decompression/deserialization will fail without knowing which block is corrupt. 
Also, the corrupt block is kept in the block manager and reported to the driver, so 
other tasks (in the same executor or in a different executor) will also fail 
because of it.

This PR adds a checksum for each block and checks it after fetching a block 
from a remote executor, because it's very likely that the corruption happens in the 
network. When corruption is detected, the block is thrown away and an 
exception is thrown to fail the task, which will be retried.
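
A minimal sketch of the per-block checksum idea, assuming `java.util.zip.Adler32` 
as the checksum (the concrete checksum and helper names in the patch may differ):
```
import java.nio.ByteBuffer
import java.util.zip.Adler32

object BroadcastChecksumSketch {
  // Compute a checksum over the readable bytes of a block without
  // consuming the buffer (duplicate() shares content but not position).
  def calcChecksum(block: ByteBuffer): Int = {
    val adler = new Adler32()
    adler.update(block.duplicate())
    adler.getValue.toInt
  }

  // After fetching a block from a remote executor, recompute and compare
  // against the checksum recorded when the broadcast was written.
  def verify(block: ByteBuffer, expected: Int): Boolean =
    calcChecksum(block) == expected
}
```
On a mismatch the fetched block is discarded and an exception is thrown to fail 
the task, as described above.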

## How was this patch tested?

Existing tests.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davies/spark broadcast_checksum

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15935.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15935


commit 328ca390f10cceeb21656eb4fb9e0ef677fd0672
Author: Davies Liu <dav...@databricks.com>
Date:   2016-11-18T22:27:27Z

add checksum for blocks of broadcast




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15923: [SPARK-4105] retry the fetch or stage if shuffle block i...

2016-11-18 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15923
  
@joshrosen @zsxwing Could you help review this one?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-17 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13065#discussion_r88581043
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -103,5 +109,182 @@ case class GenerateExec(
   }
 }
   }
-}
 
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  protected override def doProduce(ctx: CodegenContext): String = {
+child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
+ctx.currentVars = input
+ctx.copyResult = true
+
+// Add input rows to the values when we are joining
+val values = if (join) {
+  input
+} else {
+  Seq.empty
+}
+
+boundGenerator match {
+  case e: CollectionGenerator => codeGenCollection(ctx, e, values, row)
+  case g => codeGenTraversableOnce(ctx, g, values, row)
+}
+  }
+
+  /**
+   * Generate code for [[CollectionGenerator]] expressions.
+   */
+  private def codeGenCollection(
+  ctx: CodegenContext,
+  e: CollectionGenerator,
+  input: Seq[ExprCode],
+  row: ExprCode): String = {
+
+// Generate code for the generator.
+val data = e.genCode(ctx)
+
+// Generate looping variables.
+val index = ctx.freshName("index")
+
+// Add a check if the generate outer flag is true.
+val checks = optionalCode(outer, data.isNull)
+
+// Add position
+val position = if (e.position) {
+  Seq(ExprCode("", "false", index))
+} else {
+  Seq.empty
+}
+
+// Generate code for either ArrayData or MapData
+val (initMapData, updateRowData, values) = e.collectionType match {
+  case ArrayType(st: StructType, nullable) if e.inline =>
+val row = codeGenAccessor(ctx, data.value, "col", index, st, 
nullable, checks)
+val fieldChecks = checks ++ optionalCode(nullable, row.isNull)
+val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) =>
+  codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, 
f.nullable, fieldChecks)
+}
+("", row.code, columns)
+
+  case ArrayType(dataType, nullable) =>
+("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, 
dataType, nullable, checks)))
+
+  case MapType(keyType, valueType, valueContainsNull) =>
+// Materialize the key and the value arrays before we enter the 
loop.
+val keyArray = ctx.freshName("keyArray")
+val valueArray = ctx.freshName("valueArray")
+val initArrayData =
+  s"""
+ |ArrayData $keyArray = ${data.isNull} ? null : 
${data.value}.keyArray();
+ |ArrayData $valueArray = ${data.isNull} ? null : 
${data.value}.valueArray();
+   """.stripMargin
+val values = Seq(
+  codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = 
false, checks),
+  codeGenAccessor(ctx, valueArray, "value", index, valueType, 
valueContainsNull, checks))
+(initArrayData, "", values)
+}
+
+// In case of outer=true we need to make sure the loop is executed 
at-least once when the
+// array/map contains no input. We do this by setting the looping 
index to -1 if there is no
+// input, evaluation of the array is prevented by a check in the 
accessor code.
+val numElements = ctx.freshName("numElements")
+val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0"
+val numOutput = metricTerm(ctx, "numOutputRows")
+s"""
+   |${data.code}
+   |$initMapData
+   |int $numElements = ${data.isNull} ? 0 : 
${data.value}.numElements();
+   |for (int $index = $init; $index < $numElements; $index++) {
+   |  $numOutput.add(1);
+   |  $updateRowData
+   |  ${consume(ctx, input ++ position ++ values)}
+   |}
+ """.stripMargin
+  }
+
+  /**
+   * Generate code for a regular [[TraversableOnce]] returning 
[[Generator]].
+   */
+  private def codeGenTraversableOnce(
+  ctx: CodegenContext,
+  e: Expression,
+  input: Seq[ExprCode],
+  row: ExprCode): String = {
+
+// Generate the code for the generator
+val data = e.genCode(ctx)
+
+// Gene

[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-17 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13065#discussion_r88580906
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -113,4 +117,25 @@ class WholeStageCodegenSuite extends SparkPlanTest 
with SharedSQLContext {
 
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
 assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
   }
+
+  test("generate should be included in WholeStageCodegen") {
+import org.apache.spark.sql.functions._
+val ds = spark.range(2).select(
+  col("id"),
+  explode(array(col("id") + 1, col("id") + 2)).as("value"))
+val plan = ds.queryExecution.executedPlan
+assert(plan.find(p =>
+  p.isInstanceOf[WholeStageCodegenExec] &&
+
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[GenerateExec]).isDefined)
+assert(ds.collect() === Array(Row(0, 1), Row(0, 2), Row(1, 2), Row(1, 
3)))
+  }
+
+  test("large inline generate should fail in WholeStageCodegen") {
--- End diff --

Should this still work?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15894: [SPARK-18188] Add checksum for shuffle blocks

2016-11-17 Thread davies
Github user davies commented on the issue:

https://github.com/apache/spark/pull/15894
  
Due to the complexity and overhead involved here, closing this in favor of 
https://github.com/apache/spark/pull/15923/.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15894: [SPARK-18188] Add checksum for shuffle blocks

2016-11-17 Thread davies
Github user davies closed the pull request at:

https://github.com/apache/spark/pull/15894


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-17 Thread davies
GitHub user davies opened a pull request:

https://github.com/apache/spark/pull/15923

[SPARK-4105] retry the fetch or stage if shuffle block is corrupt

## What changes were proposed in this pull request?

There is an outstanding issue that has existed for a long time: sometimes 
shuffle blocks are corrupt and can't be decompressed. We recently hit this in 
three different workloads; sometimes we can reproduce it on every try, 
sometimes we can't. I also found that when the corruption happened, the 
beginning and end of the blocks were correct and the corruption was in the 
middle. In one case the string of the block id was corrupted by a single 
character. It seems very likely that the corruption is introduced by some 
faulty machine/hardware, and the 16-bit checksum in TCP is not strong enough 
to catch all of it.

Unfortunately, Spark has no checksum for shuffle blocks or broadcast blocks, 
so the job will fail if any corruption happens in a shuffle block read from 
disk or in a broadcast block sent over the network. This PR tries to detect 
the corruption after fetching shuffle blocks by decompressing them, because 
most compression codecs already include checksums. It will retry the block, 
or fail with a FetchFailure, so that the previous stage can be retried on 
different (still random) machines.
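
To make the approach concrete, here is a minimal sketch of the idea only 
(object, method, and parameter names are illustrative and not taken from the 
actual fetcher code): fully decompress each fetched block once, relying on 
the codec's built-in framing/checksums to surface corruption, retry the block 
once, and otherwise raise an error that the caller can turn into a 
FetchFailure for the previous stage.

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import scala.util.control.NonFatal

object CorruptionCheckSketch {
  // Fully decompress a fetched block; most codecs (LZ4, Snappy, Zstd) fail
  // loudly here when the compressed bytes are corrupt.
  private def decompressFully(
      raw: InputStream,
      decompress: InputStream => InputStream): Array[Byte] = {
    val in = decompress(raw)
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](64 * 1024)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
    out.toByteArray
  }

  // Illustrative retry loop: re-fetch a corrupt block once, then give up with
  // an error that the caller can translate into a FetchFailure.
  def fetchWithCorruptionCheck(
      blockId: String,
      fetchBlock: () => InputStream,           // raw (compressed) block bytes
      decompress: InputStream => InputStream,  // wrap with the shuffle codec
      retriesLeft: Int = 1): InputStream = {
    val raw = fetchBlock()
    try {
      new ByteArrayInputStream(decompressFully(raw, decompress))
    } catch {
      case NonFatal(e) if retriesLeft > 0 =>
        fetchWithCorruptionCheck(blockId, fetchBlock, decompress, retriesLeft - 1)
      case NonFatal(e) =>
        throw new RuntimeException(s"Corrupt shuffle block $blockId", e)
    } finally {
      raw.close()
    }
  }
}
```

The sketch materializes the whole block in memory; a real implementation would 
also have to cap how much it buffers while checking.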

## How was this patch tested?

TBD


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davies/spark detect_corrupt

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15923


commit 5c93aafd00b6ba50e8211227272aac6041fee376
Author: Davies Liu <dav...@databricks.com>
Date:   2016-11-18T00:04:40Z

retry the fetch or stage if shuffle block is corrupt




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-17 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13065#discussion_r88550711
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -40,6 +42,10 @@ private[execution] sealed case class LazyIterator(func: 
() => TraversableOnce[In
  * output of each into a new stream of rows.  This operation is similar to 
a `flatMap` in functional
  * programming with one important additional feature, which allows the 
input rows to be joined with
  * their output.
+ *
+ * This operator supports whole stage code generation for generators that 
do not implement
+ * terminate().
--- End diff --

I see: those Generators that implement terminate() also implement 
CodegenFallback, so GenerateExec will fall back as well. Never mind.
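
For reference, a minimal sketch of the check being described (the hook name 
and the exact wiring are assumptions about how GenerateExec could be gated, 
not a quote of the merged code):

```
// Only let GenerateExec participate in whole-stage codegen when the bound
// generator is itself code-generated, i.e. not a CodegenFallback expression
// (which is the case for generators that define terminate()).
override def supportCodegen: Boolean =
  !boundGenerator.isInstanceOf[CodegenFallback]
```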


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-17 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13065#discussion_r88525329
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
 ---
@@ -149,29 +167,52 @@ case class Stack(children: Seq[Expression])
   InternalRow(fields: _*)
 }
   }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+// Rows - we write these into an array.
+val rowData = ctx.freshName("rows")
+ctx.addMutableState("InternalRow[]", rowData, s"this.$rowData = new 
InternalRow[$numRows];")
+val values = children.tail
+val dataTypes = values.take(numFields).map(_.dataType)
+val code = ctx.splitExpressions(ctx.INPUT_ROW, Seq.tabulate(numRows) { 
row =>
--- End diff --

Unfortunately, splitExpressions does not work with whole-stage codegen, so we 
have to fall back if there are many rows.

Can you add a test for that (many rows)?
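
A sketch of the kind of test being asked for (the name, row count, and 
assertion are placeholders, and it assumes a suite such as 
WholeStageCodegenSuite with a `spark` session in scope):

```
test("stack with many rows") {
  val n = 100
  // stack(n, v1, ..., v2n) turns 2n literal values into n rows of 2 columns.
  val values = (1 to 2 * n).mkString(", ")
  val df = spark.range(1).selectExpr(s"stack($n, $values)")
  assert(df.collect().length === n)
}
```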


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-17 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13065#discussion_r88524027
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -40,6 +42,10 @@ private[execution] sealed case class LazyIterator(func: 
() => TraversableOnce[In
  * output of each into a new stream of rows.  This operation is similar to 
a `flatMap` in functional
  * programming with one important additional feature, which allows the 
input rows to be joined with
  * their output.
+ *
+ * This operator supports whole stage code generation for generators that 
do not implement
+ * terminate().
--- End diff --

Could be done in:
```
override def supportCodegen(): Boolean = {
  generator.terminate().isEmpty
}
```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate

2016-11-17 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/13065#discussion_r88523492
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -103,5 +109,182 @@ case class GenerateExec(
   }
 }
   }
-}
 
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  protected override def doProduce(ctx: CodegenContext): String = {
+child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
+ctx.currentVars = input
+ctx.copyResult = true
+
+// Add input rows to the values when we are joining
+val values = if (join) {
+  input
+} else {
+  Seq.empty
+}
+
+boundGenerator match {
+  case e: CollectionGenerator => codeGenCollection(ctx, e, values, row)
+  case g => codeGenTraversableOnce(ctx, g, values, row)
+}
+  }
+
+  /**
+   * Generate code for [[CollectionGenerator]] expressions.
+   */
+  private def codeGenCollection(
+  ctx: CodegenContext,
+  e: CollectionGenerator,
+  input: Seq[ExprCode],
+  row: ExprCode): String = {
+
+// Generate code for the generator.
+val data = e.genCode(ctx)
+
+// Generate looping variables.
+val index = ctx.freshName("index")
+
+// Add a check if the generate outer flag is true.
+val checks = optionalCode(outer, data.isNull)
+
+// Add position
+val position = if (e.position) {
+  Seq(ExprCode("", "false", index))
+} else {
+  Seq.empty
+}
+
+// Generate code for either ArrayData or MapData
+val (initMapData, updateRowData, values) = e.collectionType match {
+  case ArrayType(st: StructType, nullable) if e.inline =>
+val row = codeGenAccessor(ctx, data.value, "col", index, st, 
nullable, checks)
+val fieldChecks = checks ++ optionalCode(nullable, row.isNull)
+val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) =>
+  codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, 
f.nullable, fieldChecks)
+}
+("", row.code, columns)
+
+  case ArrayType(dataType, nullable) =>
+("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, 
dataType, nullable, checks)))
+
+  case MapType(keyType, valueType, valueContainsNull) =>
+// Materialize the key and the value arrays before we enter the 
loop.
+val keyArray = ctx.freshName("keyArray")
+val valueArray = ctx.freshName("valueArray")
+val initArrayData =
+  s"""
+ |ArrayData $keyArray = ${data.isNull} ? null : 
${data.value}.keyArray();
+ |ArrayData $valueArray = ${data.isNull} ? null : 
${data.value}.valueArray();
+   """.stripMargin
+val values = Seq(
+  codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = 
false, checks),
+  codeGenAccessor(ctx, valueArray, "value", index, valueType, 
valueContainsNull, checks))
+(initArrayData, "", values)
+}
+
+// In case of outer=true we need to make sure the loop is executed 
at-least once when the
+// array/map contains no input. We do this by setting the looping 
index to -1 if there is no
+// input, evaluation of the array is prevented by a check in the 
accessor code.
+val numElements = ctx.freshName("numElements")
+val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0"
+val numOutput = metricTerm(ctx, "numOutputRows")
+s"""
+   |${data.code}
+   |$initMapData
+   |int $numElements = ${data.isNull} ? 0 : 
${data.value}.numElements();
+   |for (int $index = $init; $index < $numElements; $index++) {
+   |  $numOutput.add(1);
+   |  $updateRowData
+   |  ${consume(ctx, input ++ position ++ values)}
+   |}
+ """.stripMargin
+  }
+
+  /**
+   * Generate code for a regular [[TraversableOnce]] returning 
[[Generator]].
+   */
+  private def codeGenTraversableOnce(
+  ctx: CodegenContext,
+  e: Expression,
+  input: Seq[ExprCode],
+  row: ExprCode): String = {
+
+// Generate the code for the generator
+val data = e.genCode(ctx)
+
+// Gene
