[GitHub] spark issue #20825: add impurity stats in tree leaf node debug string
Github user davies commented on the issue: https://github.com/apache/spark/pull/20825 cc @zsxwing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20825: add impurity stats in tree leaf node debug string
Github user davies commented on the issue: https://github.com/apache/spark/pull/20825 LGTM
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user davies commented on the issue: https://github.com/apache/spark/pull/20561 lgtm
[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/20561#discussion_r167300330 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala --- @@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext { spill = true ) } + + test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") { +val memoryManager = new TestMemoryManager(new SparkConf()) +val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) +val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes()) + +// Key/value are a unsafe rows with a single int column +val schema = new StructType().add("i", IntegerType) +val key = new UnsafeRow(1) +key.pointTo(new Array[Byte](32), 32) +key.setInt(0, 1) +val value = new UnsafeRow(1) +value.pointTo(new Array[Byte](32), 32) +value.setInt(0, 2) + +for (_ <- 1 to 65) { + val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes) + loc.append( +key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, +value.getBaseObject, value.getBaseOffset, value.getSizeInBytes) +} + +// Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap` +// which has duplicated keys and the number of entries exceeds its capacity. --- End diff -- For aggregation, there are never multiple entries for the same key; that only happens for hash join (I don't remember the details).
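A toy Python model of the distinction (not Spark's `BytesToBytesMap`; `MultiValueMap` and its methods are hypothetical names): in the hash-join case one key can carry several values, so the number of entries can exceed the number of distinct keys. The loop mirrors the 65 appends of the same key in the test above.

```python
class MultiValueMap:
    """Toy stand-in for a map that can hold several values under one key."""

    def __init__(self):
        self._entries = {}  # key -> list of values appended under that key

    def append(self, key, value):
        # Appending to an existing key adds another entry instead of overwriting.
        self._entries.setdefault(key, []).append(value)

    def num_keys(self):
        return len(self._entries)

    def num_values(self):
        return sum(len(values) for values in self._entries.values())


m = MultiValueMap()
for _ in range(65):
    m.append(1, 2)  # same key/value pair every time, as in the test
```

Any structure that needs one slot per entry therefore has to be sized by `num_values()`, not `num_keys()`.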
[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/20561#discussion_r167299807 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java --- @@ -98,10 +99,20 @@ public UnsafeKVExternalSorter( numElementsForSpillThreshold, canUseRadixSort); } else { - // The array will be used to do in-place sort, which require half of the space to be empty. - // Note: each record in the map takes two entries in the array, one is record pointer, - // another is the key prefix. - assert(map.numKeys() * 2 <= map.getArray().size() / 2); + // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but + // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap` + // can have duplicated keys, here we need a check to make sure the point array can hold + // all the entries in `BytesToBytesMap`. + final LongArray pointArray; + // The point array will be used to do in-place sort, which require half of the space to be + // empty. Note: each record in the map takes two entries in the point array, one is record + // pointer, another is the key prefix. + if (map.numValues() > map.getArray().size() / 4) { +pointArray = map.allocateArray(map.numValues() * 4); --- End diff -- The allocation may fail.
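A sketch of the sizing rule in the diff (two slots per record, doubled so half the array stays free for in-place sorting) plus the failure path the review asks about. Assumptions: `allocate` is a hypothetical callback that returns the new array size or raises the hypothetical `AllocationFailed`, and "fallback" stands for whatever the caller does on failure (for example spilling); none of these names come from Spark.

```python
class AllocationFailed(Exception):
    """Hypothetical stand-in for an out-of-memory error from the allocator."""


def required_slots(num_values):
    # 2 slots per record (pointer + key prefix), and in-place sorting needs
    # half of the array to be empty, hence 4 slots per record.
    return num_values * 4


def choose_point_array(num_values, existing_size, allocate):
    """Return ("reuse", size), ("allocated", size) or ("fallback", None)."""
    needed = required_slots(num_values)
    if needed <= existing_size:
        return ("reuse", existing_size)
    try:
        return ("allocated", allocate(needed))
    except AllocationFailed:
        # The allocation may fail, so the caller must handle this case
        # instead of assuming a bigger array is always available.
        return ("fallback", None)
```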
[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/20561#discussion_r167299716 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java --- @@ -98,10 +99,20 @@ public UnsafeKVExternalSorter( numElementsForSpillThreshold, canUseRadixSort); } else { - // The array will be used to do in-place sort, which require half of the space to be empty. - // Note: each record in the map takes two entries in the array, one is record pointer, - // another is the key prefix. - assert(map.numKeys() * 2 <= map.getArray().size() / 2); + // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but + // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap` --- End diff -- It's possible to change UnsafeInMemorySorter to have multiple entries with the same key.
[GitHub] spark issue #18052: [SPARK-20347][PYSPARK][WIP] Provide AsyncRDDActions in P...
Github user davies commented on the issue: https://github.com/apache/spark/pull/18052 Personally, I think less is more: we shouldn't add everything to every piece of software, otherwise every piece of software eventually ends up able to write email. The RDD API is essentially frozen; we don't add new APIs to it unless they're necessary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[GitHub] spark issue #18052: [SPARK-20347][PYSPARK][WIP] Provide AsyncRDDActions in P...
Github user davies commented on the issue: https://github.com/apache/spark/pull/18052 It seems these would also be easy to implement outside of PySpark, by users themselves or by third-party libraries, right? If that's the case, I'd rather not add them to PySpark.
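To illustrate the point that async actions can live outside PySpark, here is a user-side sketch built on the standard library's `concurrent.futures`. `run_async` is a hypothetical helper and `FakeRDD` is a stand-in so the example runs without a SparkContext; the assumption is that a real RDD's blocking `count` could be submitted the same way.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def run_async(executor: ThreadPoolExecutor, action, *args) -> Future:
    """Run a blocking action (for example rdd.count) on a worker thread."""
    return executor.submit(action, *args)


class FakeRDD:
    """Stand-in for an RDD so the sketch runs without a SparkContext."""

    def __init__(self, data):
        self._data = list(data)

    def count(self):
        return len(self._data)


with ThreadPoolExecutor(max_workers=2) as pool:
    future = run_async(pool, FakeRDD(range(5)).count)
    result = future.result()  # blocks until the action finishes
```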
[GitHub] spark issue #18244: [SPARK-20211][SQL] Fix the Precision and Scale of Decima...
Github user davies commented on the issue: https://github.com/apache/spark/pull/18244 lgtm
[GitHub] spark pull request #18244: [SPARK-20211][SQL] Fix the Precision and Scale of...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/18244#discussion_r121050241 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala --- @@ -126,7 +126,15 @@ final class Decimal extends Ordered[Decimal] with Serializable { def set(decimal: BigDecimal): Decimal = { this.decimalVal = decimal this.longVal = 0L -this._precision = decimal.precision +if (decimal.compare(BigDecimal(1.0)) == -1 && decimal.compare(BigDecimal(-1.0)) == 1) { --- End diff -- if (decimal.precision <= decimal.scale) { }
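A sketch of the precision/scale mismatch using Python's `decimal` module, whose digit tuple gives the same precision and scale that `java.math.BigDecimal` reports for finite values. The adjustment assumes the goal is the smallest precision that is still at least the scale; `spark_precision_scale` is a hypothetical name, not Spark's code.

```python
from decimal import Decimal


def big_decimal_precision_scale(d):
    """(precision, scale) as java.math.BigDecimal reports them, for finite values."""
    _sign, digits, exponent = d.as_tuple()
    return len(digits), -exponent


def spark_precision_scale(d):
    precision, scale = big_decimal_precision_scale(d)
    # BigDecimal counts digits of the unscaled value, so 0.01 has precision 1
    # but scale 2; a DecimalType needs precision >= scale.
    if precision <= scale:
        precision = scale
    return precision, scale
```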
[GitHub] spark issue #17375: [SPARK-19019][PYTHON][BRANCH-1.6] Fix hijacked `collecti...
Github user davies commented on the issue: https://github.com/apache/spark/pull/17375 lgtm
[GitHub] spark issue #17374: [SPARK-19019][PYTHON][BRANCH-2.0] Fix hijacked `collecti...
Github user davies commented on the issue: https://github.com/apache/spark/pull/17374 lgtm
[GitHub] spark issue #17282: [SPARK-19872][PYTHON] Use the correct deserializer for R...
Github user davies commented on the issue: https://github.com/apache/spark/pull/17282 lgtm, merging into master and the 2.1 and 2.0 branches.
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r105007318 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala --- @@ -341,25 +364,27 @@ private[window] final class UnboundedFollowingWindowFunctionFrame( override def write(index: Int, current: InternalRow): Unit = { var bufferUpdated = index == 0 -// Duplicate the input to have a new iterator -val tmp = input.copy() - -// Drop all rows from the buffer for which the input row value is smaller than +// Ignore all the rows from the buffer for which the input row value is smaller than // the output row lower bound. -tmp.skip(inputIndex) -var nextRow = tmp.next() +val iterator = input.generateIterator(startIndex = inputIndex) + +def getNextOrNull(iterator: Iterator[UnsafeRow]): UnsafeRow = { --- End diff -- If we expect a row to be there, I'd rather call next(): it throws an exception if something goes wrong, whereas otherwise you silently get a wrong result.
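The fail-fast argument in Python terms: `next(it, None)` behaves like a `getNextOrNull`-style helper and turns a missing row into `None`, while a bare `next(it)` raises `StopIteration` (Scala's `Iterator.next()` similarly throws `NoSuchElementException`). Both function names are hypothetical.

```python
def get_next_or_none(iterator):
    """Lenient: an exhausted iterator silently becomes None."""
    return next(iterator, None)


def get_next_strict(iterator):
    """Fail fast: raises StopIteration if the expected row is missing."""
    return next(iterator)
```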
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r105006782 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala --- @@ -164,9 +176,12 @@ private[window] final class SlidingWindowFunctionFrame( private[this] var inputLowIndex = 0 /** Prepare the frame for calculating a new partition. Reset all variables. */ - override def prepare(rows: RowBuffer): Unit = { + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { input = rows -nextRow = rows.next() +inputIterator = input.generateIterator() +if (inputIterator.hasNext) { --- End diff -- Should this be an assert? Otherwise you may silently get a wrong result.
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r105006306 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -674,6 +675,24 @@ object SQLConf { .stringConf .createWithDefault(TimeZone.getDefault().getID()) + val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD = +buildConf("spark.sql.windowExec.buffer.spill.threshold") + .doc("Threshold for number of rows buffered in window operator") + .intConf + .createWithDefault(4096) + + val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD = +buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold") + .doc("Threshold for number of rows buffered in sort merge join operator") + .intConf + .createWithDefault(Int.MaxValue) + + val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD = +buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold") + .doc("Threshold for number of rows buffered in cartesian product operator") + .intConf + .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt) --- End diff -- There are too many knobs; should we mark them as internal?
[GitHub] spark issue #16896: [SPARK-19561][Python] cast TimestampType.toInternal outp...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16896 My bad, I didn't realize that, sorry.
[GitHub] spark issue #16896: [SPARK-19561][Python] cast TimestampType.toInternal outp...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16896 Merged into master and the 2.1 branch.
[GitHub] spark issue #16896: [SPARK-19561][Python] cast TimestampType.toInternal outp...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16896 lgtm, will merge it when I get a chance.
[GitHub] spark pull request #16782: [SPARK-19348][PYTHON][WIP] PySpark keyword_only d...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16782#discussion_r103276349 --- Diff: python/pyspark/__init__.py --- @@ -96,9 +96,11 @@ def keyword_only(func): """ @wraps(func) def wrapper(*args, **kwargs): +# NOTE - this assumes we are wrapping a method and args[0] will be 'self' if len(args) > 1: raise TypeError("Method %s forces keyword arguments." % func.__name__) wrapper._input_kwargs = kwargs --- End diff -- If the assumption is correct, should we always use `self` to hold the kwargs (i.e. remove this line and update all the functions that use `keyword_only`)?
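A minimal sketch of the suggestion, assuming every decorated function is a method so the first positional argument is always `self`: the kwargs are stored on the instance rather than on the `wrapper` function object, which is a single attribute shared by every call. `ToyParams` is a hypothetical class for illustration.

```python
from functools import wraps


def keyword_only(func):
    """Force keyword arguments and record them on the instance."""

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if args:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        # Stored per instance: `wrapper._input_kwargs` would be one shared slot
        # for every call of this method, across instances and threads.
        self._input_kwargs = kwargs
        return func(self, **kwargs)

    return wrapper


class ToyParams:
    @keyword_only
    def __init__(self, maxIter=10, regParam=0.0):
        # The decorator has already stored the explicitly passed kwargs on self.
        self.passed = dict(self._input_kwargs)
```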
[GitHub] spark issue #17036: [SPARK-19706][pyspark] add Column.contains in pyspark
Github user davies commented on the issue: https://github.com/apache/spark/pull/17036 lgtm
[GitHub] spark issue #16865: [SPARK-19530][SQL] Use guava weigher for code cache evic...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16865 I still think it's not worth it
[GitHub] spark issue #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMa...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16844 Merging into master and the 2.1 and 2.0 branches.
[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16844#discussion_r101360777 --- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java --- @@ -742,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff longArray.set(pos * 2 + 1, keyHashcode); isDefined = true; -if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) { +if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) { --- End diff -- Unfortunately, we can't grow at the beginning, otherwise `pos` will be wrong.
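A toy open-addressing map (hypothetical names, not `BytesToBytesMap`) showing why growth has to wait until after the write: the slot index returned by the probe is only meaningful for the current array, and rehashing into a bigger array moves every key. It uses the `>=` threshold check from the diff.

```python
class ToyHashMap:
    """Linear-probing map that grows only after the new key is written."""

    def __init__(self, capacity=4, load_factor=0.5):
        self.keys = [None] * capacity
        self.load_factor = load_factor
        self.num_keys = 0

    def _lookup(self, key):
        # Return the slot holding `key`, or the first empty slot on its probe path.
        capacity = len(self.keys)
        pos = hash(key) % capacity
        while self.keys[pos] is not None and self.keys[pos] != key:
            pos = (pos + 1) % capacity
        return pos

    def append(self, key):
        pos = self._lookup(key)  # only valid for the current array
        if self.keys[pos] is None:
            self.keys[pos] = key
            self.num_keys += 1
            # Grow *after* writing at `pos`; growing first would rehash every
            # key and leave `pos` pointing at the wrong slot.
            if self.num_keys >= len(self.keys) * self.load_factor:
                self._grow()

    def _grow(self):
        old = [k for k in self.keys if k is not None]
        self.keys = [None] * (len(self.keys) * 2)
        for k in old:
            self.keys[self._lookup(k)] = k

    def __contains__(self, key):
        return self.keys[self._lookup(key)] == key
```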
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r101214678 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer +import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} + +/** + * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined + * threshold of rows is reached. + * + * Setting spill threshold faces following trade-off: + * + * - If the spill threshold is too high, the in-memory array may occupy more memory than is + * available, resulting in OOM. + * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes. 
+ * This may lead to a performance regression compared to the normal case of using an + * [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging { --- End diff -- How does this compare to using UnsafeExternalSorter directly? There is a similar use case here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala#L42
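A rough Python sketch of the spill-threshold idea described in the class comment, under simplifying assumptions: rows are picklable objects, everything moves to a single temporary file the first time the threshold is reached, and there is no concurrent-modification check. `SpillableAppendOnlyBuffer` is a hypothetical name; it models neither `UnsafeExternalSorter` nor the Spark class itself.

```python
import io
import pickle
import tempfile


class SpillableAppendOnlyBuffer:
    """Append-only row buffer that spills to a temp file past a row threshold."""

    def __init__(self, spill_threshold):
        self.spill_threshold = spill_threshold
        self.num_rows = 0
        self._in_memory = []
        self._spill_file = None  # created lazily on the first spill

    @property
    def spilled(self):
        return self._spill_file is not None

    def add(self, row):
        if not self.spilled and len(self._in_memory) >= self.spill_threshold:
            # Threshold reached: move the buffered rows to disk once.
            self._spill_file = tempfile.TemporaryFile()
            for buffered in self._in_memory:
                pickle.dump(buffered, self._spill_file)
            self._in_memory = []
        if self.spilled:
            self._spill_file.seek(0, io.SEEK_END)  # always append at the end
            pickle.dump(row, self._spill_file)
        else:
            self._in_memory.append(row)
        self.num_rows += 1

    def __iter__(self):
        if not self.spilled:
            yield from list(self._in_memory)
            return
        self._spill_file.seek(0)
        for _ in range(self.num_rows):
            yield pickle.load(self._spill_file)
```

A low `spill_threshold` makes the buffer hit disk early (more I/O); a high one keeps more rows in the Python list (more memory), which is the trade-off the class comment describes.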
[GitHub] spark issue #16896: [SPARK-19561][Python] cast TimestampType.toInternal outp...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16896 Just one minor comment
[GitHub] spark pull request #16896: [SPARK-19561][Python] cast TimestampType.toIntern...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16896#discussion_r101204910 --- Diff: python/pyspark/sql/types.py --- @@ -189,7 +189,7 @@ def toInternal(self, dt): if dt is not None: seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo else time.mktime(dt.timetuple())) -return int(seconds) * 100 + dt.microsecond +return long(int(seconds) * 100 + dt.microsecond) --- End diff -- Could you just replace the `int` with `long`?
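For reference, a Python 3 sketch of the conversion, assuming TimestampType's internal value is microseconds since the Unix epoch (so the scale factor is 1,000,000 per second). In Python 3 `int` is unbounded, so there is no separate `long` to cast to; the cast only matters on Python 2. `timestamp_to_internal` is a hypothetical name.

```python
import calendar
import time
from datetime import datetime, timezone


def timestamp_to_internal(dt):
    """Microseconds since the Unix epoch, using integer arithmetic only."""
    if dt is None:
        return None
    seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
               else time.mktime(dt.timetuple()))
    # Python 3 ints are unbounded, so no separate `long` cast is needed.
    return int(seconds) * 1_000_000 + dt.microsecond


example = timestamp_to_internal(datetime(1970, 1, 1, 0, 0, 1, 500, tzinfo=timezone.utc))
```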
[GitHub] spark issue #16865: [SPARK-19530][SQL] Use guava weigher for code cache evic...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16865 @viirya This is a general OOM; it should not be caused by cached bytecode, which is far smaller than other things in the executor. I don't think this patch will help either.
[GitHub] spark issue #16865: [SPARK-19530][SQL] Use guava weigher for code cache evic...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16865 I understand the motivation here; could you show the benefit of this change in a real use case?
[GitHub] spark pull request #16865: [SPARK-19530][SQL] Use guava weigher for code cac...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16865#discussion_r100428764 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1004,7 +1016,8 @@ object CodeGenerator extends Logging { * weak keys/values and thus does not respond to memory pressure. */ private val cache = CacheBuilder.newBuilder() -.maximumSize(100) +.maximumWeight(10 * 1024 * 1024) --- End diff -- @dongjoon-hyun That's a limit on a single Java method, not on a whole class. A generated class can be much bigger than 64KB.
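The idea behind weighing entries, sketched in Python rather than with Guava: the cache is bounded by the total weight of its entries (for example generated bytecode size) instead of their count, so one very large class counts for more than many small ones. This is a hypothetical LRU illustration, not Guava's `CacheBuilder.maximumWeight`, whose eviction policy differs in its details.

```python
from collections import OrderedDict


class WeightBoundedLRUCache:
    """LRU cache bounded by total weight (e.g. bytecode size), not entry count."""

    def __init__(self, max_weight, weigher):
        self.max_weight = max_weight
        self.weigher = weigher          # function (key, value) -> non-negative int
        self.total_weight = 0
        self._entries = OrderedDict()   # key -> (value, weight), oldest first

    def get(self, key):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key][0]

    def put(self, key, value):
        if key in self._entries:
            self.total_weight -= self._entries.pop(key)[1]
        weight = self.weigher(key, value)
        self._entries[key] = (value, weight)
        self.total_weight += weight
        # Evict least recently used entries until the total is back under the limit.
        while self.total_weight > self.max_weight and self._entries:
            _, (_, evicted_weight) = self._entries.popitem(last=False)
            self.total_weight -= evicted_weight
```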
[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16844#discussion_r100387863 --- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java --- @@ -695,11 +690,16 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff assert (vlen % 8 == 0); assert (longArray != null); - if (numKeys == MAX_CAPACITY -// The map could be reused from last spill (because of no enough memory to grow), -// then we don't try to grow again if hit the `growthThreshold`. -|| !canGrowArray && numKeys > growthThreshold) { -return false; + if (numKeys >= growthThreshold) { +if (longArray.size() / 2 == MAX_CAPACITY) { --- End diff -- @mridulm Does that mean we should also rename `growAndRehash` to `tryGrowAndRehash`? I don't think either rename is necessary.
[GitHub] spark issue #16825: [SPARK-19481][REPL][maven]Avoid to leak SparkContext in ...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16825 lgtm, merging this into master and the 2.1 branch, thanks
[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16844#discussion_r100383151 --- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java --- @@ -741,14 +741,6 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff numKeys++; longArray.set(pos * 2 + 1, keyHashcode); isDefined = true; - -if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) { --- End diff -- `longArray.size()` is the next capacity under the current growth strategy, so it should be `longArray.size() <= MAX_CAPACITY`.
[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16844#discussion_r100381544 --- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java --- @@ -695,11 +690,16 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff assert (vlen % 8 == 0); assert (longArray != null); - if (numKeys == MAX_CAPACITY -// The map could be reused from last spill (because of no enough memory to grow), -// then we don't try to grow again if hit the `growthThreshold`. -|| !canGrowArray && numKeys > growthThreshold) { -return false; + if (numKeys >= growthThreshold) { +if (longArray.size() / 2 == MAX_CAPACITY) { --- End diff -- There are two reasons it can fail to grow: 1) the current capacity (`longArray.size() / 2`) has reached MAX_CAPACITY; 2) we can't allocate a new array (OOM). So I think the check here is correct.
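A toy Python model of the two failure modes described above (not Spark's Java code). Only the relationship "capacity = `longArray.size() / 2`" comes from the quoted diff; `try_grow`, `may_grow`, and the `allocate` callback are hypothetical names, and the allocation failure is simulated rather than a real OOM. It also shows why, with capacities that double, `longArray.size()` is the next capacity, so `longArray.size() <= MAX_CAPACITY` means "growing is still allowed".

```python
from typing import Callable, Optional


def try_grow(long_array_size: int, max_capacity: int,
             allocate: Callable[[int], bool]) -> Optional[int]:
    """Hypothetical grow decision; returns the new array size, or None on failure.

    The pointer array stores two longs per slot, so the current capacity is
    long_array_size // 2; doubling it gives a new capacity equal to long_array_size.
    """
    capacity = long_array_size // 2
    if capacity == max_capacity:
        return None  # reason 1: already at the maximum capacity
    new_size = long_array_size * 2
    if not allocate(new_size):
        return None  # reason 2: the (simulated) allocation failed, i.e. OOM
    return new_size


def may_grow(long_array_size: int, max_capacity: int) -> bool:
    # The next capacity (== long_array_size) must not exceed max_capacity.
    return long_array_size <= max_capacity


print(try_grow(32, 32, lambda n: True))   # capacity 16 -> grows to an array of 64 longs
print(try_grow(64, 32, lambda n: True))   # capacity 32 == max -> None
print(try_grow(32, 32, lambda n: False))  # allocation fails -> None
```

With power-of-two capacities, `may_grow(size, max)` is equivalent to `size // 2 < max`, which is the same condition as "capacity has not reached MAX_CAPACITY".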
[GitHub] spark pull request #16825: [SPARK-19481][REPL][maven]Avoid to leak SparkCont...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16825#discussion_r100206125 --- Diff: repl/src/main/scala/org/apache/spark/repl/Signaling.scala --- @@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging { * when no jobs are currently running. * This makes it possible to interrupt a running shell job by pressing Ctrl+C. */ - def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") { -if (!ctx.statusTracker.getActiveJobIds().isEmpty) { - logWarning("Cancelling all active jobs, this can take a while. " + -"Press Ctrl+C again to exit now.") - ctx.cancelAllJobs() - true -} else { - false -} + def cancelOnInterrupt(): Unit = SignalUtils.register("INT") { --- End diff -- Who is using this one? Is this a breaking change?
[GitHub] spark issue #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMa...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16844 @viirya Addressed your comment and also fixed another bug (see the updated PR description).
[GitHub] spark issue #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMa...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16844 cc @joshrosen, @viirya
[GitHub] spark pull request #16844: [SPARK-19500] [SQL] Fix off-by-one bug in BytesTo...
GitHub user davies opened a pull request: https://github.com/apache/spark/pull/16844 [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap ## What changes were proposed in this pull request? Radix sort requires half of the array to be free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap never holds more items than 1/2 of its capacity. It turned out this is not guaranteed: the current implementation of append() can leave one more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1). This PR fixes the off-by-one bug in BytesToBytesMap. ## How was this patch tested? Added a regression test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/davies/spark off_by_one Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16844.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16844
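A simplified Python sketch of the off-by-one described in this PR, assuming the map can no longer grow and that `growthThreshold` is capacity * 0.5 as the description states. The "old" check mirrors the removed `numKeys > growthThreshold` condition in the quoted diffs, and the "fixed" check mirrors `numKeys >= growthThreshold`; `keys_accepted` is a hypothetical helper, not Spark code.

```python
def keys_accepted(capacity: int, fixed: bool) -> int:
    """Append keys into a map that cannot grow; return how many were accepted."""
    growth_threshold = int(capacity * 0.5)
    num_keys = 0
    while True:
        if fixed:
            full = num_keys >= growth_threshold  # reject once the threshold is reached
        else:
            full = num_keys > growth_threshold   # old check: admits one extra key
        if full:
            return num_keys
        num_keys += 1


for fixed in (False, True):
    n = keys_accepted(64, fixed)
    # Radix sort's requirement from the PR: at most 1/2 of capacity is used.
    print(fixed, n, 2 * n <= 64)  # False 33 False, then True 32 True
```

With a capacity of 64, the old check lets 33 keys in (one over the threshold), while the fixed check stops at exactly 32.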
[GitHub] spark issue #13808: [SPARK-14480][SQL] Remove meaningless StringIteratorRead...
Github user davies commented on the issue: https://github.com/apache/spark/pull/13808 @HyukjinKwon @rxin This patch has a regression: a column that contains an escaped newline can no longer be parsed correctly. Should we revert this patch or figure out a way to fix it?
[GitHub] spark issue #16581: [SPARK-18589] [SQL] Fix Python UDF accessing attributes ...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16581 Cherry-picked into 2.1 branch.
[GitHub] spark issue #15467: [SPARK-17912][SQL] Refactor code generation to get data ...
Github user davies commented on the issue: https://github.com/apache/spark/pull/15467 Merging this into master, thanks!
[GitHub] spark pull request #16581: [SPARK-18589] [SQL] Fix Python UDF accessing attr...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16581#discussion_r96469102 --- Diff: python/pyspark/sql/tests.py --- @@ -342,6 +342,15 @@ def test_udf_in_filter_on_top_of_outer_join(self): df = df.withColumn('b', udf(lambda x: 'x')(df.a)) self.assertEqual(df.filter('b = "x"').collect(), [Row(a=1, b='x')]) +def test_udf_in_filter_on_top_of_join(self): --- End diff -- done
[GitHub] spark issue #16429: [SPARK-19019][PYTHON] Fix hijacked `collections.namedtup...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16429 lgtm, merging into master and 2.1 branch.
[GitHub] spark pull request #16429: [SPARK-19019][PYTHON] Fix hijacked `collections.n...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16429#discussion_r96130191 --- Diff: python/pyspark/serializers.py --- @@ -382,18 +382,30 @@ def _hijack_namedtuple(): return global _old_namedtuple # or it will put in closure +global _old_namedtuple_kwdefaults # or it will put in closure too def _copy_func(f): return types.FunctionType(f.__code__, f.__globals__, f.__name__, f.__defaults__, f.__closure__) +def _kwdefaults(f): +kargs = getattr(f, "__kwdefaults__", None) --- End diff -- Could you put this comment into the code?
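Why `__kwdefaults__` matters for the quoted `_copy_func`: `types.FunctionType(code, globals, name, argdefs, closure)` copies positional defaults but has no parameter for keyword-only defaults, so a copy of a function with keyword-only parameters (as `collections.namedtuple` has in recent Python 3 versions) loses them. The Python sketch below is illustrative only, not PySpark's actual code; `sample` is a hypothetical stand-in for `namedtuple`.

```python
import types


def _copy_func(f):
    # FunctionType() takes the positional defaults (argdefs) but not the
    # keyword-only defaults, so those have to be copied over separately.
    g = types.FunctionType(f.__code__, f.__globals__, f.__name__,
                           f.__defaults__, f.__closure__)
    # getattr() keeps this safe where the attribute is absent (e.g. Python 2).
    kwdefaults = getattr(f, "__kwdefaults__", None)
    if kwdefaults:
        g.__kwdefaults__ = dict(kwdefaults)
    return g


def sample(a, b=1, *, rename=False, module=None):
    # Hypothetical function with keyword-only defaults, like namedtuple's.
    return (a, b, rename, module)


copied = _copy_func(sample)
print(copied(0))  # (0, 1, False, None)
```

Without the `__kwdefaults__` assignment, calling `copied(0)` would raise `TypeError` because the keyword-only arguments no longer have defaults.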
[GitHub] spark issue #16555: [SPARK-19180][SQL] the offset of short should be 2 in Of...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16555 lgtm, merging it into master, 2.1 and 2.0 branch.
[GitHub] spark issue #16581: [SPARK-18589] [SQL] Fix Python UDF accessing attributes ...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16581 cc @hvanhovell
[GitHub] spark pull request #16581: [SPARK-18589] [SQL] Fix Python UDF accessing attr...
GitHub user davies opened a pull request: https://github.com/apache/spark/pull/16581 [SPARK-18589] [SQL] Fix Python UDF accessing attributes from both side of join ## What changes were proposed in this pull request? PythonUDF is unevaluable, so it cannot be used inside a join condition. Currently the optimizer pushes a PythonUDF that accesses both sides of a join into the join condition, and the query then fails to plan. This PR fixes the issue by checking whether an expression is evaluable before pushing it into the Join. ## How was this patch tested? Added a regression test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/davies/spark pyudf_join Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16581.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16581 commit 95d73fcf911bfb25b20fea798d1f7b3f4b319e26 Author: Davies Liu <dav...@databricks.com> Date: 2017-01-13T19:50:11Z Fix Python UDF accessing attributes from both side of join
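A deliberately simplified Python model of the placement decision this PR describes. It is not Spark's optimizer rule. `Cond`, its `refs`/`evaluable` fields, and `place_conditions` are hypothetical, and `evaluable=False` stands in for a PythonUDF. A condition that references both sides goes into the join condition only if it is evaluable; otherwise it stays in a Filter above the join.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Tuple


@dataclass(frozen=True)
class Cond:
    sql: str
    refs: FrozenSet[str]    # columns referenced by the condition
    evaluable: bool = True  # False models an unevaluable PythonUDF


def place_conditions(conds: List[Cond], left: FrozenSet[str], right: FrozenSet[str]
                     ) -> Tuple[List[Cond], List[Cond], List[Cond], List[Cond]]:
    to_left, to_right, join_cond, keep_above = [], [], [], []
    for c in conds:
        if c.refs <= left:
            to_left.append(c)      # filter on the left child
        elif c.refs <= right:
            to_right.append(c)     # filter on the right child
        elif c.evaluable:
            join_cond.append(c)    # safe to evaluate inside the join
        else:
            keep_above.append(c)   # e.g. a Python UDF over both sides
    return to_left, to_right, join_cond, keep_above


eq = Cond("a = b", frozenset({"a", "b"}))
udf = Cond("f(a, b)", frozenset({"a", "b"}), evaluable=False)
print(place_conditions([eq, udf], frozenset({"a"}), frozenset({"b"})))
```

Here `a = b` becomes part of the join condition, while `f(a, b)` is kept in a Filter on top of the join, so planning no longer has to evaluate the UDF inside the join.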
[GitHub] spark issue #14452: [SPARK-16849][SQL] Improve subquery execution by dedupli...
Github user davies commented on the issue: https://github.com/apache/spark/pull/14452 @viirya For a duplicated CTE, without some optimization (such as pushing down different predicates at different positions), the physical plans should be identical. So I'm wondering whether some aggressive pushdown (e.g. IsNotNull(xxx)) causes the problem for some queries. That's why I asked.
[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15923#discussion_r93671999 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -305,40 +316,84 @@ final class ShuffleBlockFetcherIterator( */ override def next(): (BlockId, InputStream) = { numBlocksProcessed += 1 -val startFetchWait = System.currentTimeMillis() -currentResult = results.take() -val result = currentResult -val stopFetchWait = System.currentTimeMillis() -shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) - -result match { - case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) => -if (address != blockManager.blockManagerId) { - shuffleMetrics.incRemoteBytesRead(buf.size) - shuffleMetrics.incRemoteBlocksFetched(1) -} -bytesInFlight -= size -if (isNetworkReqDone) { - reqsInFlight -= 1 - logDebug("Number of requests in flight " + reqsInFlight) -} - case _ => -} -// Send fetch requests up to maxBytesInFlight -fetchUpToMaxBytes() -result match { - case FailureFetchResult(blockId, address, e) => -throwFetchFailedException(blockId, address, e) +var result: FetchResult = null +var input: InputStream = null +// Take the next fetched result and try to decompress it to detect data corruption, +// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch --- End diff -- @fathersson The TCP checksum is only 16 bits, which is not strong enough for heavy traffic; a DFS or any other system with heavy TCP traffic usually adds its own application-level checksum. Adding to @rxin's point, we did see this retry help in production to work around transient corruption.
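One concrete weakness of the 16-bit TCP checksum mentioned above: it is a ones' complement sum of 16-bit words, so reordering those words does not change it. The Python sketch below computes an RFC 1071-style checksum over a payload only (a real TCP checksum also covers a pseudo-header and the TCP header). It uses `zlib.crc32` as an example of a stronger application-level checksum; this is illustrative and not what Spark uses.

```python
import zlib


def internet_checksum(data: bytes) -> int:
    """16-bit ones' complement checksum in the style of RFC 1071 (payload only)."""
    if len(data) % 2:
        data += b"\x00"  # pad to a whole number of 16-bit words
    total = 0
    for i in range(0, len(data), 2):
        total += (data[i] << 8) | data[i + 1]
        total = (total & 0xFFFF) + (total >> 16)  # end-around carry
    return ~total & 0xFFFF


original = b"\x01\x02\x03\x04"
corrupted = b"\x03\x04\x01\x02"  # same 16-bit words, in swapped order

print(hex(internet_checksum(original)), hex(internet_checksum(corrupted)))  # 0xfbf9 0xfbf9
print(zlib.crc32(original) != zlib.crc32(corrupted))  # True
```

CRC-32 detects every burst error of up to 32 bits, so this swap cannot slip past it, whereas the 16-bit sum cannot tell the two payloads apart.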
[GitHub] spark issue #16211: [SPARK-18576][PYTHON] Add basic TaskContext information ...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16211 Looks good to me in general, cc @rxin
[GitHub] spark issue #16263: [SPARK-18281][SQL][PySpark] Remove timeout for reading d...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16263 Merging this into master and 2.1 branch, thanks!
[GitHub] spark issue #15980: [SPARK-18528][SQL] Fix a bug to initialise an iterator o...
Github user davies commented on the issue: https://github.com/apache/spark/pull/15980 lgtm
[GitHub] spark issue #16232: [SPARK-18800][SQL] Correct the assert in UnsafeKVExterna...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16232 lgtm
[GitHub] spark issue #16232: [SPARK-18800][SQL] Fix UnsafeKVExternalSorter by correct...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16232 @viirya Without a repro, I don't think this is the root cause. The error could have been caused by random data corruption.
[GitHub] spark issue #16232: [SPARK-18800][SQL] Fix UnsafeKVExternalSorter by correct...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16232 That makes sense; we should update the assert. But this is still not a bug, and the other changes are not needed.
[GitHub] spark issue #16232: [SPARK-18800][SQL] Fix UnsafeKVExternalSorter by correct...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16232 @viirya That's not correct: the values do not have entries in the array.
[GitHub] spark issue #16232: [SPARK-18800][SQL] Fix UnsafeKVExternalSorter by correct...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16232 @viirya A map with multiple values for the same key is only used for a hashed relation, not for aggregation, and it is never passed into UnsafeKVExternalSorter. So I think this is not actually a bug.
[GitHub] spark pull request #16263: [SPARK-18281][SQL][PySpark] Consumes the returned...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16263#discussion_r93127142 --- Diff: python/pyspark/sql/tests.py --- @@ -558,6 +558,18 @@ def test_create_dataframe_from_objects(self): self.assertEqual(df.dtypes, [("key", "bigint"), ("value", "string")]) self.assertEqual(df.first(), Row(key=1, value="1")) +def test_to_localiterator_for_dataframe(self): --- End diff -- I think this test is a duplicate.
[GitHub] spark pull request #16263: [SPARK-18281][SQL][PySpark] Consumes the returned...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16263#discussion_r93126957 --- Diff: python/pyspark/rdd.py --- @@ -135,12 +135,12 @@ def _load_from_socket(port, serializer): break if not sock: raise Exception("could not open socket") -try: -rf = sock.makefile("rb", 65536) -for item in serializer.load_stream(rf): -yield item -finally: -sock.close() +# The RDD materialization time is unpredicable, if we set a timeout for socket reading +# operation, it will very possibly fail. See SPARK-18281. +sock.settimeout(None) +# The socket will be automatically closed when garbage-collected. +rf = sock.makefile("rb", 65536) +return serializer.load_stream(sock.makefile("rb", 65536)) --- End diff -- +1
[GitHub] spark pull request #16263: [SPARK-18281][SQL][PySpark] Consumes the returned...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16263#discussion_r92833679 --- Diff: python/pyspark/rdd.py --- @@ -2349,7 +2352,12 @@ def toLocalIterator(self): """ with SCCallSiteSync(self.context) as css: port = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd()) -return _load_from_socket(port, self._jrdd_deserializer) +# We set a timeout for connecting socket. The connection only begins when we start +# to consume the first element. If we do not begin to consume the returned iterator +# immediately, there will be a failure. +iter = _load_from_socket(port, self._jrdd_deserializer) --- End diff -- I think we could have a wrapper to load items, and return the iterator from _load_from_socket(): ``` def _load_from_socket(port, serializer): ... return serializer.load_stream(sock.makefile("rb", 65536)) ``` The `finally` may be optional; GC could close the socket (this should be confirmed).
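The quoted comment says "the connection only begins when we start to consume the first element". That is standard Python generator behavior: a function containing `yield` runs none of its body until the first `next()`. Returning an iterator from a plain function, as suggested above, does the setup work at call time instead. A minimal Python illustration with a simulated "connect" (no real sockets; `load_lazily`, `load_eagerly`, and `events` are hypothetical):

```python
events = []


def load_lazily(port):
    # Generator function: the body, including the "connect", runs only when
    # the caller asks for the first element.
    events.append(f"connect {port}")
    yield from range(3)


def load_eagerly(port):
    # Plain function: the "connect" happens at call time, and the caller
    # gets back an ordinary iterator.
    events.append(f"connect {port}")
    return iter(range(3))


it = load_lazily(1)
print(events)  # []
next(it)
print(events)  # ['connect 1']
```

With the generator version, any delay between calling the function and consuming the first element also delays the connection, which is how a connect timeout can expire before the caller starts reading.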
[GitHub] spark pull request #16263: [SPARK-18281][SQL][PySpark] Consumes the returned...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16263#discussion_r92830957 --- Diff: python/pyspark/rdd.py --- @@ -2349,7 +2352,12 @@ def toLocalIterator(self): """ with SCCallSiteSync(self.context) as css: port = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd()) -return _load_from_socket(port, self._jrdd_deserializer) +# We set a timeout for connecting socket. The connection only begins when we start +# to consume the first element. If we do not begin to consume the returned iterator +# immediately, there will be a failure. +iter = _load_from_socket(port, self._jrdd_deserializer) --- End diff -- Since the timeout is removed, is this still needed?
[GitHub] spark issue #16274: [SPARK-18853][SQL] Project (UnaryNode) is way too aggres...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16274 lgtm
[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16193 Pushing predicates down into the data source also happens as an optimization in the planner, so I don't think this is the first rule that optimizes outside the Optimizer.
[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16193 The reason we moved PythonUDFEvaluator from the logical plan into the physical plan is that this one-off broke many things: many rules had to treat it specially.
[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16193 @cloud-fan It's not trivial to do this in the Optimizer. For example, we would need to split one Filter into two, which conflicts with another optimizer rule that combines two Filters into one.
[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16193 If there are no objections in the next two hours, I will merge this into master.
[GitHub] spark issue #16193: [SPARK-18766] [SQL] Push Down Filter Through BatchEvalPy...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16193 @cloud-fan There is no R UDF at this point.
[GitHub] spark issue #16235: [SPARK-18745][SQL] Fix signed integer overflow due to to...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16235 LGTM
[GitHub] spark pull request #16193: [SPARK-18766] [SQL] Push Down Filter Through Batc...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16193#discussion_r91641264
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -165,4 +167,25 @@ object ExtractPythonUDFs extends Rule[SparkPlan] {
}
}
}
+
+ // Split the original FilterExec to two FilterExecs. The upper FilterExec only contains
+ // Python UDF and non-deterministic predicates.
+ private def trySplitFilter(plan: SparkPlan): SparkPlan = {
+plan match {
+ case filter: FilterExec =>
+// Only push down the predicates that is deterministic and all the referenced attributes
+// come from child.
+val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(filter.condition).span(_.deterministic)
+// Python UDF is always deterministic
--- End diff --
Is this one useful? We just won't push down expressions that have Python UDFs here.
[GitHub] spark pull request #16193: [SPARK-18766] [SQL] Push Down Filter Through Batc...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16193#discussion_r91609923
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -165,4 +167,31 @@ object ExtractPythonUDFs extends Rule[SparkPlan] {
}
}
}
+
+ // Split the original FilterExec to two FilterExecs. The upper FilterExec only contains
+ // Python UDF and non-deterministic predicates.
+ private def trySplitFilter(plan: SparkPlan): SparkPlan = {
+plan match {
+ case filter: FilterExec =>
+// Only push down the predicates that is deterministic and all the referenced attributes
+// come from child.
+val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(filter.condition).span(_.deterministic)
+val (pushDown, rest) = candidates.partition(!hasPythonUDF(_))
+val stayUp = rest ++ containingNonDeterministic
+
+if (pushDown.nonEmpty) {
+ val newChild = FilterExec(pushDown.reduceLeft(And), filter.child)
+ if (stayUp.nonEmpty) {
--- End diff --
There should be some UDFs here, so this will not be empty.
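A minimal sketch of the conjunct-splitting step quoted in the diff above, written outside Catalyst so it runs on its own. `Pred` is a hypothetical stand-in for a Catalyst `Expression`, and its two flags stand in for `deterministic` and `hasPythonUDF`. It shows why `span` (not `filter`) matters: a deterministic conjunct that appears after a non-deterministic one is not eligible for pushdown.

```scala
object FilterSplitSketch {
  // Hypothetical, simplified predicate: Catalyst's Expression is far richer.
  final case class Pred(sql: String, deterministic: Boolean, hasPythonUDF: Boolean)

  /** Splits filter conjuncts into (pushDown, stayUp), mirroring span + partition in the diff. */
  def split(conjuncts: Seq[Pred]): (Seq[Pred], Seq[Pred]) = {
    // Only the leading run of deterministic conjuncts may move: evaluation order is
    // preserved from the first non-deterministic conjunct onward.
    val (candidates, containingNonDeterministic) = conjuncts.span(_.deterministic)
    // Conjuncts that call a Python UDF must stay above the Python evaluation node.
    val (pushDown, rest) = candidates.partition(p => !p.hasPythonUDF)
    (pushDown, rest ++ containingNonDeterministic)
  }
}
```

For example, with the conjuncts `a > 1`, `pyUdf(b)`, `rand() < 0.5`, `c = 2` (in that order), only `a > 1` is pushed down; `c = 2` stays up even though it is deterministic.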
[GitHub] spark issue #16121: [SPARK-16589][PYTHON] Chained cartesian produces incorre...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16121 LGTM, merging into master and 2.1 branch, thanks!
[GitHub] spark issue #15923: [SPARK-4105] retry the fetch or stage if shuffle block i...
Github user davies commented on the issue: https://github.com/apache/spark/pull/15923 @JoshRosen Added a test for the case where `detectCorrupt` is false.
[GitHub] spark pull request #16193: [SPARK-18766] [SQL] Push Down Filter Through Batc...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/16193#discussion_r91394979
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -166,3 +174,40 @@ object ExtractPythonUDFs extends Rule[SparkPlan] {
}
}
}
+
+// This rule is to push deterministic predicates through BatchEvalPythonExec
+object PushPredicateThroughBatchEvalPython extends Rule[SparkPlan] with PredicateHelper {
--- End diff --
Having a predicate-pushdown rule for SparkPlan sounds bad. Can we try to do this in extract() instead? For example:
```
val splittedFilter = trySplitFilter(plan)
val newChildren = splittedFilter.children.map { child => }
```
[GitHub] spark issue #15923: [SPARK-4105] retry the fetch or stage if shuffle block i...
Github user davies commented on the issue: https://github.com/apache/spark/pull/15923 ping @JoshRosen
[GitHub] spark issue #16151: [SPARK-18719] Add spark.ui.showConsoleProgress to config...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16151 lgtm, merging into master
[GitHub] spark issue #16121: [SPARK-16589][PYTHON] Chained cartesian produces incorre...
Github user davies commented on the issue: https://github.com/apache/spark/pull/16121 It's pretty tricky to make the chained CartesianDeserializer work. Maybe it's easier to add a workaround in RDD.cartesian() that inserts a _reserialize() between chained cartesian (or zipped) RDDs; it will be less performant, but easier to reason about. The current patch may still be wrong in the case of a chained CartesianDeserializer and PairDeserializer, for example, a.cartesian(b.zip(c)) (not verified yet).
[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15923#discussion_r90085570
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -56,8 +59,10 @@ final class ShuffleBlockFetcherIterator(
shuffleClient: ShuffleClient,
--- End diff --
done
[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15923#discussion_r90085604
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator(
*/
override def next(): (BlockId, InputStream) = {
numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
- case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
- shuffleMetrics.incRemoteBytesRead(buf.size)
- shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
- reqsInFlight -= 1
- logDebug("Number of requests in flight " + reqsInFlight)
-}
- case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
-result match {
- case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect data corruption,
+// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch
+// is also corrupt, so the previous stage could be retried.
+// For local shuffle block, throw FailureFetchResult for the first IOException.
+while (result == null) {
+ val startFetchWait = System.currentTimeMillis()
+ result = results.take()
+ val stopFetchWait = System.currentTimeMillis()
+ shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
- case SuccessFetchResult(blockId, address, _, buf, _) =>
-try {
- (result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this))
-} catch {
- case NonFatal(t) =>
-throwFetchFailedException(blockId, address, t)
-}
+ result match {
+case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
+ if (address != blockManager.blockManagerId) {
+shuffleMetrics.incRemoteBytesRead(buf.size)
+shuffleMetrics.incRemoteBlocksFetched(1)
+ }
+ bytesInFlight -= size
+ if (isNetworkReqDone) {
+reqsInFlight -= 1
+logDebug("Number of requests in flight " + reqsInFlight)
+ }
+
+ val in = try {
+buf.createInputStream()
+ } catch {
+// The exception could only be throwed by local shuffle block
+case e: IOException =>
+ assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+ logError("Failed to create input stream from local block", e)
+ buf.release()
+ throwFetchFailedException(blockId, address, e)
+ }
+
+ input = streamWrapper(blockId, in)
+ // Only copy the stream if it's wrapped by compression or encryption, also the size of
+ // block is small (the decompressed block is smaller than maxBytesInFlight)
+ if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
+val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
+try {
+ // Decompress the whole block at once to detect any corruption, which could increase
+ // the memory usage tne potential increase the chance of OOM.
+ // TODO: manage the memory used here, and spill it into disk in case of OOM.
+ Utils.copyStream(input, out)
--- End diff --
done
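The control flow described in the code comment in the diff above (re-fetch a corrupt remote block once, fail if it is corrupt again, and fail a local block on the first error) can be modeled as a small loop. This is a hypothetical sketch, not Spark's code: `Outcome`, `fetchVerified` and `FetchFailed` are illustrative names, and `FetchFailed` only stands in for Spark's `FetchFailedException`, whose role is to make the scheduler re-run the previous stage.

```scala
import java.io.IOException

object RetryOnceSketch {
  // Hypothetical result of decompressing a fetched block to check it for corruption.
  sealed trait Outcome
  case object Ok extends Outcome
  case object Corrupt extends Outcome

  // Stand-in for FetchFailedException: signals that the previous stage should be retried.
  final class FetchFailed(blockId: String) extends IOException(s"Stream is corrupted: $blockId")

  /** Returns the number of fetch attempts that were needed for `blockId`. */
  def fetchVerified(blockId: String, isLocal: Boolean)(fetch: String => Outcome): Int = {
    var attempts = 0
    var verified = false
    while (!verified) {
      attempts += 1
      fetch(blockId) match {
        case Ok =>
          verified = true
        case Corrupt if !isLocal && attempts == 1 =>
          () // First corrupt copy of a remote block: loop around and fetch it again.
        case Corrupt =>
          // Corrupt twice, or a local block: give up on this fetch.
          throw new FetchFailed(blockId)
      }
    }
    attempts
  }
}
```

Retrying only once keeps a persistently bad source from stalling the task; after that, the stage-level retry takes over.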
[GitHub] spark issue #15935: [SPARK-18188] add checksum for blocks of broadcast
Github user davies commented on the issue: https://github.com/apache/spark/pull/15935 @zsxwing Added.
[GitHub] spark issue #15935: [SPARK-18188] add checksum for blocks of broadcast
Github user davies commented on the issue: https://github.com/apache/spark/pull/15935 Manually tested this patch with a job that usually fails with corrupt streams:
```
136 26993 0 FAILED PROCESS_LOCAL 0 / 10.1.108.161 2016/11/20 08:39:11 7 s 98 ms 0 ms 30.1 MB / 14481129.2 MB java.io.IOException: org.apache.spark.SparkException: corrupt remote block 9: 927759120 != 787315496
java.io.IOException: org.apache.spark.SparkException: corrupt remote block 9: 927759120 != 787315496
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1283)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:192)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:93)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.init(Unknown Source)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:367)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:364)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: corrupt remote block 9: 927759120 != 787315496
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:154)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:139)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:139)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:139)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:204)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
	... 25 more
136 27059 1 SUCCESS PROCESS_LOCAL 13 / 10.1.109.49 2016/11/20 08:39:23 19 s 0.4 s 1 ms 30.1 MB / 14481128.9 MB
```
This patch did save a stage from a corrupt broadcast block.
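The `corrupt remote block 9: 927759120 != 787315496` message in the trace above presumably comes from comparing a checksum recorded when the broadcast was written against one recomputed for the fetched piece. Below is a minimal, self-contained sketch of that idea, assuming Adler-32 via `java.util.zip.Adler32`; the object and method names are hypothetical, and `IllegalStateException` stands in for the `SparkException` seen in the trace.

```scala
import java.util.zip.Adler32

object BlockChecksumSketch {
  /** Checksum of one broadcast piece (Adler-32 is an assumption of this sketch). */
  def checksum(piece: Array[Byte]): Int = {
    val adler = new Adler32()
    adler.update(piece, 0, piece.length)
    adler.getValue.toInt
  }

  /** Writer side: one checksum per piece, stored alongside the pieces. */
  def checksums(pieces: Seq[Array[Byte]]): Array[Int] = pieces.map(checksum).toArray

  /** Reader side: recompute the checksum of fetched piece `i` and fail loudly on mismatch. */
  def verify(i: Int, fetched: Array[Byte], expected: Array[Int]): Array[Byte] = {
    val actual = checksum(fetched)
    if (actual != expected(i)) {
      throw new IllegalStateException(s"corrupt remote block $i: $actual != ${expected(i)}")
    }
    fetched
  }
}
```

Failing with an exception (rather than returning corrupt bytes) is what lets the task fail and be retried, as in the successful second attempt shown in the log.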
[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15923#discussion_r89889964
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator(
*/
override def next(): (BlockId, InputStream) = {
numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
- case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
- shuffleMetrics.incRemoteBytesRead(buf.size)
- shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
- reqsInFlight -= 1
- logDebug("Number of requests in flight " + reqsInFlight)
-}
- case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
-result match {
- case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect data corruption,
+// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch
+// is also corrupt, so the previous stage could be retried.
+// For local shuffle block, throw FailureFetchResult for the first IOException.
+while (result == null) {
+ val startFetchWait = System.currentTimeMillis()
+ result = results.take()
+ val stopFetchWait = System.currentTimeMillis()
+ shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
- case SuccessFetchResult(blockId, address, _, buf, _) =>
-try {
- (result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this))
-} catch {
- case NonFatal(t) =>
-throwFetchFailedException(blockId, address, t)
-}
+ result match {
+case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
+ if (address != blockManager.blockManagerId) {
+shuffleMetrics.incRemoteBytesRead(buf.size)
+shuffleMetrics.incRemoteBlocksFetched(1)
+ }
+ bytesInFlight -= size
+ if (isNetworkReqDone) {
+reqsInFlight -= 1
+logDebug("Number of requests in flight " + reqsInFlight)
+ }
+
+ val in = try {
+buf.createInputStream()
+ } catch {
+// The exception could only be throwed by local shuffle block
+case e: IOException =>
+ assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+ logError("Failed to create input stream from local block", e)
+ buf.release()
+ throwFetchFailedException(blockId, address, e)
+ }
+
+ input = streamWrapper(blockId, in)
+ // Only copy the stream if it's wrapped by compression or encryption, also the size of
+ // block is small (the decompressed block is smaller than maxBytesInFlight)
--- End diff --
@JoshRosen I think so.
[GitHub] spark issue #15923: [SPARK-4105] retry the fetch or stage if shuffle block i...
Github user davies commented on the issue: https://github.com/apache/spark/pull/15923 Manually tested this patch with a job that usually fails because of corrupt streams. As the logs show:
```
16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_613_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_688_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:07 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_2434_275 from BlockManagerId(6, 10.1.109.163, 34744), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_878_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_1042_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_2301_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:26 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_2546_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:27 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_3160_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
16/11/20 08:32:27 WARN ShuffleBlockFetcherIterator: got an corrupted block shuffle_5_3601_275 from BlockManagerId(21, 10.1.107.237, 35876), fetch again
...
16/11/20 08:32:41 INFO Executor: Finished task 275.0 in stage 26.0 (TID 22187). 5219 bytes result sent to driver
```
The shuffle fetcher got some corrupt blocks for partition 275; it retried once, and then the task finally succeeded.
But the retry cannot protect all the tasks: some failed with FetchFailed, and then the stage was retried:
```
26 2016/11/20 08:31:24 1.0min 403/1000 (2 failed) 205.6 GB 29.5 GB org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
26 (retry 1) 2016/11/20 08:34:00 34s 200/629 (2 failed) 102.0 GB 14.6 GB org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
26 (retry 2) 2016/11/20 08:35:25 1.8min 461/461 235.1 GB 33.7 GB
```
Stage 26 succeeded after being retried twice. Another thing is that all the corruption happened on only 2 of the 26 nodes, and a few broadcast blocks were also corrupt on them. It seems that the corruption happens on the receiving (fetcher) side of the network. I will update the patch to address the comments.
[GitHub] spark pull request #15980: [SPARK-18528][SQL] Fix a bug to initialise an ite...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15980#discussion_r89365636
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -67,23 +67,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
}
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-val stopEarly = ctx.freshName("stopEarly")
-ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;")
-
-ctx.addNewFunction("shouldStop", s"""
- @Override
- protected boolean shouldStop() {
-return !currentRows.isEmpty() || $stopEarly;
- }
-""")
val countTerm = ctx.freshName("count")
ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
s"""
| if ($countTerm < $limit) {
| $countTerm += 1;
| ${consume(ctx, input)}
| } else {
- | $stopEarly = true;
+ | // Do nothing
--- End diff --
This removes an optimization; I don't think this is a good fix. Can we initialize it properly?
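The optimization being removed lets the generated loop stop pulling input rows once the limit has been reached: the `else` branch sets `stopEarly`, and the overridden `shouldStop()` then returns `true`. Below is a plain-Scala model of that effect, not Spark's generated code; `run` and `useStopEarly` are hypothetical names, and the upstream `shouldStop()` check is modeled as the loop condition.

```scala
object LimitStopEarlySketch {
  /**
   * Consumes at most `limit` rows from `input`. Returns the emitted rows and how many
   * input rows were pulled. With `useStopEarly`, the loop exits once the flag is set,
   * as an upstream `shouldStop()` check would; without it, the whole input is scanned.
   */
  def run(input: Iterator[Int], limit: Int, useStopEarly: Boolean): (Seq[Int], Int) = {
    val out = Seq.newBuilder[Int]
    var count = 0
    var stopEarly = false
    var pulled = 0
    while (input.hasNext && !(useStopEarly && stopEarly)) {
      val row = input.next()
      pulled += 1
      if (count < limit) {
        count += 1
        out += row
      } else {
        stopEarly = true // mirrors `$stopEarly = true;` in the removed code
      }
    }
    (out.result(), pulled)
  }
}
```

With 10 input rows and a limit of 3, both variants emit `1, 2, 3`, but the `stopEarly` variant pulls 4 rows instead of 10, since the flag is set on the first row past the limit.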
[GitHub] spark issue #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on the issue: https://github.com/apache/spark/pull/13065 LGTM
[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15923#discussion_r88759884
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator(
*/
override def next(): (BlockId, InputStream) = {
numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
- case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
- shuffleMetrics.incRemoteBytesRead(buf.size)
- shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
- reqsInFlight -= 1
- logDebug("Number of requests in flight " + reqsInFlight)
-}
- case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
-result match {
- case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect data corruption,
+// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch
+// is also corrupt, so the previous stage could be retried.
+// For local shuffle block, throw FailureFetchResult for the first IOException.
+while (result == null) {
+ val startFetchWait = System.currentTimeMillis()
+ result = results.take()
+ val stopFetchWait = System.currentTimeMillis()
+ shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
- case SuccessFetchResult(blockId, address, _, buf, _) =>
-try {
- (result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this))
-} catch {
- case NonFatal(t) =>
-throwFetchFailedException(blockId, address, t)
-}
+ result match {
+case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
+ if (address != blockManager.blockManagerId) {
+shuffleMetrics.incRemoteBytesRead(buf.size)
+shuffleMetrics.incRemoteBlocksFetched(1)
+ }
+ bytesInFlight -= size
+ if (isNetworkReqDone) {
+reqsInFlight -= 1
+logDebug("Number of requests in flight " + reqsInFlight)
+ }
+
+ val in = try {
+buf.createInputStream()
+ } catch {
+// The exception could only be throwed by local shuffle block
+case e: IOException =>
+ assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+ logError("Failed to create input stream from local block", e)
+ buf.release()
+ throwFetchFailedException(blockId, address, e)
+ }
+
+ input = streamWrapper(blockId, in)
+ // Only copy the stream if it's wrapped by compression or encryption, also the size of
+ // block is small (the decompressed block is smaller than maxBytesInFlight)
--- End diff --
I tried adding checksums for shuffle blocks in https://github.com/apache/spark/pull/15894, but that approach has much more complexity and overhead, so I favor this lighter one.
[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15923#discussion_r88759763 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator( */ override def next(): (BlockId, InputStream) = { numBlocksProcessed += 1 -val startFetchWait = System.currentTimeMillis() -currentResult = results.take() -val result = currentResult -val stopFetchWait = System.currentTimeMillis() -shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) - -result match { - case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) => -if (address != blockManager.blockManagerId) { - shuffleMetrics.incRemoteBytesRead(buf.size) - shuffleMetrics.incRemoteBlocksFetched(1) -} -bytesInFlight -= size -if (isNetworkReqDone) { - reqsInFlight -= 1 - logDebug("Number of requests in flight " + reqsInFlight) -} - case _ => -} -// Send fetch requests up to maxBytesInFlight -fetchUpToMaxBytes() -result match { - case FailureFetchResult(blockId, address, e) => -throwFetchFailedException(blockId, address, e) +var result: FetchResult = null +var input: InputStream = null +// Take the next fetched result and try to decompress it to detect data corruption, +// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch +// is also corrupt, so the previous stage could be retried. +// For local shuffle block, throw FailureFetchResult for the first IOException. 
+while (result == null) { + val startFetchWait = System.currentTimeMillis() + result = results.take() + val stopFetchWait = System.currentTimeMillis() + shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) - case SuccessFetchResult(blockId, address, _, buf, _) => -try { - (result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this)) -} catch { - case NonFatal(t) => -throwFetchFailedException(blockId, address, t) -} + result match { +case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) => + if (address != blockManager.blockManagerId) { +shuffleMetrics.incRemoteBytesRead(buf.size) +shuffleMetrics.incRemoteBlocksFetched(1) + } + bytesInFlight -= size + if (isNetworkReqDone) { +reqsInFlight -= 1 +logDebug("Number of requests in flight " + reqsInFlight) + } + + val in = try { +buf.createInputStream() + } catch { +// The exception could only be throwed by local shuffle block +case e: IOException => + assert(buf.isInstanceOf[FileSegmentManagedBuffer]) + logError("Failed to create input stream from local block", e) + buf.release() + throwFetchFailedException(blockId, address, e) + } + + input = streamWrapper(blockId, in) + // Only copy the stream if it's wrapped by compression or encryption, also the size of + // block is small (the decompressed block is smaller than maxBytesInFlight) --- End diff -- The purpose of this PR is to reduce the possibility that failed job caused by network/disk corruption, without introduce other regression (OOM). Typically, the shuffle blocks are small, so we can have parallel fetching even with this maxBytesInFlight limit. For those few blocks (for example, data skew), we does not check that for now (at least, it's not worse than before). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
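The retry policy described in the diff comments and in the reply above (refetch a corrupt remote block once, fail a local block on the first error, and skip the check for blocks too large to buffer) can be sketched as a small synchronous loop. This is a simplified model under stated assumptions, not the `ShuffleBlockFetcherIterator` code: `CorruptionRetry`, `fetchVerified`, and `BlockFetchFailed` are hypothetical names, `verify` stands in for decompressing the block, and comparing the fetched size against `maxBytesInFlight` only approximates the PR's size condition.

```scala
import java.io.IOException

// Hypothetical stand-in for Spark's FetchFailedException: failing with a fetch
// failure (rather than a plain task error) is what lets the previous stage be retried.
final class BlockFetchFailed(val blockId: String, cause: Throwable)
  extends Exception(s"Fetch of $blockId failed: block is corrupt", cause)

object CorruptionRetry {
  /**
   * Fetches a block and verifies it (e.g. by decompressing it).
   *  - Remote blocks: a corrupt block is fetched one more time; a second corrupt copy fails.
   *  - Local blocks: fail on the first IOException, as the diff comment describes.
   *  - Blocks of at least `maxBytesInFlight` bytes are passed through unverified,
   *    so the check never has to buffer a large block in memory.
   */
  def fetchVerified(
      blockId: String,
      isLocal: Boolean,
      maxBytesInFlight: Long,
      fetch: () => Array[Byte],
      verify: Array[Byte] => Unit): Array[Byte] = {
    val maxAttempts = if (isLocal) 1 else 2
    var attempt = 1
    var result: Array[Byte] = null
    while (result == null) {
      val bytes = fetch()
      if (bytes.length >= maxBytesInFlight) {
        result = bytes // too large to check cheaply; behaves as before the PR
      } else {
        try {
          verify(bytes)
          result = bytes
        } catch {
          case _: IOException if attempt < maxAttempts => attempt += 1 // refetch once
          case e: IOException => throw new BlockFetchFailed(blockId, e)
        }
      }
    }
    result
  }
}
```

Only remote blocks get a second attempt in this sketch; once the retry budget is spent, the fetch-failure exception hands the decision to rerun the map stage back to the scheduler.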
[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/13065#discussion_r88755507 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -103,5 +109,192 @@ case class GenerateExec( } } } -} + override def inputRDDs(): Seq[RDD[InternalRow]] = { +child.asInstanceOf[CodegenSupport].inputRDDs() + } + + protected override def doProduce(ctx: CodegenContext): String = { +child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { +ctx.currentVars = input +ctx.copyResult = true + +// Add input rows to the values when we are joining +val values = if (join) { + input +} else { + Seq.empty +} + +boundGenerator match { + case e: CollectionGenerator => codeGenCollection(ctx, e, values, row) + case g => codeGenTraversableOnce(ctx, g, values, row) +} + } + + /** + * Generate code for [[CollectionGenerator]] expressions. + */ + private def codeGenCollection( + ctx: CodegenContext, + e: CollectionGenerator, + input: Seq[ExprCode], + row: ExprCode): String = { + +// Generate code for the generator. +val data = e.genCode(ctx) + +// Generate looping variables. +val index = ctx.freshName("index") + +// Add a check if the generate outer flag is true. 
+val checks = optionalCode(outer, data.isNull) + +// Add position +val position = if (e.position) { + Seq(ExprCode("", "false", index)) +} else { + Seq.empty +} + +// Generate code for either ArrayData or MapData +val (initMapData, updateRowData, values) = e.collectionType match { + case ArrayType(st: StructType, nullable) if e.inline => +val row = codeGenAccessor(ctx, data.value, "col", index, st, nullable, checks) +val fieldChecks = checks ++ optionalCode(nullable, row.isNull) +val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) => + codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, f.nullable, fieldChecks) +} +("", row.code, columns) + + case ArrayType(dataType, nullable) => +("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, dataType, nullable, checks))) + + case MapType(keyType, valueType, valueContainsNull) => +// Materialize the key and the value arrays before we enter the loop. +val keyArray = ctx.freshName("keyArray") +val valueArray = ctx.freshName("valueArray") +val initArrayData = + s""" + |ArrayData $keyArray = ${data.isNull} ? null : ${data.value}.keyArray(); + |ArrayData $valueArray = ${data.isNull} ? null : ${data.value}.valueArray(); + """.stripMargin +val values = Seq( + codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = false, checks), + codeGenAccessor(ctx, valueArray, "value", index, valueType, valueContainsNull, checks)) +(initArrayData, "", values) +} + +// In case of outer=true we need to make sure the loop is executed at-least once when the +// array/map contains no input. We do this by setting the looping index to -1 if there is no +// input, evaluation of the array is prevented by a check in the accessor code. +val numElements = ctx.freshName("numElements") +val init = if (outer) { + s"$numElements == 0 ? -1 : 0" +} else { + "0" +} +val numOutput = metricTerm(ctx, "numOutputRows") +s""" + |${data.code} + |$initMapData + |int $numElements = ${data.isNull} ? 
0 : ${data.value}.numElements(); + |for (int $index = $init; $index < $numElements; $index++) { + | $numOutput.add(1); + | $updateRowData + | ${consume(ctx, input ++ position ++ values)} + |} + """.stripMargin + } + + /** + * Generate code for a regular [[TraversableOnce]] returning [[Generator]]. + */ + private def codeGenTraversableOnce( + ctx: CodegenContext, + e: Expression, + input: Seq[ExprCode], + row: ExprCode): String = { + +// Generate the code for the generator
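The `outer` handling in the quoted `codeGenCollection` diff starts the loop index at -1 when the collection is empty (or null), so the loop body runs exactly once and the accessor yields null instead of reading the collection. A plain-Scala model of that loop follows; `OuterLoopSketch` is a hypothetical name, `Option` stands in for SQL NULL, and this is a simplification of the generated Java, not Spark code.

```scala
import scala.collection.mutable.ListBuffer

object OuterLoopSketch {
  // Models the generated loop: `init` is -1 only when outer = true and there are
  // no elements, so the body then runs exactly once.
  def generate(data: Array[Int], outer: Boolean): List[Option[Int]] = {
    val isNull = data == null
    val numElements = if (isNull) 0 else data.length
    val init = if (outer && numElements == 0) -1 else 0
    val out = ListBuffer.empty[Option[Int]]
    var index = init
    while (index < numElements) {
      // The accessor guards against reading a null/empty collection: None models NULL.
      out += (if (isNull || index < 0) None else Some(data(index)))
      index += 1
    }
    out.toList
  }
}
```

With `outer = false`, an empty or null input produces no rows, which matches the non-outer `explode` behavior the diff preserves.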
[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/13065#discussion_r88754155 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -113,4 +117,25 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined) assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0))) } + + test("generate should be included in WholeStageCodegen") { +import org.apache.spark.sql.functions._ +val ds = spark.range(2).select( + col("id"), + explode(array(col("id") + 1, col("id") + 2)).as("value")) +val plan = ds.queryExecution.executedPlan +assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[GenerateExec]).isDefined) +assert(ds.collect() === Array(Row(0, 1), Row(0, 2), Row(1, 2), Row(1, 3))) + } + + test("large inline generate should fail in WholeStageCodegen") { --- End diff -- I mean it should not rely on the fallback after a failed compilation (the whole plan would fall back); instead, don't use whole-stage codegen for this Generate. This could be done by checking it in `Generate.supportCodegen()`.
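The suggestion above, deciding in `supportCodegen()` whether a Generate can join whole-stage codegen instead of relying on the compile-failure fallback that drops the whole plan, can be illustrated with a toy planner. Everything here is hypothetical: `Op`, `SimpleOp`, `GenerateOp`, `Planner.stages`, and the 100-field limit are not Spark APIs, only a model of the design choice.

```scala
// Toy model (hypothetical names): each operator reports up front whether it can
// be fused into a whole-stage-codegen stage.
sealed trait Op {
  def name: String
  def supportCodegen: Boolean
}

object CodegenLimits {
  // Hypothetical threshold beyond which the generated code would be too large.
  val MaxGeneratedFields = 100
}

final case class SimpleOp(name: String) extends Op {
  val supportCodegen: Boolean = true
}

final case class GenerateOp(name: String, generatedFields: Int, hasTerminate: Boolean) extends Op {
  // Decided statically, instead of compiling and falling back when compilation fails.
  def supportCodegen: Boolean =
    !hasTerminate && generatedFields <= CodegenLimits.MaxGeneratedFields
}

object Planner {
  // Split the pipeline into maximal runs that either all support codegen or all do not.
  def stages(ops: List[Op]): List[(Boolean, List[String])] = ops match {
    case Nil => Nil
    case head :: _ =>
      val (run, rest) = ops.span(_.supportCodegen == head.supportCodegen)
      (head.supportCodegen, run.map(_.name)) :: stages(rest)
  }
}
```

In this model only the oversized Generate drops out of codegen; its neighbors stay fused, whereas a compile-failure fallback would lose codegen for the entire plan.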
[GitHub] spark pull request #15935: [SPARK-] add checksum for blocks of broadcast
GitHub user davies opened a pull request: https://github.com/apache/spark/pull/15935 [SPARK-] add checksum for blocks of broadcast ## What changes were proposed in this pull request? A TorrentBroadcast is first serialized and compressed, then split into fixed-size blocks. If any block is corrupted while being fetched from a remote executor, decompression/deserialization will fail without indicating which block is corrupt. Also, the corrupt block is kept in the block manager and reported to the driver, so other tasks (in the same executor or in different executors) will also fail because of it. This PR adds a checksum for each block and verifies it after fetching the block from a remote executor, because the corruption most likely happens in the network. When corruption is detected, the block is discarded and an exception is thrown to fail the task, which will then be retried. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/davies/spark broadcast_checksum Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15935.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15935 commit 328ca390f10cceeb21656eb4fb9e0ef677fd0672 Author: Davies Liu <dav...@databricks.com> Date: 2016-11-18T22:27:27Z add checksum for blocks of broadcast
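Per-block checksums as described in the PR can be sketched with `java.util.zip.Adler32`. The PR text does not say which checksum algorithm it uses, so Adler-32 is an assumption here, and `BlockChecksums` with its methods is a hypothetical helper: the writer computes one checksum per fixed-size block, and the reader verifies each block fetched from a remote executor before keeping it.

```scala
import java.io.IOException
import java.util.zip.Adler32

object BlockChecksums {
  // Split a serialized + compressed payload into fixed-size blocks (the last may be shorter).
  def split(payload: Array[Byte], blockSize: Int): Seq[Array[Byte]] =
    payload.grouped(blockSize).toVector

  def checksum(block: Array[Byte]): Long = {
    val adler = new Adler32()
    adler.update(block, 0, block.length)
    adler.getValue
  }

  // Writer side: one checksum per block, published alongside the blocks.
  def checksums(blocks: Seq[Array[Byte]]): Seq[Long] = blocks.map(checksum)

  // Reader side: reject a fetched block before it is stored and served to other tasks.
  def verifyFetched(index: Int, fetched: Array[Byte], expected: Seq[Long]): Unit =
    if (checksum(fetched) != expected(index)) {
      throw new IOException(s"Corrupt broadcast block $index")
    }
}
```

Because verification is per block, a mismatch identifies exactly which block to discard, which decompressing the whole stream cannot do.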
[GitHub] spark issue #15923: [SPARK-4105] retry the fetch or stage if shuffle block i...
Github user davies commented on the issue: https://github.com/apache/spark/pull/15923 @joshrosen @zsxwing Could you help review this one?
[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/13065#discussion_r88581043 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -103,5 +109,182 @@ case class GenerateExec( } } } -} + override def inputRDDs(): Seq[RDD[InternalRow]] = { +child.asInstanceOf[CodegenSupport].inputRDDs() + } + + protected override def doProduce(ctx: CodegenContext): String = { +child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { +ctx.currentVars = input +ctx.copyResult = true + +// Add input rows to the values when we are joining +val values = if (join) { + input +} else { + Seq.empty +} + +boundGenerator match { + case e: CollectionGenerator => codeGenCollection(ctx, e, values, row) + case g => codeGenTraversableOnce(ctx, g, values, row) +} + } + + /** + * Generate code for [[CollectionGenerator]] expressions. + */ + private def codeGenCollection( + ctx: CodegenContext, + e: CollectionGenerator, + input: Seq[ExprCode], + row: ExprCode): String = { + +// Generate code for the generator. +val data = e.genCode(ctx) + +// Generate looping variables. +val index = ctx.freshName("index") + +// Add a check if the generate outer flag is true. 
+val checks = optionalCode(outer, data.isNull) + +// Add position +val position = if (e.position) { + Seq(ExprCode("", "false", index)) +} else { + Seq.empty +} + +// Generate code for either ArrayData or MapData +val (initMapData, updateRowData, values) = e.collectionType match { + case ArrayType(st: StructType, nullable) if e.inline => +val row = codeGenAccessor(ctx, data.value, "col", index, st, nullable, checks) +val fieldChecks = checks ++ optionalCode(nullable, row.isNull) +val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) => + codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, f.nullable, fieldChecks) +} +("", row.code, columns) + + case ArrayType(dataType, nullable) => +("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, dataType, nullable, checks))) + + case MapType(keyType, valueType, valueContainsNull) => +// Materialize the key and the value arrays before we enter the loop. +val keyArray = ctx.freshName("keyArray") +val valueArray = ctx.freshName("valueArray") +val initArrayData = + s""" + |ArrayData $keyArray = ${data.isNull} ? null : ${data.value}.keyArray(); + |ArrayData $valueArray = ${data.isNull} ? null : ${data.value}.valueArray(); + """.stripMargin +val values = Seq( + codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = false, checks), + codeGenAccessor(ctx, valueArray, "value", index, valueType, valueContainsNull, checks)) +(initArrayData, "", values) +} + +// In case of outer=true we need to make sure the loop is executed at-least once when the +// array/map contains no input. We do this by setting the looping index to -1 if there is no +// input, evaluation of the array is prevented by a check in the accessor code. +val numElements = ctx.freshName("numElements") +val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0" +val numOutput = metricTerm(ctx, "numOutputRows") +s""" + |${data.code} + |$initMapData + |int $numElements = ${data.isNull} ? 
0 : ${data.value}.numElements(); + |for (int $index = $init; $index < $numElements; $index++) { + | $numOutput.add(1); + | $updateRowData + | ${consume(ctx, input ++ position ++ values)} + |} + """.stripMargin + } + + /** + * Generate code for a regular [[TraversableOnce]] returning [[Generator]]. + */ + private def codeGenTraversableOnce( + ctx: CodegenContext, + e: Expression, + input: Seq[ExprCode], + row: ExprCode): String = { + +// Generate the code for the generator +val data = e.genCode(ctx) + +// Gene
[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/13065#discussion_r88580906 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -113,4 +117,25 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined) assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0))) } + + test("generate should be included in WholeStageCodegen") { +import org.apache.spark.sql.functions._ +val ds = spark.range(2).select( + col("id"), + explode(array(col("id") + 1, col("id") + 2)).as("value")) +val plan = ds.queryExecution.executedPlan +assert(plan.find(p => + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[GenerateExec]).isDefined) +assert(ds.collect() === Array(Row(0, 1), Row(0, 2), Row(1, 2), Row(1, 3))) + } + + test("large inline generate should fail in WholeStageCodegen") { --- End diff -- Should this still work?
[GitHub] spark issue #15894: [SPARK-18188] Add checksum for shuffle blocks
Github user davies commented on the issue: https://github.com/apache/spark/pull/15894 Due to the complexity and overhead here, closing this in favor of https://github.com/apache/spark/pull/15923/.
[GitHub] spark pull request #15894: [SPARK-18188] Add checksum for shuffle blocks
Github user davies closed the pull request at: https://github.com/apache/spark/pull/15894
[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...
GitHub user davies opened a pull request: https://github.com/apache/spark/pull/15923 [SPARK-4105] retry the fetch or stage if shuffle block is corrupt ## What changes were proposed in this pull request? There is a long-standing issue: sometimes shuffle blocks are corrupt and can't be decompressed. We recently hit this in three different workloads; sometimes we could reproduce it on every try, sometimes we couldn't. I also found that when the corruption happened, the beginning and end of the blocks were correct and the corruption was in the middle. In one case, the block ID string was corrupted by one character. It seems very likely that the corruption is introduced by faulty machines/hardware, and the 16-bit TCP checksum is not strong enough to detect all corruption. Unfortunately, Spark does not have checksums for shuffle blocks or broadcast blocks, so the job will fail if any shuffle block is corrupted on disk, or any broadcast block is corrupted in the network. This PR tries to detect corruption after fetching shuffle blocks by decompressing them, because most compression formats already include a checksum. A corrupt block is fetched again, or the task fails with a FetchFailure so that the previous stage can be retried on different (still randomly chosen) machines. ## How was this patch tested?
TBD You can merge this pull request into a Git repository by running: $ git pull https://github.com/davies/spark detect_corrupt Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15923.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15923 commit 5c93aafd00b6ba50e8211227272aac6041fee376 Author: Davies Liu <dav...@databricks.com> Date: 2016-11-18T00:04:40Z retry the fetch or stage if shuffle block is corrupt
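The detection idea (decompress the fetched block and treat a failure as corruption) can be illustrated with the JDK's GZIP streams, which validate a CRC-32 and the uncompressed length in their trailer. GZIP is a stand-in chosen only because it ships with the JDK; it is not necessarily the codec Spark uses for shuffle data, and `DecompressToDetect` is a hypothetical helper.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object DecompressToDetect {
  def compress(data: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    try gz.write(data) finally gz.close()
    bos.toByteArray
  }

  // Fully decompress the block. GZIP checks a CRC-32 and the length in its trailer,
  // so a corrupted block surfaces as an IOException (typically a ZipException).
  def decompress(block: Array[Byte]): Array[Byte] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(block))
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](8192)
    try {
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally in.close()
    out.toByteArray
  }

  def isCorrupt(block: Array[Byte]): Boolean =
    try { decompress(block); false } catch { case _: IOException => true }
}
```

The check costs a full decompression of the block, which is why the PR discussion limits it to blocks small enough to buffer.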
[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/13065#discussion_r88550711 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -40,6 +42,10 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional * programming with one important additional feature, which allows the input rows to be joined with * their output. + * + * This operator supports whole stage code generation for generators that do not implement + * terminate(). --- End diff -- I see: the Generators with terminate() implement CodegenFallback, so GenerateExec will also fall back; nvm.
[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/13065#discussion_r88525329 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -149,29 +167,52 @@ case class Stack(children: Seq[Expression]) InternalRow(fields: _*) } } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +// Rows - we write these into an array. +val rowData = ctx.freshName("rows") +ctx.addMutableState("InternalRow[]", rowData, s"this.$rowData = new InternalRow[$numRows];") +val values = children.tail +val dataTypes = values.take(numFields).map(_.dataType) +val code = ctx.splitExpressions(ctx.INPUT_ROW, Seq.tabulate(numRows) { row => --- End diff -- Unfortunately, splitExpressions does not work with whole-stage codegen, so we have to fall back if there are many rows. Can you add a test for that (many rows)?
[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/13065#discussion_r88524027 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -40,6 +42,10 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional * programming with one important additional feature, which allows the input rows to be joined with * their output. + * + * This operator supports whole stage code generation for generators that do not implement + * terminate(). --- End diff -- Could be done in: ``` override def supportCodegen(): Boolean = { generator.terminate().isEmpty } ```
[GitHub] spark pull request #13065: [SPARK-15214][SQL] Code-generation for Generate
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/13065#discussion_r88523492 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -103,5 +109,182 @@ case class GenerateExec( } } } -} + override def inputRDDs(): Seq[RDD[InternalRow]] = { +child.asInstanceOf[CodegenSupport].inputRDDs() + } + + protected override def doProduce(ctx: CodegenContext): String = { +child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { +ctx.currentVars = input +ctx.copyResult = true + +// Add input rows to the values when we are joining +val values = if (join) { + input +} else { + Seq.empty +} + +boundGenerator match { + case e: CollectionGenerator => codeGenCollection(ctx, e, values, row) + case g => codeGenTraversableOnce(ctx, g, values, row) +} + } + + /** + * Generate code for [[CollectionGenerator]] expressions. + */ + private def codeGenCollection( + ctx: CodegenContext, + e: CollectionGenerator, + input: Seq[ExprCode], + row: ExprCode): String = { + +// Generate code for the generator. +val data = e.genCode(ctx) + +// Generate looping variables. +val index = ctx.freshName("index") + +// Add a check if the generate outer flag is true. 
+val checks = optionalCode(outer, data.isNull) + +// Add position +val position = if (e.position) { + Seq(ExprCode("", "false", index)) +} else { + Seq.empty +} + +// Generate code for either ArrayData or MapData +val (initMapData, updateRowData, values) = e.collectionType match { + case ArrayType(st: StructType, nullable) if e.inline => +val row = codeGenAccessor(ctx, data.value, "col", index, st, nullable, checks) +val fieldChecks = checks ++ optionalCode(nullable, row.isNull) +val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) => + codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, f.nullable, fieldChecks) +} +("", row.code, columns) + + case ArrayType(dataType, nullable) => +("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, dataType, nullable, checks))) + + case MapType(keyType, valueType, valueContainsNull) => +// Materialize the key and the value arrays before we enter the loop. +val keyArray = ctx.freshName("keyArray") +val valueArray = ctx.freshName("valueArray") +val initArrayData = + s""" + |ArrayData $keyArray = ${data.isNull} ? null : ${data.value}.keyArray(); + |ArrayData $valueArray = ${data.isNull} ? null : ${data.value}.valueArray(); + """.stripMargin +val values = Seq( + codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = false, checks), + codeGenAccessor(ctx, valueArray, "value", index, valueType, valueContainsNull, checks)) +(initArrayData, "", values) +} + +// In case of outer=true we need to make sure the loop is executed at-least once when the +// array/map contains no input. We do this by setting the looping index to -1 if there is no +// input, evaluation of the array is prevented by a check in the accessor code. +val numElements = ctx.freshName("numElements") +val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0" +val numOutput = metricTerm(ctx, "numOutputRows") +s""" + |${data.code} + |$initMapData + |int $numElements = ${data.isNull} ? 
0 : ${data.value}.numElements(); + |for (int $index = $init; $index < $numElements; $index++) { + | $numOutput.add(1); + | $updateRowData + | ${consume(ctx, input ++ position ++ values)} + |} + """.stripMargin + } + + /** + * Generate code for a regular [[TraversableOnce]] returning [[Generator]]. + */ + private def codeGenTraversableOnce( + ctx: CodegenContext, + e: Expression, + input: Seq[ExprCode], + row: ExprCode): String = { + +// Generate the code for the generator +val data = e.genCode(ctx) + +// Gene