spark git commit: [SPARK-25159][SQL] json schema inference should only trigger one job

2018-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 07737c87d -> 4a9c9d8f9


[SPARK-25159][SQL] json schema inference should only trigger one job

## What changes were proposed in this pull request?

This fixes a perf regression caused by https://github.com/apache/spark/pull/21376.

We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD 
partition. This is very bad for RDDs with a lot of small partitions.

To fix it, this PR introduces a way to access SQLConf in the scheduler event 
loop thread, so that we don't need to use `RDD#toLocalIterator` anymore in 
`JsonInferSchema`.
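
For context, a minimal sketch of the driver-side pattern (assuming serializable `zero` and
`merge`; the PR's actual code is in the diff below): `toLocalIterator` launches one job per
partition, while a single `runJob` folds each partition in its task and merges the
per-partition results on the driver as they arrive.

```
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Minimal sketch: fold an RDD with one Spark job instead of the
// one-job-per-partition behavior of RDD#toLocalIterator.
def singleJobFold[T: ClassTag](sc: SparkContext, rdd: RDD[T], zero: T)(merge: (T, T) => T): T = {
  var acc = zero
  // Each task folds its own partition; results are merged on the driver as they arrive.
  val foldPartition = (iter: Iterator[T]) => iter.fold(zero)(merge)
  val mergeResult = (_: Int, taskResult: T) => { acc = merge(acc, taskResult) }
  sc.runJob(rdd, foldPartition, mergeResult)
  acc
}
```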

## How was this patch tested?

a new test

Closes #22152 from cloud-fan/conf.

Authored-by: Wenchen Fan 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a9c9d8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a9c9d8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a9c9d8f

Branch: refs/heads/master
Commit: 4a9c9d8f9a8f8f165369e121d3b553a3515333d4
Parents: 07737c8
Author: Wenchen Fan 
Authored: Tue Aug 21 22:21:08 2018 -0700
Committer: Xiao Li 
Committed: Tue Aug 21 22:21:08 2018 -0700

--
 .../sql/catalyst/json/JsonInferSchema.scala | 16 +++---
 .../org/apache/spark/sql/internal/SQLConf.scala | 33 
 .../org/apache/spark/sql/DataFrameSuite.scala   | 24 ++
 3 files changed, 63 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 5f70e06..a00 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
   }.reduceOption(typeMerger).toIterator
 }
 
-// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-// active SparkSession and `SQLConf.get` may point to the wrong configs.
-val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+// the fold functions in the scheduler event loop thread.
+val existingConf = SQLConf.get
+var rootType: DataType = StructType(Nil)
+val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+val mergeResult = (index: Int, taskResult: DataType) => {
+  rootType = SQLConf.withExistingConf(existingConf) {
+    typeMerger(rootType, taskResult)
+  }
+}
+json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult)
 
 canonicalizeType(rootType, configOptions) match {
   case Some(st: StructType) => st

http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5913c94..df2caff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -82,6 +82,19 @@ object SQLConf {
   /** See [[get]] for more information. */
   def getFallbackConf: SQLConf = fallbackConf.get()
 
+  private lazy val existingConf = new ThreadLocal[SQLConf] {
+    override def initialValue: SQLConf = null
+  }
+
+  def withExistingConf[T](conf: SQLConf)(f: => T): T = {
+    existingConf.set(conf)
+    try {
+      f
+    } finally {
+      existingConf.remove()
+    }
+  }
+
   /**
* Defines a getter that returns the SQLConf within scope.
* See [[get]] for more information.
@@ -116,16 +129,24 @@ object SQLConf {
 if (TaskContext.get != null) {
   new 

spark git commit: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions when expr codegen fails

2018-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master a998e9d82 -> 07737c87d


[SPARK-23711][SPARK-25140][SQL] Catch correct exceptions when expr codegen fails

## What changes were proposed in this pull request?
This PR fixes bugs when expr codegen fails: we need to catch
`java.util.concurrent.ExecutionException` instead of
`InternalCompilerException` and `CompileException`. This handling is the same
as the one in `WholeStageCodegenExec`:
https://github.com/apache/spark/blob/60af2501e1afc00192c779f2736a4e3de12428fa/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L585
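
A simplified sketch of the fallback pattern (hedged; `compile` and `interpret` stand in for
createCodeGeneratedObject / createInterpretedObject): catch any non-fatal error from codegen
and fall back to the interpreted path, since compilation failures can surface wrapped, e.g.
in `java.util.concurrent.ExecutionException`, rather than as the compiler exception types
matched before.

```
import scala.util.control.NonFatal

// Simplified sketch of codegen-with-interpreted-fallback.
def createObject[IN, OUT](in: IN)(compile: IN => OUT, interpret: IN => OUT): OUT = {
  try {
    compile(in)
  } catch {
    case NonFatal(_) =>
      // The detailed compile error is already logged where compilation happens;
      // here we only need to fall back.
      interpret(in)
  }
}
```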

## How was this patch tested?
Added tests in `CodeGeneratorWithInterpretedFallbackSuite`

Closes #22154 from maropu/SPARK-25140.

Authored-by: Takeshi Yamamuro 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07737c87
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07737c87
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07737c87

Branch: refs/heads/master
Commit: 07737c87d6086c986785ff0edc43ca94effa4fc6
Parents: a998e9d
Author: Takeshi Yamamuro 
Authored: Tue Aug 21 22:17:44 2018 -0700
Committer: Xiao Li 
Committed: Tue Aug 21 22:17:44 2018 -0700

--
 .../CodeGeneratorWithInterpretedFallback.scala  | 23 --
 .../sql/catalyst/expressions/Projection.scala   |  7 +++-
 ...eGeneratorWithInterpretedFallbackSuite.scala | 44 ++--
 .../sql/execution/WholeStageCodegenExec.scala   |  3 +-
 4 files changed, 55 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07737c87/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala
index 0f6d866..07fa813 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala
@@ -17,25 +17,13 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.codehaus.commons.compiler.CompileException
-import org.codehaus.janino.InternalCompilerException
+import scala.util.control.NonFatal
 
-import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
 /**
- * Catches compile error during code generation.
- */
-object CodegenError {
-  def unapply(throwable: Throwable): Option[Exception] = throwable match {
-case e: InternalCompilerException => Some(e)
-case e: CompileException => Some(e)
-case _ => None
-  }
-}
-
-/**
  * Defines values for `SQLConf` config of fallback mode. Use for test only.
  */
 object CodegenObjectFactoryMode extends Enumeration {
@@ -47,7 +35,7 @@ object CodegenObjectFactoryMode extends Enumeration {
  * error happens, it can fallback to interpreted implementation. In tests, we 
can use a SQL config
  * `SQLConf.CODEGEN_FACTORY_MODE` to control fallback behavior.
  */
-abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] {
+abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging {
 
   def createObject(in: IN): OUT = {
 // We are allowed to choose codegen-only or no-codegen modes if under tests.
@@ -63,7 +51,10 @@ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] {
 try {
   createCodeGeneratedObject(in)
 } catch {
-  case CodegenError(_) => createInterpretedObject(in)
+  case NonFatal(_) =>
+    // We should have already seen the error message in `CodeGenerator`
+    logWarning("Expr codegen error and falling back to interpreter mode")
+    createInterpretedObject(in)
 }
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/07737c87/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 6493f09..226a4dd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 

svn commit: r28889 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_21_20_02-a998e9d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-21 Thread pwendell
Author: pwendell
Date: Wed Aug 22 03:15:59 2018
New Revision: 28889

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_21_20_02-a998e9d docs


[This commit notification would consist of 1476 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [MINOR] Added import to fix compilation

2018-08-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master ad45299d0 -> a998e9d82


[MINOR] Added import to fix compilation

## What changes were proposed in this pull request?

Two back-to-back PRs implicitly conflicted: one PR removed an existing import
that the other PR needed. This did not cause an explicit merge conflict because
the import already existed, but it was unused.

https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.7/8226/consoleFull

```
[info] Compiling 342 Scala sources and 97 Java sources to 
/home/jenkins/workspace/spark-master-compile-maven-hadoop-2.7/sql/core/target/scala-2.11/classes...
[warn] 
/home/jenkins/workspace/spark-master-compile-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:128:
 value ENABLE_JOB_SUMMARY in object ParquetOutputFormat is deprecated: see 
corresponding Javadoc for more information.
[warn]   && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
[warn]   ^
[error] 
/home/jenkins/workspace/spark-master-compile-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:95:
 value asJava is not a member of scala.collection.immutable.Map[String,Long]
[error]   new java.util.HashMap(customMetrics.mapValues(long2Long).asJava)
[error]^
[warn] one warning found
[error] one error found
[error] Compile failed at Aug 21, 2018 4:04:35 PM [12.827s]
```
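
For reference, a minimal standalone illustration (values assumed, not from the PR) of why the
missing import matters: `.asJava` on a Scala map is an extension method brought in by
`scala.collection.JavaConverters._`, so without that import the compiler reports exactly the
error above.

```
import scala.collection.JavaConverters._

// Without the JavaConverters import, `.asJava` below fails with
// "value asJava is not a member of scala.collection.immutable.Map[String,Long]".
val customMetrics: Map[String, Long] = Map("numRowsTotal" -> 10L)
val javaMetrics: java.util.Map[String, java.lang.Long] =
  new java.util.HashMap(customMetrics.mapValues(long2Long).asJava)
```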

## How was this patch tested?
It compiles!

Closes #22175 from tdas/fix-build.

Authored-by: Tathagata Das 
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a998e9d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a998e9d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a998e9d8

Branch: refs/heads/master
Commit: a998e9d829bd499dd7c65f973ea4389e0401b001
Parents: ad45299
Author: Tathagata Das 
Authored: Tue Aug 21 17:08:15 2018 -0700
Committer: Tathagata Das 
Committed: Tue Aug 21 17:08:15 2018 -0700

--
 .../apache/spark/sql/execution/streaming/statefulOperators.scala   | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a998e9d8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 7351db8..c11af34 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import java.util.UUID
 import java.util.concurrent.TimeUnit._
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._





spark git commit: [SPARK-25095][PYSPARK] Python support for BarrierTaskContext

2018-08-21 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 42035a4fe -> ad45299d0


[SPARK-25095][PYSPARK] Python support for BarrierTaskContext

## What changes were proposed in this pull request?

Add methods `barrier()` and `getTaskInfos()` to the Python TaskContext; these
two methods are only allowed for barrier tasks.
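
For illustration, a hedged sketch of the JVM-side API that these Python methods mirror
(assuming a running SparkContext `sc`; not code from this PR):

```
import org.apache.spark.{BarrierTaskContext, SparkContext}

// Inside rdd.barrier().mapPartitions, BarrierTaskContext offers barrier() to
// synchronize every task in the stage and getTaskInfos() to inspect all of them.
def barrierDemo(sc: SparkContext): Array[String] = {
  sc.parallelize(1 to 100, numSlices = 4)
    .barrier()
    .mapPartitions { iter =>
      val ctx = BarrierTaskContext.get()
      ctx.barrier()                                  // waits for all tasks in the stage
      val hosts = ctx.getTaskInfos().map(_.address)  // info about every barrier task
      Iterator(hosts.mkString(","))
    }
    .collect()
}
```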

## How was this patch tested?

Add new tests in `tests.py`

Closes #22085 from jiangxb1987/python.barrier.

Authored-by: Xingbo Jiang 
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad45299d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad45299d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad45299d

Branch: refs/heads/master
Commit: ad45299d047c10472fd3a86103930fe7c54a4cf1
Parents: 42035a4
Author: Xingbo Jiang 
Authored: Tue Aug 21 15:54:30 2018 -0700
Committer: Xiangrui Meng 
Committed: Tue Aug 21 15:54:30 2018 -0700

--
 .../apache/spark/api/python/PythonRunner.scala  | 106 ++
 python/pyspark/serializers.py   |   7 +
 python/pyspark/taskcontext.py   | 144 +++
 python/pyspark/tests.py |  36 -
 python/pyspark/worker.py|  16 ++-
 5 files changed, 305 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad45299d/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 7b31857..f824191 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -20,12 +20,14 @@ package org.apache.spark.api.python
 import java.io._
 import java.net._
 import java.nio.charset.StandardCharsets
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
 
 
@@ -76,6 +78,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   // TODO: support accumulator in multiple UDF
   protected val accumulator = funcs.head.funcs.head.accumulator
 
+  // Expose a ServerSocket to support method calls via socket from Python side.
+  private[spark] var serverSocket: Option[ServerSocket] = None
+
+  // Authentication helper used when serving method calls via socket from 
Python side.
+  private lazy val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
+
   def compute(
   inputIterator: Iterator[IN],
   partitionIndex: Int,
@@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(/* port */ 0,
+/* backlog */ 1,
+InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  // Wait for function call from python side.
+  sock.setSoTimeout(1)
+  val input = new DataInputStream(sock.getInputStream())
+  input.readInt() match {
+case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
+  // The barrier() function may wait infinitely, socket shall not timeout
+  // before the function finishes.
+  sock.setSoTimeout(0)
+  barrierAndServe(sock)
+
+case _ =>
+  val out = new DataOutputStream(new BufferedOutputStream(
+sock.getOutputStream))
+  writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out)
+  }
+} catch {
+  case e: SocketException if e.getMessage.contains("Socket closed") =>
+    // It is possible that the ServerSocket is not closed, but the native socket

spark git commit: [SPARK-24441][SS] Expose total estimated size of states in HDFSBackedStateStoreProvider

2018-08-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master ac0174e55 -> 42035a4fe


[SPARK-24441][SS] Expose total estimated size of states in 
HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch exposes the estimation of size of cache (loadedMaps) in 
HDFSBackedStateStoreProvider as a custom metric of StateStore.

The rationale of the patch is that state backed by HDFSBackedStateStoreProvider
will consume more memory than the number we can get from query status, due to
caching multiple versions of states. The memory footprint can be much larger
than what query status reports in situations where the state store is getting a
lot of updates: while shallow-copying the map incurs only a small additional
memory cost for the map entries and references, row objects are still shared
across the versions. If there are lots of updates between batches, fewer row
objects will be shared and more row objects will exist in memory, consuming
much more memory than what we expect.

While HDFSBackedStateStore refers to loadedMaps in HDFSBackedStateStoreProvider
directly, there is only one `StateStoreWriter` that refers to a
StateStoreProvider, so the value is exposed only once and is not aggregated
multiple times. Current state metrics are safe to aggregate for the same reason.
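
A small hedged sketch of the metric-assembly idea (simplified stand-ins, not the actual
StateStore classes): provider-level values are reported only when they match a declared
custom metric, which keeps downstream aggregation safe.

```
// Simplified stand-ins for a custom metric definition and the provider's metric map.
case class CustomMetric(name: String, desc: String)

def pickSupportedMetrics(
    fromProvider: Map[String, Long],
    supported: Seq[CustomMetric]): Map[CustomMetric, Long] = {
  fromProvider.flatMap { case (name, value) =>
    // The supported list is small, so a linear search is fine.
    supported.find(_.name == name).map(_ -> value)
  }
}
```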

## How was this patch tested?

Tested manually. Below is a snapshot of the UI page reflecting the patch:

https://user-images.githubusercontent.com/1317309/40978481-b46ad324-690e-11e8-9b0f-e80528612a62.png

Please refer to "estimated size of states cache in provider total" as well as
"count of versions in state cache in provider".

Closes #21469 from HeartSaVioR/SPARK-24441.

Authored-by: Jungtaek Lim 
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42035a4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42035a4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42035a4f

Branch: refs/heads/master
Commit: 42035a4fec6eb216427486b5067a45fceb65cc2d
Parents: ac0174e
Author: Jungtaek Lim 
Authored: Tue Aug 21 15:28:31 2018 -0700
Committer: Tathagata Das 
Committed: Tue Aug 21 15:28:31 2018 -0700

--
 .../state/HDFSBackedStateStoreProvider.scala|  39 +++-
 .../execution/streaming/state/StateStore.scala  |   2 +
 .../state/SymmetricHashJoinStateManager.scala   |   2 +
 .../execution/streaming/statefulOperators.scala |  12 ++-
 .../apache/spark/sql/streaming/progress.scala   |  15 ++-
 .../streaming/state/StateStoreSuite.scala   | 100 +++
 .../streaming/StreamingQueryListenerSuite.scala |   2 +-
 .../StreamingQueryStatusAndProgressSuite.scala  |  13 ++-
 8 files changed, 176 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 523acef..92a2480 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.util
 import java.util.Locale
+import java.util.concurrent.atomic.LongAdder
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -165,7 +166,16 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
 }
 
 override def metrics: StateStoreMetrics = {
-  StateStoreMetrics(mapToUpdate.size(), 
SizeEstimator.estimate(mapToUpdate), Map.empty)
+  // NOTE: we provide estimation of cache size as "memoryUsedBytes", and 
size of state for
+  // current version as "stateOnCurrentVersionSizeBytes"
+  val metricsFromProvider: Map[String, Long] = getMetricsForProvider()
+
+  val customMetrics = metricsFromProvider.flatMap { case (name, value) =>
+// just allow searching from list cause the list is small enough
+supportedCustomMetrics.find(_.name == name).map(_ -> value)
+  } + (metricStateOnCurrentVersionSizeBytes -> 
SizeEstimator.estimate(mapToUpdate))
+
+  StateStoreMetrics(mapToUpdate.size(), 
metricsFromProvider("memoryUsedBytes"), customMetrics)
 }
 
 /**
@@ -180,6 +190,12 @@ private[state] class HDFSBackedStateStoreProvider extends 
StateStoreProvider wit
 }
   }
 
+  def 

spark git commit: [SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable

2018-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 6c5cb8585 -> ac0174e55


[SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in 
module configurable

## What changes were proposed in this pull request?

In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider
com.databricks.spark.avro is mapped to the new package org.apache.spark.sql.avro.

As per the discussion in the
[Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we
should make the mapping configurable.

This PR also improves the error message when the Avro/Kafka data source is not
found.
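
A hedged usage sketch (assuming an active session and a placeholder path; the config key is
the one added in the SQLConf diff below): with the flag left at its default of true, reads
that still name the old provider resolve to the built-in Avro module, while setting it to
false makes the lookup fail with "Failed to find data source".

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("avro-provider-mapping").getOrCreate()

// Default is true: com.databricks.spark.avro is mapped to the built-in external Avro module.
spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "true")
val df = spark.read.format("com.databricks.spark.avro").load("/tmp/example.avro")
```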

## How was this patch tested?

Unit test

Closes #22133 from gengliangwang/configurable_avro_mapping.

Authored-by: Gengliang Wang 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac0174e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac0174e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac0174e5

Branch: refs/heads/master
Commit: ac0174e55af2e935d41545721e9f430c942b3a0c
Parents: 6c5cb85
Author: Gengliang Wang 
Authored: Tue Aug 21 15:26:24 2018 -0700
Committer: Xiao Li 
Committed: Tue Aug 21 15:26:24 2018 -0700

--
 .../org/apache/spark/sql/avro/AvroSuite.scala | 11 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala   | 10 ++
 .../sql/execution/datasources/DataSource.scala| 16 ++--
 .../sql/sources/ResolvedDataSourceSuite.scala | 18 ++
 4 files changed, 52 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
--
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index c4f4d8e..72bef9e 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -77,10 +77,19 @@ class AvroSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
   }
 
   test("resolve avro data source") {
-Seq("avro", "com.databricks.spark.avro").foreach { provider =>
+val databricksAvro = "com.databricks.spark.avro"
+// By default the backward compatibility for com.databricks.spark.avro is 
enabled.
+Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", 
databricksAvro).foreach { provider =>
   assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
 classOf[org.apache.spark.sql.avro.AvroFileFormat])
 }
+
+withSQLConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key -> 
"false") {
+  val message = intercept[AnalysisException] {
+DataSource.lookupDataSource(databricksAvro, spark.sessionState.conf)
+  }.getMessage
+  assert(message.contains(s"Failed to find data source: $databricksAvro"))
+}
   }
 
   test("reading from multiple paths") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b44bfe7..5913c94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1469,6 +1469,13 @@ object SQLConf {
 .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
 .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 
+  val LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED =
+buildConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled")
+  .doc("If it is set to true, the data source provider 
com.databricks.spark.avro is mapped " +
+"to the built-in but external Avro data source module for backward 
compatibility.")
+  .booleanConf
+  .createWithDefault(true)
+
   val LEGACY_SETOPS_PRECEDENCE_ENABLED =
 buildConf("spark.sql.legacy.setopsPrecedence.enabled")
   .internal()
@@ -1881,6 +1888,9 @@ class SQLConf extends Serializable with Logging {
 
   def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
 
+  def replaceDatabricksSparkAvroEnabled: Boolean =
+getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)
+
   def setOpsPrecedenceEnforced: Boolean = 
getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
   def parallelFileListingInStatsComputation: Boolean =


spark git commit: [SPARK-24763][SS] Remove redundant key data from value in streaming aggregation

2018-08-21 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 72ecfd095 -> 6c5cb8585


[SPARK-24763][SS] Remove redundant key data from value in streaming aggregation

## What changes were proposed in this pull request?

This patch proposes a new flag option for stateful aggregation: remove
redundant key data from value.
Enabling the new option behaves like the current implementation but uses less
memory for state, depending on the key/value fields of the state operator.

Please refer to the link below for the detailed perf. test results:
https://issues.apache.org/jira/browse/SPARK-24763?focusedCommentId=16536539=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16536539

Since the state between enabling the option and disabling the option is not
compatible, the option is set to 'disable' by default (to ensure backward
compatibility), and OffsetSeqMetadata prevents modifying the option after the
query has been executed.
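
A hedged sketch of setting the flag added here (key and valid values as in the SQLConf diff
below; the option is internal and, once a query has run, OffsetSeqMetadata keeps it fixed
for that checkpoint):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("state-format").getOrCreate()

// 1 = legacy format (key data duplicated in the value row), 2 = key removed from the value row.
spark.conf.set("spark.sql.streaming.aggregation.stateFormatVersion", "2")
```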

## How was this patch tested?

Modified unit tests to cover both disabling and enabling the option.
Also did manual tests to see whether the proposed patch improves state memory usage.

Closes #21733 from HeartSaVioR/SPARK-24763.

Authored-by: Jungtaek Lim 
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c5cb858
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c5cb858
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c5cb858

Branch: refs/heads/master
Commit: 6c5cb85856235efd464b109558896f81ae2c4c75
Parents: 72ecfd0
Author: Jungtaek Lim 
Authored: Tue Aug 21 15:22:42 2018 -0700
Committer: Tathagata Das 
Committed: Tue Aug 21 15:22:42 2018 -0700

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  10 +
 .../spark/sql/execution/SparkStrategies.scala   |   3 +
 .../sql/execution/aggregate/AggUtils.scala  |   5 +-
 .../streaming/IncrementalExecution.scala|   6 +-
 .../sql/execution/streaming/OffsetSeq.scala |   8 +-
 .../StreamingAggregationStateManager.scala  | 205 +++
 .../execution/streaming/statefulOperators.scala |  61 +++---
 .../commits/0   |   2 +
 .../commits/1   |   2 +
 .../metadata|   1 +
 .../offsets/0   |   3 +
 .../offsets/1   |   3 +
 .../state/0/0/1.delta   | Bin 0 -> 46 bytes
 .../state/0/0/2.delta   | Bin 0 -> 46 bytes
 .../state/0/1/1.delta   | Bin 0 -> 77 bytes
 .../state/0/1/2.delta   | Bin 0 -> 77 bytes
 .../state/0/2/1.delta   | Bin 0 -> 46 bytes
 .../state/0/2/2.delta   | Bin 0 -> 46 bytes
 .../state/0/3/1.delta   | Bin 0 -> 46 bytes
 .../state/0/3/2.delta   | Bin 0 -> 46 bytes
 .../state/0/4/1.delta   | Bin 0 -> 46 bytes
 .../state/0/4/2.delta   | Bin 0 -> 77 bytes
 .../streaming/state/MemoryStateStore.scala  |  49 +
 .../StreamingAggregationStateManagerSuite.scala | 126 
 .../streaming/FlatMapGroupsWithStateSuite.scala |  24 +--
 .../streaming/StreamingAggregationSuite.scala   | 150 +++---
 26 files changed, 573 insertions(+), 85 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6c5cb858/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bffdddc..b44bfe7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -888,6 +888,16 @@ object SQLConf {
 .intConf
 .createWithDefault(2)
 
+  val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
+buildConf("spark.sql.streaming.aggregation.stateFormatVersion")
+  .internal()
+  .doc("State format version used by streaming aggregation operations in a 
streaming query. " +
+"State between versions are tend to be incompatible, so state format 
version shouldn't " +
+"be modified after running.")
+  .intConf
+  .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
+  .createWithDefault(2)
+
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
 buildConf("spark.sql.streaming.unsupportedOperationCheck")
   .internal()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c5cb858/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

spark git commit: [SPARK-25149][GRAPHX] Update Parallel Personalized Page Rank to test with large vertexIds

2018-08-21 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 99d2e4e00 -> 72ecfd095


[SPARK-25149][GRAPHX] Update Parallel Personalized Page Rank to test with large 
vertexIds

## What changes were proposed in this pull request?

runParallelPersonalizedPageRank in graphx checks that `sources` are <= 
Int.MaxValue.toLong, but this is not actually required. This check seems to 
have been added because we use sparse vectors in the implementation and sparse 
vectors cannot be indexed by values > MAX_INT. However we do not ever index the 
sparse vector by the source vertexIds so this isn't an issue. I've added a test 
with large vertexIds to confirm this works as expected.
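
A hedged usage sketch (assuming a SparkContext `sc`): a source vertex id above Int.MaxValue
is now accepted, because activations are indexed by the source's position in `sources`,
never by the vertex id itself.

```
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.graphx.lib.PageRank

// A vertex id well beyond Int.MaxValue, previously rejected by the removed require().
val bigId = Int.MaxValue.toLong + 100L
val edges = sc.parallelize(Seq(Edge(1L, bigId, 1), Edge(bigId, 2L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 1)
val ranks = PageRank.runParallelPersonalizedPageRank(graph, numIter = 10, sources = Array(bigId))
```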

## How was this patch tested?

Unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Closes #22139 from MrBago/remove-veretexId-check-pppr.

Authored-by: Bago Amirbekian 
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72ecfd09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72ecfd09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72ecfd09

Branch: refs/heads/master
Commit: 72ecfd095062ad61c073f9b97bf3c47644575d60
Parents: 99d2e4e
Author: Bago Amirbekian 
Authored: Tue Aug 21 15:21:55 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Aug 21 15:21:55 2018 -0700

--
 .../org/apache/spark/graphx/lib/PageRank.scala  | 28 ++---
 .../apache/spark/graphx/lib/PageRankSuite.scala | 32 +++-
 2 files changed, 35 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72ecfd09/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index ebd65e8..96b635f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -184,9 +184,11 @@ object PageRank extends Logging {
* indexed by the position of nodes in the sources list) and
* edge attributes the normalized edge weight
*/
-  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: 
Graph[VD, ED],
-numIter: Int, resetProb: Double = 0.15,
-sources: Array[VertexId]): Graph[Vector, Double] = {
+  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](
+  graph: Graph[VD, ED],
+  numIter: Int,
+  resetProb: Double = 0.15,
+  sources: Array[VertexId]): Graph[Vector, Double] = {
 require(numIter > 0, s"Number of iterations must be greater than 0," +
   s" but got ${numIter}")
 require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must 
belong" +
@@ -194,15 +196,11 @@ object PageRank extends Logging {
 require(sources.nonEmpty, s"The list of sources must be non-empty," +
   s" but got ${sources.mkString("[", ",", "]")}")
 
-// TODO if one sources vertex id is outside of the int range
-// we won't be able to store its activations in a sparse vector
-require(sources.max <= Int.MaxValue.toLong,
-  s"This implementation currently only works for source vertex ids at most 
${Int.MaxValue}")
 val zero = Vectors.sparse(sources.size, List()).asBreeze
-val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
-  val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
-  (vid, v)
-}.toMap
+// map of vid -> vector where for each vid, the _position of vid in 
source_ is set to 1.0
+val sourcesInitMap = sources.zipWithIndex.toMap.mapValues { i =>
+  Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
+}
 val sc = graph.vertices.sparkContext
 val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
 // Initialize the PageRank graph with each edge attribute having
@@ -212,13 +210,7 @@ object PageRank extends Logging {
   .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => 
deg.getOrElse(0) }
   // Set the weight on the edges based on the degree
   .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
-  .mapVertices { (vid, attr) =>
-if (sourcesInitMapBC.value contains vid) {
-  sourcesInitMapBC.value(vid)
-} else {
-  zero
-}
-  }
+  .mapVertices((vid, _) => sourcesInitMapBC.value.getOrElse(vid, zero))
 
 var i = 0
 while (i < numIter) {

http://git-wip-us.apache.org/repos/asf/spark/blob/72ecfd09/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
--
diff --git 

svn commit: r28887 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_21_12_02-99d2e4e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-21 Thread pwendell
Author: pwendell
Date: Tue Aug 21 19:16:17 2018
New Revision: 28887

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_21_12_02-99d2e4e docs


[This commit notification would consist of 1476 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24296][CORE] Replicate large blocks as a stream.

2018-08-21 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 35f7f5ce8 -> 99d2e4e00


[SPARK-24296][CORE] Replicate large blocks as a stream.

When replicating large cached RDD blocks, it can be helpful to replicate
them as a stream, to avoid using large amounts of memory during the
transfer.  This also allows blocks larger than 2GB to be replicated.

Added unit tests in DistributedSuite.  Also ran tests on a cluster for
blocks > 2gb.

Closes #21451 from squito/clean_replication.

Authored-by: Imran Rashid 
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99d2e4e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99d2e4e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99d2e4e0

Branch: refs/heads/master
Commit: 99d2e4e00711cffbfaee8cb3da9b6b3feab8ff18
Parents: 35f7f5c
Author: Imran Rashid 
Authored: Tue Aug 21 11:26:41 2018 -0700
Committer: Marcelo Vanzin 
Committed: Tue Aug 21 11:26:41 2018 -0700

--
 .../network/server/TransportRequestHandler.java |  2 +-
 .../shuffle/protocol/BlockTransferMessage.java  |  3 +-
 .../shuffle/protocol/UploadBlockStream.java | 89 
 .../org/apache/spark/executor/Executor.scala|  4 +-
 .../apache/spark/internal/config/package.scala  |  7 ++
 .../apache/spark/network/BlockDataManager.scala | 12 +++
 .../network/netty/NettyBlockRpcServer.scala | 26 +-
 .../netty/NettyBlockTransferService.scala   | 39 +
 .../org/apache/spark/storage/BlockManager.scala | 66 ++-
 .../storage/BlockManagerManagedBuffer.scala |  7 +-
 .../org/apache/spark/storage/DiskStore.scala|  5 +-
 .../spark/util/io/ChunkedByteBuffer.scala   |  4 +
 .../org/apache/spark/DistributedSuite.scala | 25 +-
 .../spark/security/EncryptionFunSuite.scala | 12 ++-
 .../apache/spark/storage/DiskStoreSuite.scala   |  3 +-
 project/MimaExcludes.scala  |  3 +-
 16 files changed, 270 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index c6fd56b..9fac96d 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -234,7 +234,7 @@ public class TransportRequestHandler extends 
MessageHandler {
  callback.onSuccess(ByteBuffer.allocate(0));
} catch (Exception ex) {
  IOException ioExc = new IOException("Failure post-processing 
complete stream;" +
-   " failing this rpc and leaving channel active");
+   " failing this rpc and leaving channel active", ex);
  callback.onFailure(ioExc);
  streamHandler.onFailure(streamId, ioExc);
}

http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
--
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 9af6759..a68a297 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -42,7 +42,7 @@ public abstract class BlockTransferMessage implements 
Encodable {
   /** Preceding every serialized message is its type, which allows us to 
deserialize it. */
   public enum Type {
 OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), 
REGISTER_DRIVER(4),
-HEARTBEAT(5);
+HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6);
 
 private final byte id;
 
@@ -67,6 +67,7 @@ public abstract class BlockTransferMessage implements 
Encodable {
 case 3: return StreamHandle.decode(buf);
 case 4: return RegisterDriver.decode(buf);
 case 5: return ShuffleServiceHeartbeat.decode(buf);
+case 6: return UploadBlockStream.decode(buf);
 default: throw new IllegalArgumentException("Unknown message type: " + 
type);
   }
 }


svn commit: r28885 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_21_10_02-9cb9d72-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-21 Thread pwendell
Author: pwendell
Date: Tue Aug 21 17:15:41 2018
New Revision: 28885

Log:
Apache Spark 2.3.3-SNAPSHOT-2018_08_21_10_02-9cb9d72 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [DOCS][MINOR] Fix a few broken links and typos, and, nit, use HTTPS more consistently

2018-08-21 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master d80063278 -> 35f7f5ce8


[DOCS][MINOR] Fix a few broken links and typos, and, nit, use HTTPS more 
consistently

## What changes were proposed in this pull request?

Fix a few broken links and typos, and, nit, use HTTPS more consistently esp. on 
scripts and Apache links

## How was this patch tested?

Doc build

Closes #22172 from srowen/DocTypo.

Authored-by: Sean Owen 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35f7f5ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35f7f5ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35f7f5ce

Branch: refs/heads/master
Commit: 35f7f5ce83984d8afe0b7955942baa04f2bef74f
Parents: d800632
Author: Sean Owen 
Authored: Wed Aug 22 01:02:17 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Aug 22 01:02:17 2018 +0800

--
 docs/README.md |  4 ++--
 docs/_layouts/404.html |  2 +-
 docs/_layouts/global.html  |  6 +++---
 docs/building-spark.md |  8 
 docs/contributing-to-spark.md  |  2 +-
 docs/index.md  | 16 
 docs/ml-migration-guides.md|  2 +-
 docs/quick-start.md|  2 +-
 docs/rdd-programming-guide.md  |  4 ++--
 docs/running-on-mesos.md   |  2 +-
 docs/running-on-yarn.md|  2 +-
 docs/security.md   |  6 +++---
 docs/sparkr.md |  2 +-
 docs/sql-programming-guide.md  |  6 +++---
 docs/streaming-kinesis-integration.md  |  2 +-
 docs/streaming-programming-guide.md|  5 ++---
 docs/structured-streaming-programming-guide.md |  2 +-
 17 files changed, 36 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/35f7f5ce/docs/README.md
--
diff --git a/docs/README.md b/docs/README.md
index dbea4d6..7da543d 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -2,7 +2,7 @@ Welcome to the Spark documentation!
 
 This readme will walk you through navigating and building the Spark 
documentation, which is included
 here with the Spark source code. You can also find documentation specific to 
release versions of
-Spark at http://spark.apache.org/documentation.html.
+Spark at https://spark.apache.org/documentation.html.
 
 Read on to learn more about viewing documentation in plain text (i.e., 
markdown) or building the
 documentation yourself. Why build it yourself? So that you have the docs that 
correspond to
@@ -79,7 +79,7 @@ jekyll plugin to run `build/sbt unidoc` before building the 
site so if you haven
 may take some time as it generates all of the scaladoc and javadoc using 
[Unidoc](https://github.com/sbt/sbt-unidoc).
 The jekyll plugin also generates the PySpark docs using 
[Sphinx](http://sphinx-doc.org/), SparkR docs
 using [roxygen2](https://cran.r-project.org/web/packages/roxygen2/index.html) 
and SQL docs
-using [MkDocs](http://www.mkdocs.org/).
+using [MkDocs](https://www.mkdocs.org/).
 
 NOTE: To skip the step of building and copying over the Scala, Java, Python, R 
and SQL API docs, run `SKIP_API=1
 jekyll build`. In addition, `SKIP_SCALADOC=1`, `SKIP_PYTHONDOC=1`, 
`SKIP_RDOC=1` and `SKIP_SQLDOC=1` can be used

http://git-wip-us.apache.org/repos/asf/spark/blob/35f7f5ce/docs/_layouts/404.html
--
diff --git a/docs/_layouts/404.html b/docs/_layouts/404.html
index 0446544..78f98b9 100755
--- a/docs/_layouts/404.html
+++ b/docs/_layouts/404.html
@@ -151,7 +151,7 @@
 
 var GOOG_FIXURL_LANG = (navigator.language || 
'').slice(0,2),GOOG_FIXURL_SITE = location.host;
 
-http://linkhelp.clients.google.com/tbproxy/lh/wm/fixurl.js";>
+https://linkhelp.clients.google.com/tbproxy/lh/wm/fixurl.js";>
 
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/35f7f5ce/docs/_layouts/global.html
--
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index e5af5ae..88d549c 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -50,7 +50,7 @@
 
 
 
 
 
@@ -114,8 +114,8 @@
 Hardware Provisioning
 
 Building 
Spark
-http://spark.apache.org/contributing.html;>Contributing to Spark
-

spark git commit: [MINOR] Add .crc files to .gitignore

2018-08-21 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 5059255d9 -> d80063278


[MINOR] Add .crc files to .gitignore

## What changes were proposed in this pull request?

Add .crc files to .gitignore so that we don't add the .crc files from state
checkpoints, which could be placed in test resources, to the git repo.
This is based on comments in #21733, 
https://github.com/apache/spark/pull/21733#issuecomment-414578244.

## How was this patch tested?

Add `.1.delta.crc` and `.2.delta.crc` under `/sql/core/src/test/resources`, and
confirm that git doesn't suggest staging the files.

Closes #22170 from HeartSaVioR/add-crc-files-to-gitignore.

Authored-by: Jungtaek Lim 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8006327
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8006327
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8006327

Branch: refs/heads/master
Commit: d80063278debc5529653d184841f50fe98cdad97
Parents: 5059255
Author: Jungtaek Lim 
Authored: Wed Aug 22 01:00:06 2018 +0800
Committer: hyukjinkwon 
Committed: Wed Aug 22 01:00:06 2018 +0800

--
 .gitignore | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d8006327/.gitignore
--
diff --git a/.gitignore b/.gitignore
index e4c44d0..19db7ac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -77,6 +77,7 @@ target/
 unit-tests.log
 work/
 docs/.jekyll-metadata
+*.crc
 
 # For Hive
 TempStatsStore/





spark git commit: [SPARK-25114][2.3][CORE][FOLLOWUP] Fix RecordBinaryComparatorSuite build failure

2018-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8bde46781 -> 9cb9d7201


[SPARK-25114][2.3][CORE][FOLLOWUP] Fix RecordBinaryComparatorSuite build failure

## What changes were proposed in this pull request?

Fix RecordBinaryComparatorSuite build failure

## How was this patch tested?

Existing tests.

Closes #22166 from jiangxb1987/SPARK-25114-2.3.

Authored-by: Xingbo Jiang 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cb9d720
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cb9d720
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cb9d720

Branch: refs/heads/branch-2.3
Commit: 9cb9d7201acbef6dfc1c9fe0cfd39fd9d89cac76
Parents: 8bde467
Author: Xingbo Jiang 
Authored: Tue Aug 21 09:45:19 2018 -0700
Committer: Xiao Li 
Committed: Tue Aug 21 09:45:19 2018 -0700

--
 .../test/java/org/apache/spark/memory/TestMemoryConsumer.java | 7 +++
 1 file changed, 7 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9cb9d720/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
--
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java 
b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index db91329..1b7739c 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -19,6 +19,8 @@ package org.apache.spark.memory;
 
 import java.io.IOException;
 
+import org.apache.spark.unsafe.memory.MemoryBlock;
+
 public class TestMemoryConsumer extends MemoryConsumer {
   public TestMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) {
 super(memoryManager, 1024L, mode);
@@ -43,6 +45,11 @@ public class TestMemoryConsumer extends MemoryConsumer {
 used -= size;
 taskMemoryManager.releaseExecutionMemory(size, this);
   }
+
+  public void freePage(MemoryBlock page) {
+used -= page.size();
+taskMemoryManager.freePage(page, this);
+  }
 }
 
 





spark git commit: [SPARK-25161][CORE] Fix several bugs in failure handling of barrier execution mode

2018-08-21 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master b8788b3e7 -> 5059255d9


[SPARK-25161][CORE] Fix several bugs in failure handling of barrier execution 
mode

## What changes were proposed in this pull request?

Fix several bugs in failure handling of barrier execution mode:
* Mark TaskSet for a barrier stage as zombie when a task attempt fails;
* Multiple barrier task failures from a single barrier stage should not trigger 
multiple stage retries;
* Barrier task failure from a previous failed stage attempt should not trigger 
stage retry;
* Fail the job when a task from a barrier ResultStage failed;
* RDD.isBarrier() should not rely on `ShuffleDependency`s.

## How was this patch tested?

Added corresponding test cases in `DAGSchedulerSuite` and 
`TaskSchedulerImplSuite`.

Closes #22158 from jiangxb1987/failure.

Authored-by: Xingbo Jiang 
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5059255d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5059255d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5059255d

Branch: refs/heads/master
Commit: 5059255d91fc7a9810e013eba39e12d30291dd08
Parents: b8788b3
Author: Xingbo Jiang 
Authored: Tue Aug 21 08:25:02 2018 -0700
Committer: Xiangrui Meng 
Committed: Tue Aug 21 08:25:02 2018 -0700

--
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   3 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 125 +++
 .../apache/spark/scheduler/TaskSetManager.scala |   4 +
 .../spark/scheduler/DAGSchedulerSuite.scala | 106 
 .../scheduler/TaskSchedulerImplSuite.scala  |  18 +++
 5 files changed, 200 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/rdd/RDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index cbc1143..374b846 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1863,7 +1863,8 @@ abstract class RDD[T: ClassTag](
 
   // From performance concern, cache the value to avoid repeatedly compute 
`isBarrier()` on a long
   // RDD chain.
-  @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
+  @transient protected lazy val isBarrier_ : Boolean =
+    dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2b0ca13..6787250 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1478,9 +1478,11 @@ private[spark] class DAGScheduler(
 
mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId)
 
   case failedResultStage: ResultStage =>
-// Mark all the partitions of the result stage to be not finished, to ensure retry
-// all the tasks on resubmitted stage attempt.
-failedResultStage.activeJob.map(_.resetAllPartitions())
+// Abort the failed result stage since we may have committed output for some
+// partitions.
+val reason = "Could not recover from a failed barrier ResultStage. Most recent " +
+  s"failure reason: $failureMessage"
+abortStage(failedResultStage, reason, None)
 }
   }
 
@@ -1553,62 +1555,75 @@ private[spark] class DAGScheduler(
 
 // Always fail the current stage and retry all the tasks when a 
barrier task fail.
 val failedStage = stageIdToStage(task.stageId)
-logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to 
a barrier task " +
-  "failed.")
-val message = s"Stage failed because barrier task $task finished 
unsuccessfully.\n" +
-  failure.toErrorString
-try {
-  // killAllTaskAttempts will fail if a SchedulerBackend does not 
implement killTask.
-  val reason = s"Task $task from barrier stage $failedStage 
(${failedStage.name}) failed."
-  taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, 
reason)
-} catch {
-  case e: UnsupportedOperationException =>
-// Cannot continue with barrier stage if failed 

svn commit: r28882 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_21_08_02-b8788b3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-21 Thread pwendell
Author: pwendell
Date: Tue Aug 21 15:16:49 2018
New Revision: 28882

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_21_08_02-b8788b3 docs


[This commit notification would consist of 1476 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [BUILD] Close stale PRs

2018-08-21 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 4fb96e510 -> b8788b3e7


[BUILD] Close stale PRs

Closes #16411
Closes #21870
Closes #21794
Closes #21610
Closes #21961
Closes #21940
Closes #21870
Closes #22118
Closes #21624
Closes #19528
Closes #18424

Closes #22159 from srowen/Stale.

Authored-by: Sean Owen 
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8788b3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8788b3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8788b3e

Branch: refs/heads/master
Commit: b8788b3e79d0d508e3a910fefd7e9cff4c6d6245
Parents: 4fb96e5
Author: Sean Owen 
Authored: Tue Aug 21 08:18:21 2018 -0500
Committer: Sean Owen 
Committed: Tue Aug 21 08:18:21 2018 -0500

--

--






svn commit: r28878 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_21_02_02-8bde467-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-21 Thread pwendell
Author: pwendell
Date: Tue Aug 21 09:15:43 2018
New Revision: 28878

Log:
Apache Spark 2.3.3-SNAPSHOT-2018_08_21_02_02-8bde467 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




svn commit: r28876 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_21_00_02-4fb96e5-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-21 Thread pwendell
Author: pwendell
Date: Tue Aug 21 07:17:04 2018
New Revision: 28876

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_21_00_02-4fb96e5 docs


[This commit notification would consist of 1476 parts, which exceeds the limit of 50, so it was shortened to the summary.]



spark git commit: [SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.

2018-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 9702bb637 -> 8bde46781


[SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.

As noted in https://github.com/apache/spark/pull/22079#discussion_r209705612, it is possible for two objects to be unequal and yet be treated as equal by this code, whenever the difference between the two long values is a multiple of Integer.MAX_VALUE.
This PR fixes the issue.
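
To make the failure mode concrete, here is a minimal, self-contained sketch (not code taken from the patch itself) showing how the old subtraction-and-modulo comparison can report two different 8-byte words as equal, and how the sign-only comparison introduced here avoids it:

```java
public class RecordBinaryComparatorBugDemo {
  // Old logic from the removed lines: reduce the difference of two longs
  // modulo Integer.MAX_VALUE and use the result as the comparison value.
  static int oldCompare(long v1, long v2) {
    return (int) ((v1 - v2) % Integer.MAX_VALUE);
  }

  // Fixed logic from this patch: no arithmetic on the difference,
  // just return the sign of a direct comparison.
  static int newCompare(long v1, long v2) {
    if (v1 != v2) {
      return v1 > v2 ? 1 : -1;
    }
    return 0;
  }

  public static void main(String[] args) {
    long v1 = Integer.MAX_VALUE; // 2147483647
    long v2 = 0L;
    // (v1 - v2) is exactly Integer.MAX_VALUE, so the modulo collapses to 0
    // and the two unequal words are treated as equal.
    System.out.println(oldCompare(v1, v2)); // 0 -> wrongly "equal"
    System.out.println(newCompare(v1, v2)); // 1 -> correctly "greater"
  }
}
```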

Add new test cases in `RecordBinaryComparatorSuite`.

Closes #22101 from jiangxb1987/fix-rbc.

Authored-by: Xingbo Jiang 
Signed-off-by: Xiao Li 
(cherry picked from commit 4fb96e5105cec4a3eb19a2b7997600b086bac32f)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bde4678
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bde4678
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bde4678

Branch: refs/heads/branch-2.3
Commit: 8bde4678166f5f01837919d4f8d742b89f5e76b8
Parents: 9702bb6
Author: Xingbo Jiang 
Authored: Mon Aug 20 23:13:31 2018 -0700
Committer: Xiao Li 
Committed: Mon Aug 20 23:18:17 2018 -0700

--
 .../sql/execution/RecordBinaryComparator.java   |  26 +-
 .../sort/RecordBinaryComparatorSuite.java   | 322 +++
 2 files changed, 337 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8bde4678/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
--
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
index bb77b5b..40c2cc8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
@@ -22,12 +22,10 @@ import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
 
 public final class RecordBinaryComparator extends RecordComparator {
 
-  // TODO(jiangxb) Add test suite for this.
   @Override
   public int compare(
    Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
 int i = 0;
-int res = 0;
 
 // If the arrays have different length, the longer one is larger.
 if (leftLen != rightLen) {
@@ -40,27 +38,33 @@ public final class RecordBinaryComparator extends RecordComparator {
 // check if stars align and we can get both offsets to be aligned
 if ((leftOff % 8) == (rightOff % 8)) {
   while ((leftOff + i) % 8 != 0 && i < leftLen) {
-res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
-(Platform.getByte(rightObj, rightOff + i) & 0xff);
-if (res != 0) return res;
+final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
+final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+if (v1 != v2) {
+  return v1 > v2 ? 1 : -1;
+}
 i += 1;
   }
 }
 // for architectures that support unaligned accesses, chew it up 8 bytes at a time
 if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
   while (i <= leftLen - 8) {
-res = (int) ((Platform.getLong(leftObj, leftOff + i) -
-Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
-if (res != 0) return res;
+final long v1 = Platform.getLong(leftObj, leftOff + i);
+final long v2 = Platform.getLong(rightObj, rightOff + i);
+if (v1 != v2) {
+  return v1 > v2 ? 1 : -1;
+}
 i += 8;
   }
 }
 // this will finish off the unaligned comparisons, or do the entire aligned comparison
 // whichever is needed.
 while (i < leftLen) {
-  res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
-  (Platform.getByte(rightObj, rightOff + i) & 0xff);
-  if (res != 0) return res;
+  final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
+  final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+  if (v1 != v2) {
+return v1 > v2 ? 1 : -1;
+  }
   i += 1;
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8bde4678/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
--
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
new file mode 100644
index 000..97f3dc5
--- /dev/null
+++ 

spark git commit: [SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.

2018-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master f984ec75e -> 4fb96e510


[SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.

## What changes were proposed in this pull request?

As noted in https://github.com/apache/spark/pull/22079#discussion_r209705612, it is possible for two objects to be unequal and yet be treated as equal by this code, whenever the difference between the two long values is a multiple of Integer.MAX_VALUE.
This PR fixes the issue.
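
A related hazard of the old subtraction approach, beyond the divisibility case in the title, is that `v1 - v2` itself can overflow and flip the sign of the result. The following standalone check is only an illustrative sketch (it is not code from `RecordBinaryComparatorSuite`):

```java
public class RecordBinaryComparatorOverflowDemo {
  public static void main(String[] args) {
    long v1 = Long.MAX_VALUE; // clearly greater than v2 below
    long v2 = -1L;

    // Old logic: v1 - v2 overflows to Long.MIN_VALUE, the modulo then yields
    // a negative value, so the comparator would claim v1 < v2.
    int oldResult = (int) ((v1 - v2) % Integer.MAX_VALUE);
    System.out.println(oldResult); // -2 -> wrong sign

    // Fixed logic from the patch: a direct comparison returning only the sign.
    int newResult = v1 != v2 ? (v1 > v2 ? 1 : -1) : 0;
    System.out.println(newResult); // 1 -> correct
  }
}
```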

## How was this patch tested?
Add new test cases in `RecordBinaryComparatorSuite`.

Closes #22101 from jiangxb1987/fix-rbc.

Authored-by: Xingbo Jiang 
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fb96e51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fb96e51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fb96e51

Branch: refs/heads/master
Commit: 4fb96e5105cec4a3eb19a2b7997600b086bac32f
Parents: f984ec7
Author: Xingbo Jiang 
Authored: Mon Aug 20 23:13:31 2018 -0700
Committer: Xiao Li 
Committed: Mon Aug 20 23:13:31 2018 -0700

--
 .../sql/execution/RecordBinaryComparator.java   | 26 
 .../sort/RecordBinaryComparatorSuite.java   | 66 
 2 files changed, 81 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4fb96e51/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
--
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
index bb77b5b..40c2cc8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
@@ -22,12 +22,10 @@ import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
 
 public final class RecordBinaryComparator extends RecordComparator {
 
-  // TODO(jiangxb) Add test suite for this.
   @Override
   public int compare(
    Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
 int i = 0;
-int res = 0;
 
 // If the arrays have different length, the longer one is larger.
 if (leftLen != rightLen) {
@@ -40,27 +38,33 @@ public final class RecordBinaryComparator extends RecordComparator {
 // check if stars align and we can get both offsets to be aligned
 if ((leftOff % 8) == (rightOff % 8)) {
   while ((leftOff + i) % 8 != 0 && i < leftLen) {
-res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
-(Platform.getByte(rightObj, rightOff + i) & 0xff);
-if (res != 0) return res;
+final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
+final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+if (v1 != v2) {
+  return v1 > v2 ? 1 : -1;
+}
 i += 1;
   }
 }
 // for architectures that support unaligned accesses, chew it up 8 bytes at a time
 if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
   while (i <= leftLen - 8) {
-res = (int) ((Platform.getLong(leftObj, leftOff + i) -
-Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
-if (res != 0) return res;
+final long v1 = Platform.getLong(leftObj, leftOff + i);
+final long v2 = Platform.getLong(rightObj, rightOff + i);
+if (v1 != v2) {
+  return v1 > v2 ? 1 : -1;
+}
 i += 8;
   }
 }
 // this will finish off the unaligned comparisons, or do the entire aligned comparison
 // whichever is needed.
 while (i < leftLen) {
-  res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
-  (Platform.getByte(rightObj, rightOff + i) & 0xff);
-  if (res != 0) return res;
+  final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
+  final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+  if (v1 != v2) {
+return v1 > v2 ? 1 : -1;
+  }
   i += 1;
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4fb96e51/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
--
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
index a19ddbd..97f3dc5 100644
---