spark git commit: [SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master c09b31eb8 -> 5df99bd36


[SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto 
DataFrameWriter operations

## What changes were proposed in this pull request?

Remove the time metrics, since there seems to be no way to measure them without per-row tracking.
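
For context, a minimal hedged sketch (plain Scala, not the actual `FileFormatWriter` code) of the kind of per-row timing this follow-up removes; measuring writing time means wrapping each individual write call, which is exactly the per-row tracking being dropped:

```
// Illustrative only: per-row timing around each write is the only way to get a
// writing-time metric, and that per-row instrumentation is what this PR removes.
object PerRowTimingSketch {
  def main(args: Array[String]): Unit = {
    val rows = Seq("r1", "r2", "r3")
    var totalWritingTimeNs = 0L
    rows.foreach { row =>
      val start = System.nanoTime()
      // stand-in for currentWriter.write(internalRow)
      val _ = row.length
      totalWritingTimeNs += System.nanoTime() - start
    }
    println(s"total writing time: ${totalWritingTimeNs / 1e6} ms")
  }
}
```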

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #18558 from viirya/SPARK-20703-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5df99bd3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5df99bd3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5df99bd3

Branch: refs/heads/master
Commit: 5df99bd364561c6f4c02308149ba5eb71f89247e
Parents: c09b31e
Author: Liang-Chi Hsieh 
Authored: Fri Jul 7 13:12:20 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Jul 7 13:12:20 2017 +0800

--
 .../execution/command/DataWritingCommand.scala  | 10 -
 .../datasources/FileFormatWriter.scala  | 22 +++-
 .../sql/hive/execution/SQLMetricsSuite.scala|  3 ---
 3 files changed, 3 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 0c381a2..700f7f8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -30,7 +30,6 @@ trait DataWritingCommand extends RunnableCommand {
   override lazy val metrics: Map[String, SQLMetric] = {
 val sparkContext = SparkContext.getActive.get
 Map(
-  "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time 
(ms)"),
   "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written 
files"),
   "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of 
written output"),
   "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
@@ -47,23 +46,14 @@ trait DataWritingCommand extends RunnableCommand {
 var numFiles = 0
 var totalNumBytes: Long = 0L
 var totalNumOutput: Long = 0L
-var totalWritingTime: Long = 0L
 
 writeSummaries.foreach { summary =>
   numPartitions += summary.updatedPartitions.size
   numFiles += summary.numOutputFile
   totalNumBytes += summary.numOutputBytes
   totalNumOutput += summary.numOutputRows
-  totalWritingTime += summary.totalWritingTime
 }
 
-val avgWritingTime = if (numFiles > 0) {
-  (totalWritingTime / numFiles).toLong
-} else {
-  0L
-}
-
-metrics("avgTime").add(avgWritingTime)
 metrics("numFiles").add(numFiles)
 metrics("numOutputBytes").add(totalNumBytes)
 metrics("numOutputRows").add(totalNumOutput)

http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6486663..9eb9eae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -275,8 +275,6 @@ object FileFormatWriter extends Logging {
 /**
  * The data structures used to measure metrics during writing.
  */
-protected var totalWritingTime: Long = 0L
-protected var timeOnCurrentFile: Long = 0L
 protected var numOutputRows: Long = 0L
 protected var numOutputBytes: Long = 0L
 
@@ -343,9 +341,7 @@ object FileFormatWriter extends Logging {
 }
 
 val internalRow = iter.next()
-val startTime = System.nanoTime()
 currentWriter.write(internalRow)
-timeOnCurrentFile += (System.nanoTime() - startTime)
 recordsInFile += 1
   }
   releaseResources()
@@ -355,17 +351,13 @@ object FileFormatWriter extends Logging {
 updatedPartitions = Set.empty,
 numOutputFile = fileCounter + 1,
 numOutputBytes = numOutputBytes,
-numOutputRows = numOutputRows,
-totalWritingTime = 

spark git commit: [SPARK-21217][SQL] Support ColumnVector.Array.toArray()

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 53c2eb59b -> c09b31eb8


[SPARK-21217][SQL] Support ColumnVector.Array.toArray()

## What changes were proposed in this pull request?

This PR implements bulk-copy for the `ColumnVector.Array.toArray()` methods
(e.g. `toIntArray()`) in `ColumnVector.Array` by using `System.arraycopy()` or
`Platform.copyMemory()`.

Before this PR, when one of these methods is called, the generic method in
`ArrayData` is called instead. That is slow since an element-wise copy is performed.
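
For illustration (not the actual `ColumnVector` code), a minimal sketch contrasting the element-wise copy with a single bulk copy, which is the idea behind this change; `Platform.copyMemory()` plays the analogous role for off-heap vectors:

```
// Illustrative sketch only: contrasts the element-wise copy done by the generic
// ArrayData path with a single bulk copy (System.arraycopy for on-heap data).
object BulkCopySketch {
  def main(args: Array[String]): Unit = {
    val src = Array.tabulate(8 * 1024 * 1024)(identity)

    // Element-wise copy: one read and one write per element.
    val slow = new Array[Int](src.length)
    var i = 0
    while (i < src.length) { slow(i) = src(i); i += 1 }

    // Bulk copy: a single memcpy-like call.
    val fast = new Array[Int](src.length)
    System.arraycopy(src, 0, fast, 0, src.length)

    assert(slow.sameElements(fast))
  }
}
```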

This PR improves the performance of the benchmark program below by 1.9x (ON_HEAP) and 3.2x (OFF_HEAP).

Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz

Int Array                Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)
----------------------------------------------------------------------
ON_HEAP                        586 /  628         14.3           69.9
OFF_HEAP                       893 /  902          9.4          106.5
```

With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz

Int Array                Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)
----------------------------------------------------------------------
ON_HEAP                        306 /  331         27.4           36.4
OFF_HEAP                       282 /  287         29.8           33.6
```

Source program
```
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
  val len = 8 * 1024 * 1024
  val column = ColumnVector.allocate(len * 2, new ArrayType(IntegerType, false), memMode)

  val data = column.arrayData
  var i = 0
  while (i < len) {
    data.putInt(i, i)
    i += 1
  }
  column.putArray(0, 0, len)

  val benchmark = new Benchmark("Int Array", len, minNumIters = 20)
  benchmark.addCase(s"$memMode") { iter =>
    var i = 0
    while (i < 50) {
      column.getArray(0).toIntArray
      i += 1
    }
  }
  benchmark.run
}}
```

## How was this patch tested?

Added test suite

Author: Kazuaki Ishizaki 

Closes #18425 from kiszk/SPARK-21217.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c09b31eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c09b31eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c09b31eb

Branch: refs/heads/master
Commit: c09b31eb8fa83d5463a045c9278f5874ae505a8e
Parents: 53c2eb5
Author: Kazuaki Ishizaki 
Authored: Fri Jul 7 13:09:32 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Jul 7 13:09:32 2017 +0800

--
 .../sql/execution/vectorized/ColumnVector.java  | 56 +++
 .../vectorized/OffHeapColumnVector.java | 58 
 .../vectorized/OnHeapColumnVector.java  | 58 
 .../vectorized/ColumnarBatchSuite.scala | 49 +
 4 files changed, 221 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c09b31eb/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 24260a6..0c027f8 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -100,6 +100,27 @@ public abstract class ColumnVector implements 
AutoCloseable {
   throw new UnsupportedOperationException();
 }
 
+@Override
+public boolean[] toBooleanArray() { return data.getBooleans(offset, 
length); }
+
+@Override
+public byte[] toByteArray() { return data.getBytes(offset, length); }
+
+@Override
+public short[] toShortArray() { return data.getShorts(offset, length); }
+
+@Override
+public int[] toIntArray() { return data.getInts(offset, length); }
+
+@Override
+public long[] toLongArray() { return data.getLongs(offset, length); }
+
+@Override
+public float[] toFloatArray() { return data.getFloats(offset, length); }
+
+@Override
+public double[] toDoubleArray() { return data.getDoubles(offset, length); }
+
 // TODO: this is extremely expensive.
 @Override
 public Object[] array() {
@@ -367,6 +388,11 @@ public abstract class ColumnVector implements 

spark git commit: [SPARK-21327][SQL][PYSPARK] ArrayConstructor should handle an array of typecode 'l' as long rather than int in Python 2.

2017-07-06 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master d451b7f43 -> 53c2eb59b


[SPARK-21327][SQL][PYSPARK] ArrayConstructor should handle an array of typecode 
'l' as long rather than int in Python 2.

## What changes were proposed in this pull request?

Currently `ArrayConstructor` handles an array of typecode `'l'` as `int` when
converting a Python object into a Java object in Python 2, so if a value is
larger than `Integer.MAX_VALUE` or smaller than `Integer.MIN_VALUE`, an
overflow occurs.

```python
from pyspark.sql import Row
import array

data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
df = spark.createDataFrame(data)
df.show(truncate=False)
```

```
+----------+
|longarray |
+----------+
|[0, 0, -1]|
+----------+
```

This should be:

```
+-----------------------------------------------+
|longarray                                      |
+-----------------------------------------------+
|[-9223372036854775808, 0, 9223372036854775807]|
+-----------------------------------------------+
```
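
The `[0, 0, -1]` shown first is what 32-bit truncation of those longs produces; a minimal sketch in plain Scala (independent of PySpark) of that truncation:

```
// Illustrative only: truncating Long.MinValue/MaxValue to Int reproduces the
// wrong values shown above, which is what happens when typecode 'l' is read as int.
object LongTruncationSketch {
  def main(args: Array[String]): Unit = {
    println(Long.MinValue.toInt)  // 0
    println(0L.toInt)             // 0
    println(Long.MaxValue.toInt)  // -1
  }
}
```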

## How was this patch tested?

Added a test and existing tests.

Author: Takuya UESHIN 

Closes #18553 from ueshin/issues/SPARK-21327.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53c2eb59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53c2eb59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53c2eb59

Branch: refs/heads/master
Commit: 53c2eb59b2cc557081f6a252748dc38511601b0d
Parents: d451b7f
Author: Takuya UESHIN 
Authored: Fri Jul 7 14:05:22 2017 +0900
Committer: Takuya UESHIN 
Committed: Fri Jul 7 14:05:22 2017 +0900

--
 .../scala/org/apache/spark/api/python/SerDeUtil.scala | 10 ++
 python/pyspark/sql/tests.py   |  6 ++
 2 files changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/53c2eb59/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 6e4eab4..42f67e8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -73,6 +73,16 @@ private[spark] object SerDeUtil extends Logging {
 // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate 
correctly
 val data = 
args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1)
 construct(typecode, machineCodes(typecode), data)
+  } else if (args.length == 2 && args(0) == "l") {
+// On Python 2, an array of typecode 'l' should be handled as long 
rather than int.
+val values = args(1).asInstanceOf[JArrayList[_]]
+val result = new Array[Long](values.size)
+var i = 0
+while (i < values.size) {
+  result(i) = values.get(i).asInstanceOf[Number].longValue()
+  i += 1
+}
+result
   } else {
 super.construct(args)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/53c2eb59/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index c0e3b8d..9db2f40 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2342,6 +2342,12 @@ class SQLTests(ReusedPySparkTestCase):
 self.assertEquals(types[2], np.bool)
 self.assertEquals(types[3], np.float32)
 
+def test_create_dataframe_from_array_of_long(self):
+import array
+data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 
9223372036854775807]))]
+df = self.spark.createDataFrame(data)
+self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 
9223372036854775807]))
+
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21326][SPARK-21066][ML] Use TextFileFormat in LibSVMFileFormat and allow multiple input paths for determining numFeatures

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master e5bb26174 -> d451b7f43


[SPARK-21326][SPARK-21066][ML] Use TextFileFormat in LibSVMFileFormat and allow 
multiple input paths for determining numFeatures

## What changes were proposed in this pull request?

This is related with 
[SPARK-19918](https://issues.apache.org/jira/browse/SPARK-19918) and 
[SPARK-18362](https://issues.apache.org/jira/browse/SPARK-18362).

This PR proposes to use `TextFileFormat` and allow multiple input paths (but
with a warning) when determining the number of features in the LibSVM data
source via an extra scan (a usage sketch follows the points below).

There are three points here:

- The main advantage of this change should be to remove file-listing 
bottlenecks in driver side.

- Another advantage comes from using `FileScanRDD`. For example, we can likely
use the `spark.sql.files.ignoreCorruptFiles` option when determining the number
of features.

- We can unify the schema inference code path in text based data sources. This 
is also a preparation for 
[SPARK-21289](https://issues.apache.org/jira/browse/SPARK-21289).
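
As a usage sketch (the paths and feature count below are made up), specifying `numFeatures` still skips the extra scan, while omitting it now works with multiple input paths at the cost of a warning and a scan:

```
// Illustrative only: assumes an existing SparkSession named `spark`;
// the paths and the numFeatures value are hypothetical.
val withHint = spark.read.format("libsvm")
  .option("numFeatures", "780")      // no extra scan needed
  .load("/data/libsvm/part-00000")

val inferred = spark.read.format("libsvm")
  .load("/data/libsvm/part-00000", "/data/libsvm/part-00001")  // inferred, with a warning
```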

## How was this patch tested?

Unit tests in `LibSVMRelationSuite`.

Closes #18288

Author: hyukjinkwon 

Closes #18556 from HyukjinKwon/libsvm-schema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d451b7f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d451b7f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d451b7f4

Branch: refs/heads/master
Commit: d451b7f43d559aa1efd7ac3d1cbec5249f3a7a24
Parents: e5bb261
Author: hyukjinkwon 
Authored: Fri Jul 7 12:24:03 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Jul 7 12:24:03 2017 +0800

--
 .../spark/ml/source/libsvm/LibSVMRelation.scala | 26 ++--
 .../org/apache/spark/mllib/util/MLUtils.scala   | 25 +--
 .../ml/source/libsvm/LibSVMRelationSuite.scala  | 17 ++---
 3 files changed, 49 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d451b7f4/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index f68847a..dec1183 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.TaskContext
 import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
@@ -66,7 +67,10 @@ private[libsvm] class LibSVMOutputWriter(
 
 /** @see [[LibSVMDataSource]] for public documentation. */
 // If this is moved or renamed, please update DataSource's 
backwardCompatibilityMap.
-private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
+private[libsvm] class LibSVMFileFormat
+  extends TextBasedFileFormat
+  with DataSourceRegister
+  with Logging {
 
   override def shortName(): String = "libsvm"
 
@@ -89,18 +93,14 @@ private[libsvm] class LibSVMFileFormat extends 
TextBasedFileFormat with DataSour
   files: Seq[FileStatus]): Option[StructType] = {
 val libSVMOptions = new LibSVMOptions(options)
 val numFeatures: Int = libSVMOptions.numFeatures.getOrElse {
-  // Infers number of features if the user doesn't specify (a valid) one.
-  val dataFiles = files.filterNot(_.getPath.getName startsWith "_")
-  val path = if (dataFiles.length == 1) {
-dataFiles.head.getPath.toUri.toString
-  } else if (dataFiles.isEmpty) {
-throw new IOException("No input path specified for libsvm data")
-  } else {
-throw new IOException("Multiple input paths are not supported for 
libsvm data.")
-  }
-
-  val sc = sparkSession.sparkContext
-  val parsed = MLUtils.parseLibSVMFile(sc, path, sc.defaultParallelism)
+  require(files.nonEmpty, "No input path specified for libsvm data")
+  logWarning(
+"'numFeatures' option not specified, determining the number of 
features by going " +
+"though the input. If you know the number in advance, please specify 
it via " +
+"'numFeatures' option to avoid the extra scan.")
+
+  val paths = files.map(_.getPath.toUri.toString)
+  val parsed = MLUtils.parseLibSVMFile(sparkSession, paths)
   MLUtils.computeNumFeatures(parsed)
 }
 


spark git commit: [SPARK-21329][SS] Make EventTimeWatermarkExec explicitly UnaryExecNode

2017-07-06 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 40c7add3a -> e5bb26174


[SPARK-21329][SS] Make EventTimeWatermarkExec explicitly UnaryExecNode

## What changes were proposed in this pull request?

Making EventTimeWatermarkExec explicitly UnaryExecNode

/cc tdas zsxwing
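
For context, a minimal sketch (simplified stand-ins, not the real Spark traits) of what extending a unary node buys: `children` is derived from the single `child`, so the operator no longer overrides it by hand:

```
// Illustrative stand-ins for SparkPlan / UnaryExecNode.
trait PlanNode { def children: Seq[PlanNode] }

trait UnaryNode extends PlanNode {
  def child: PlanNode
  final override def children: Seq[PlanNode] = child :: Nil
}

// Before: extended the base trait and overrode children itself.
// After: extends the unary trait and only declares its single child.
case class WatermarkNode(child: PlanNode) extends UnaryNode
```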

## How was this patch tested?

Local build.

Author: Jacek Laskowski 

Closes #18509 from jaceklaskowski/EventTimeWatermarkExec-UnaryExecNode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5bb2617
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5bb2617
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5bb2617

Branch: refs/heads/master
Commit: e5bb26174d3336e07dd670eec4fd2137df346163
Parents: 40c7add
Author: Jacek Laskowski 
Authored: Thu Jul 6 18:11:41 2017 -0700
Committer: Shixiong Zhu 
Committed: Thu Jul 6 18:11:41 2017 -0700

--
 .../spark/sql/execution/streaming/EventTimeWatermarkExec.scala | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e5bb2617/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 25cf609..87e5b78 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.AccumulatorV2
@@ -81,7 +81,7 @@ class EventTimeStatsAccum(protected var currentStats: 
EventTimeStats = EventTime
 case class EventTimeWatermarkExec(
 eventTime: Attribute,
 delay: CalendarInterval,
-child: SparkPlan) extends SparkPlan {
+child: SparkPlan) extends UnaryExecNode {
 
   val eventTimeStats = new EventTimeStatsAccum()
   val delayMs = EventTimeWatermark.getDelayMs(delay)
@@ -117,6 +117,4 @@ case class EventTimeWatermarkExec(
   a
 }
   }
-
-  override def children: Seq[SparkPlan] = child :: Nil
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20946][SQL] Do not update conf for existing SparkContext in SparkSession.getOrCreate

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 0217dfd26 -> 40c7add3a


[SPARK-20946][SQL] Do not update conf for existing SparkContext in 
SparkSession.getOrCreate

## What changes were proposed in this pull request?

SparkContext is shared by all sessions, so we should not update its conf for only one session.
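
A sketch of the intended behavior (illustrative config key, run in a fresh application): builder options are applied to the new session, but the conf of an already-existing SparkContext is no longer overwritten:

```
// Illustrative only: "some.custom.key" is a made-up config key.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("existing"))

val spark = SparkSession.builder()
  .config("some.custom.key", "value")
  .getOrCreate()

assert(spark.sparkContext eq sc)
// The option lands in the session conf...
println(spark.conf.get("some.custom.key"))        // value
// ...but the shared SparkContext conf is left untouched by this change.
println(sc.getConf.contains("some.custom.key"))   // false
```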

## How was this patch tested?

existing tests

Author: Wenchen Fan 

Closes #18536 from cloud-fan/config.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40c7add3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40c7add3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40c7add3

Branch: refs/heads/master
Commit: 40c7add3a4c811202d1fa2be9606aca08df81266
Parents: 0217dfd
Author: Wenchen Fan 
Authored: Fri Jul 7 08:44:31 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Jul 7 08:44:31 2017 +0800

--
 .../spark/ml/recommendation/ALSSuite.scala   |  4 +---
 .../apache/spark/ml/tree/impl/TreeTests.scala|  2 --
 .../org/apache/spark/sql/SparkSession.scala  | 19 +++
 .../spark/sql/SparkSessionBuilderSuite.scala |  8 +++-
 4 files changed, 11 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/40c7add3/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 3094f52..b57fc8d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -818,15 +818,13 @@ class ALSCleanerSuite extends SparkFunSuite {
   FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, 
TrueFileFilter.INSTANCE).asScala.toSet
 try {
   conf.set("spark.local.dir", localDir.getAbsolutePath)
-  val sc = new SparkContext("local[2]", "test", conf)
+  val sc = new SparkContext("local[2]", "ALSCleanerSuite", conf)
   try {
 sc.setCheckpointDir(checkpointDir.getAbsolutePath)
 // Generate test data
 val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
 // Implicitly test the cleaning of parents during ALS training
 val spark = SparkSession.builder
-  .master("local[2]")
-  .appName("ALSCleanerSuite")
   .sparkContext(sc)
   .getOrCreate()
 import spark.implicits._

http://git-wip-us.apache.org/repos/asf/spark/blob/40c7add3/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala 
b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
index 92a2369..b6894b3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
@@ -43,8 +43,6 @@ private[ml] object TreeTests extends SparkFunSuite {
   categoricalFeatures: Map[Int, Int],
   numClasses: Int): DataFrame = {
 val spark = SparkSession.builder()
-  .master("local[2]")
-  .appName("TreeTests")
   .sparkContext(data.sparkContext)
   .getOrCreate()
 import spark.implicits._

http://git-wip-us.apache.org/repos/asf/spark/blob/40c7add3/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 0ddcd21..6dfe8a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -867,7 +867,7 @@ object SparkSession {
  *
  * @since 2.2.0
  */
-def withExtensions(f: SparkSessionExtensions => Unit): Builder = {
+def withExtensions(f: SparkSessionExtensions => Unit): Builder = 
synchronized {
   f(extensions)
   this
 }
@@ -912,21 +912,16 @@ object SparkSession {
 
 // No active nor global default session. Create a new one.
 val sparkContext = userSuppliedContext.getOrElse {
-  // set app name if not given
-  val randomAppName = java.util.UUID.randomUUID().toString
   val sparkConf = new SparkConf()
   options.foreach { case (k, v) => sparkConf.set(k, v) }
+
+  // set a random app name if not given.
   if (!sparkConf.contains("spark.app.name")) {
-

spark git commit: [SPARK-21267][SS][DOCS] Update Structured Streaming Documentation

2017-07-06 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4e53a4edd -> 576fd4c3a


[SPARK-21267][SS][DOCS] Update Structured Streaming Documentation

## What changes were proposed in this pull request?

Few changes to the Structured Streaming documentation
- Clarify that the entire stream input table is not materialized
- Add information for Ganglia
- Add Kafka Sink to the main docs
- Removed a couple of leftover experimental tags
- Added more associated reading material and talk videos.

In addition, https://github.com/apache/spark/pull/16856 broke the link to the
RDD programming guide in several places while renaming the page. This PR fixes
those links. cc sameeragarwal cloud-fan.
- Added a redirection to avoid breaking internal and possible external links.
- Removed unnecessary redirection pages that were there since the separate 
scala, java, and python programming guides were merged together in 2013 or 2014.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, 
manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Tathagata Das 

Closes #18485 from tdas/SPARK-21267.

(cherry picked from commit 0217dfd26f89133f146197359b556c9bf5aca172)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/576fd4c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/576fd4c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/576fd4c3

Branch: refs/heads/branch-2.2
Commit: 576fd4c3a67b4affc5ac50979e27ae929472f0d9
Parents: 4e53a4e
Author: Tathagata Das 
Authored: Thu Jul 6 17:28:20 2017 -0700
Committer: Shixiong Zhu 
Committed: Thu Jul 6 17:28:28 2017 -0700

--
 docs/_layouts/global.html   |   7 +-
 docs/index.md   |  13 +-
 docs/java-programming-guide.md  |   7 -
 docs/programming-guide.md   |   7 +
 docs/python-programming-guide.md|   7 -
 docs/rdd-programming-guide.md   |   2 +-
 docs/scala-programming-guide.md |   7 -
 docs/sql-programming-guide.md   |  16 +-
 docs/structured-streaming-programming-guide.md  | 172 ---
 .../scala/org/apache/spark/sql/Dataset.scala|   3 -
 10 files changed, 169 insertions(+), 72 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/576fd4c3/docs/_layouts/global.html
--
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index c00d0db..570483c 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -69,11 +69,10 @@
 Programming Guides
 
 Quick 
Start
-Spark 
Programming Guide
-
-Spark Streaming
-DataFrames, Datasets and SQL
+RDDs, 
Accumulators, Broadcasts Vars
+SQL, 
DataFrames, and Datasets
 Structured Streaming
+Spark Streaming (DStreams)
 MLlib (Machine 
Learning)
 GraphX (Graph Processing)
 SparkR (R on 
Spark)

http://git-wip-us.apache.org/repos/asf/spark/blob/576fd4c3/docs/index.md
--
diff --git a/docs/index.md b/docs/index.md
index a757fa0..51641c9 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -88,13 +88,12 @@ options for deployment:
 **Programming Guides:**
 
 * [Quick Start](quick-start.html): a quick introduction to the Spark API; 
start here!
-* [Spark Programming Guide](programming-guide.html): detailed overview of Spark
-  in all supported languages (Scala, Java, Python, R)
-* Modules built on Spark:
-  * [Spark Streaming](streaming-programming-guide.html): processing real-time 
data streams
-  * [Spark SQL, Datasets, and DataFrames](sql-programming-guide.html): support 
for structured data and relational queries
-  * [MLlib](ml-guide.html): built-in machine learning library
-  * [GraphX](graphx-programming-guide.html): Spark's new API for graph 
processing
+* [RDD Programming Guide](programming-guide.html): overview of Spark basics - 
RDDs (core but old API), accumulators, and broadcast variables  
+* [Spark SQL, Datasets, and 

spark git commit: [SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval

2017-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 48e44b24a -> bf66335ac


[SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval

## What changes were proposed in this pull request?

Rename org.apache.spark.sql.catalyst.plans.logical.statsEstimation.Range to 
ValueInterval.
The current name is identical to the logical operator "range".
Renaming it to ValueInterval is more accurate.

## How was this patch tested?

unit test

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Wang Gengliang 

Closes #18549 from gengliangwang/ValueInterval.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf66335a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf66335a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf66335a

Branch: refs/heads/master
Commit: bf66335acab3c0c188f6c378eb8aa6948a259cb2
Parents: 48e44b2
Author: Wang Gengliang 
Authored: Thu Jul 6 13:58:27 2017 -0700
Committer: Reynold Xin 
Committed: Thu Jul 6 13:58:27 2017 -0700

--
 .../statsEstimation/FilterEstimation.scala  | 36 
 .../statsEstimation/JoinEstimation.scala| 14 +--
 .../plans/logical/statsEstimation/Range.scala   | 88 ---
 .../logical/statsEstimation/ValueInterval.scala | 91 
 4 files changed, 117 insertions(+), 112 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 5a3bee7..e13db85 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -316,8 +316,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
 // decide if the value is in [min, max] of the column.
 // We currently don't store min/max for binary/string type.
 // Hence, we assume it is in boundary for binary/string type.
-val statsRange = Range(colStat.min, colStat.max, attr.dataType)
-if (statsRange.contains(literal)) {
+val statsInterval = ValueInterval(colStat.min, colStat.max, attr.dataType)
+if (statsInterval.contains(literal)) {
   if (update) {
 // We update ColumnStat structure after apply this equality predicate:
 // Set distinctCount to 1, nullCount to 0, and min/max values (if 
exist) to the literal
@@ -388,9 +388,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
 // use [min, max] to filter the original hSet
 dataType match {
   case _: NumericType | BooleanType | DateType | TimestampType =>
-val statsRange = Range(colStat.min, colStat.max, 
dataType).asInstanceOf[NumericRange]
+val statsInterval =
+  ValueInterval(colStat.min, colStat.max, 
dataType).asInstanceOf[NumericValueInterval]
 val validQuerySet = hSet.filter { v =>
-  v != null && statsRange.contains(Literal(v, dataType))
+  v != null && statsInterval.contains(Literal(v, dataType))
 }
 
 if (validQuerySet.isEmpty) {
@@ -440,12 +441,13 @@ case class FilterEstimation(plan: Filter) extends Logging 
{
   update: Boolean): Option[BigDecimal] = {
 
 val colStat = colStatsMap(attr)
-val statsRange = Range(colStat.min, colStat.max, 
attr.dataType).asInstanceOf[NumericRange]
-val max = statsRange.max.toBigDecimal
-val min = statsRange.min.toBigDecimal
+val statsInterval =
+  ValueInterval(colStat.min, colStat.max, 
attr.dataType).asInstanceOf[NumericValueInterval]
+val max = statsInterval.max.toBigDecimal
+val min = statsInterval.min.toBigDecimal
 val ndv = BigDecimal(colStat.distinctCount)
 
-// determine the overlapping degree between predicate range and column's 
range
+// determine the overlapping degree between predicate interval and 
column's interval
 val numericLiteral = if (literal.dataType == BooleanType) {
   if (literal.value.asInstanceOf[Boolean]) BigDecimal(1) else BigDecimal(0)
 } else {
@@ -566,18 +568,18 @@ case class FilterEstimation(plan: Filter) extends Logging 
{
 }
 
 val colStatLeft = colStatsMap(attrLeft)
-val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, 
attrLeft.dataType)
-  .asInstanceOf[NumericRange]
-val maxLeft = 

spark git commit: [SPARK-21204][SQL] Add support for Scala Set collection types in serialization

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 26ac085de -> 48e44b24a


[SPARK-21204][SQL] Add support for Scala Set collection types in serialization

## What changes were proposed in this pull request?

Currently we can't produce a `Dataset` containing `Set` in SparkSQL. This PR 
tries to support serialization/deserialization of `Set`.

Because there's no corresponding internal data type in SparkSQL for a `Set`,
the most appropriate choice for serializing a set is an array.
## How was this patch tested?

Added unit tests.

Author: Liang-Chi Hsieh 

Closes #18416 from viirya/SPARK-21204.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48e44b24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48e44b24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48e44b24

Branch: refs/heads/master
Commit: 48e44b24a7663142176102ac4c6bf4242f103804
Parents: 26ac085
Author: Liang-Chi Hsieh 
Authored: Fri Jul 7 01:07:45 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Jul 7 01:07:45 2017 +0800

--
 .../spark/sql/catalyst/ScalaReflection.scala| 28 --
 .../catalyst/expressions/objects/objects.scala  |  5 ++--
 .../org/apache/spark/sql/SQLImplicits.scala | 10 +++
 .../spark/sql/DataFrameAggregateSuite.scala | 10 +++
 .../spark/sql/DatasetPrimitiveSuite.scala   | 31 
 5 files changed, 79 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/48e44b24/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 814f2c1..4d5401f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -309,7 +309,10 @@ object ScalaReflection extends ScalaReflection {
   Invoke(arrayData, primitiveMethod, arrayCls, returnNullable = false)
 }
 
-  case t if t <:< localTypeOf[Seq[_]] =>
+  // We serialize a `Set` to Catalyst array. When we deserialize a 
Catalyst array
+  // to a `Set`, if there are duplicated elements, the elements will be 
de-duplicated.
+  case t if t <:< localTypeOf[Seq[_]] ||
+  t <:< localTypeOf[scala.collection.Set[_]] =>
 val TypeRef(_, _, Seq(elementType)) = t
 val Schema(dataType, elementNullable) = schemaFor(elementType)
 val className = getClassNameFromType(elementType)
@@ -327,8 +330,10 @@ object ScalaReflection extends ScalaReflection {
 }
 
 val companion = t.normalize.typeSymbol.companionSymbol.typeSignature
-val cls = companion.declaration(newTermName("newBuilder")) match {
-  case NoSymbol => classOf[Seq[_]]
+val cls = companion.member(newTermName("newBuilder")) match {
+  case NoSymbol if t <:< localTypeOf[Seq[_]] => classOf[Seq[_]]
+  case NoSymbol if t <:< localTypeOf[scala.collection.Set[_]] =>
+classOf[scala.collection.Set[_]]
   case _ => mirror.runtimeClass(t.typeSymbol.asClass)
 }
 UnresolvedMapObjects(mapFunction, getPath, Some(cls))
@@ -502,6 +507,19 @@ object ScalaReflection extends ScalaReflection {
   serializerFor(_, valueType, valuePath, seenTypeSet),
   valueNullable = !valueType.typeSymbol.asClass.isPrimitive)
 
+  case t if t <:< localTypeOf[scala.collection.Set[_]] =>
+val TypeRef(_, _, Seq(elementType)) = t
+
+// There's no corresponding Catalyst type for `Set`, we serialize a 
`Set` to Catalyst array.
+// Note that the property of `Set` is only kept when manipulating the 
data as domain object.
+val newInput =
+  Invoke(
+   inputObject,
+   "toSeq",
+   ObjectType(classOf[Seq[_]]))
+
+toCatalystArray(newInput, elementType)
+
   case t if t <:< localTypeOf[String] =>
 StaticInvoke(
   classOf[UTF8String],
@@ -713,6 +731,10 @@ object ScalaReflection extends ScalaReflection {
 val Schema(valueDataType, valueNullable) = schemaFor(valueType)
 Schema(MapType(schemaFor(keyType).dataType,
   valueDataType, valueContainsNull = valueNullable), nullable = true)
+  case t if t <:< localTypeOf[Set[_]] =>
+val TypeRef(_, _, Seq(elementType)) = t
+val Schema(dataType, nullable) = schemaFor(elementType)
+Schema(ArrayType(dataType, containsNull = nullable), nullable = true)
   case t if t <:< localTypeOf[String] => 

spark git commit: [SPARK-21228][SQL] InSet incorrect handling of structs

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 565e7a8d4 -> 26ac085de


[SPARK-21228][SQL] InSet incorrect handling of structs

## What changes were proposed in this pull request?
When the data type is a struct, InSet now uses TypeUtils.getInterpretedOrdering
(similar to EqualTo) to build a TreeSet. In other cases it uses a HashSet
as before (which should be faster). Similarly, In.eval uses Ordering.equiv
instead of equals.
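
A minimal sketch in plain Scala (not the Catalyst code; `StructVal` and `ord` stand in for a struct value and `TypeUtils.getInterpretedOrdering`) of the ordering-based membership check:

```
import scala.collection.immutable.TreeSet

// Illustrative only: StructVal is a stand-in for a Catalyst struct value and
// `ord` stands in for TypeUtils.getInterpretedOrdering on the struct type.
object InSetStructSketch {
  final case class StructVal(a: Int, b: String)

  def main(args: Array[String]): Unit = {
    val ord: Ordering[StructVal] = Ordering.by(v => (v.a, v.b))

    // With this change, struct values are kept in an ordering-based TreeSet
    // instead of a HashSet, and In.eval compares via ordering.equiv.
    val set = TreeSet(StructVal(1, "x"), StructVal(2, "y"))(ord)
    println(set.contains(StructVal(1, "x")))                  // true
    println(ord.equiv(StructVal(1, "x"), StructVal(1, "x")))  // true
  }
}
```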

## How was this patch tested?
New test in SQLQuerySuite.

Author: Bogdan Raducanu 

Closes #18455 from bogdanrdc/SPARK-21228.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26ac085d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26ac085d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26ac085d

Branch: refs/heads/master
Commit: 26ac085debb54d0104762d1cd4187cdf73f301ba
Parents: 565e7a8
Author: Bogdan Raducanu 
Authored: Fri Jul 7 01:04:57 2017 +0800
Committer: Wenchen Fan 
Committed: Fri Jul 7 01:04:57 2017 +0800

--
 .../sql/catalyst/expressions/predicates.scala   | 57 +---
 .../catalyst/expressions/PredicateSuite.scala   | 31 ++-
 .../catalyst/optimizer/OptimizeInSuite.scala|  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala| 22 
 4 files changed, 78 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/26ac085d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index f3fe58c..7bf10f1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.collection.immutable.TreeSet
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
-import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => 
BasePredicate}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode, GenerateSafeProjection, GenerateUnsafeProjection, Predicate => 
BasePredicate}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._
@@ -175,20 +176,23 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate {
  |[${sub.output.map(_.dataType.catalogString).mkString(", ")}].
""".stripMargin)
   } else {
-TypeCheckResult.TypeCheckSuccess
+TypeUtils.checkForOrderingExpr(value.dataType, s"function 
$prettyName")
   }
 }
   case _ =>
-if (list.exists(l => l.dataType != value.dataType)) {
-  TypeCheckResult.TypeCheckFailure("Arguments must be same type")
+val mismatchOpt = list.find(l => l.dataType != value.dataType)
+if (mismatchOpt.isDefined) {
+  TypeCheckResult.TypeCheckFailure(s"Arguments must be same type but 
were: " +
+s"${value.dataType} != ${mismatchOpt.get.dataType}")
 } else {
-  TypeCheckResult.TypeCheckSuccess
+  TypeUtils.checkForOrderingExpr(value.dataType, s"function 
$prettyName")
 }
 }
   }
 
   override def children: Seq[Expression] = value +: list
   lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal])
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(value.dataType)
 
   override def nullable: Boolean = children.exists(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)
@@ -203,10 +207,10 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate {
   var hasNull = false
   list.foreach { e =>
 val v = e.eval(input)
-if (v == evaluatedValue) {
-  return true
-} else if (v == null) {
+if (v == null) {
   hasNull = true
+} else if (ordering.equiv(v, evaluatedValue)) {
+  return true
 }
   }
   if (hasNull) {
@@ -265,7 +269,7 @@ case class InSet(child: Expression, hset: Set[Any]) extends 
UnaryExpression with
   override def nullable: Boolean = child.nullable || hasNull
 
   protected override def nullSafeEval(value: Any): Any = {
-if (hset.contains(value)) {
+if (set.contains(value)) {
   true
 

spark git commit: [SPARK-20950][CORE] add a new config to diskWriteBufferSize which is hard coded before

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master d540dfbff -> 565e7a8d4


[SPARK-20950][CORE] add a new config to diskWriteBufferSize which is hard coded 
before

## What changes were proposed in this pull request?

This PR makes two improvements:
1. Make the diskWriteBufferSize of ShuffleExternalSorter configurable via
spark.shuffle.spill.diskWriteBufferSize (a usage sketch follows this list).
When changing diskWriteBufferSize to exercise `forceSorterToSpill`, the average
running time over 10 runs is as follows (in ms):
```
diskWriteBufferSize    1M     512K   256K   128K   64K    32K    16K    8K     4K
----------------------------------------------------------------------------------
RecordSize = 2.5M      742    722    694    686    667    668    671    669    683
RecordSize = 1M        294    293    292    287    283    285    281    279    285
```

2. Remove the outputBufferSizeInBytes and inputBufferSizeInBytes fields and
initialize them in the mergeSpillsWithFileStream function.
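
A usage sketch of the new configuration referenced in item 1 above (the exact value is just an example):

```
// Illustrative only: sets the new buffer-size config, in bytes, on a SparkConf.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-spill-buffer-demo")
  .set("spark.shuffle.spill.diskWriteBufferSize", "524288")  // 512 KB
```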

## How was this patch tested?
The unit test.

Author: caoxuewen 

Closes #18174 from heary-cao/buffersize.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/565e7a8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/565e7a8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/565e7a8d

Branch: refs/heads/master
Commit: 565e7a8d4ae7879ee704fb94ae9b3da31e202d7e
Parents: d540dfb
Author: caoxuewen 
Authored: Thu Jul 6 19:49:34 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 19:49:34 2017 +0800

--
 .../shuffle/sort/ShuffleExternalSorter.java | 11 +---
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 14 +++---
 .../unsafe/sort/UnsafeSorterSpillWriter.java| 24 ++---
 .../apache/spark/internal/config/package.scala  | 27 
 4 files changed, 60 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index c33d1e3..338faaa 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -43,6 +43,7 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.Utils;
+import org.apache.spark.internal.config.package$;
 
 /**
  * An external sorter that is specialized for sort-based shuffle.
@@ -82,6 +83,9 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
 
+  /** The buffer size to use when writing the sorted records to an on-disk 
file */
+  private final int diskWriteBufferSize;
+
   /**
* Memory pages that hold the records being sorted. The pages in this list 
are freed when
* spilling, although in principle we could recycle these pages across 
spills (on the other hand,
@@ -116,13 +120,14 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 this.taskContext = taskContext;
 this.numPartitions = numPartitions;
 // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no 
units are provided
-this.fileBufferSizeBytes = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+this.fileBufferSizeBytes = (int) (long) 
conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
 this.numElementsForSpillThreshold =
   conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 
* 1024 * 1024);
 this.writeMetrics = writeMetrics;
 this.inMemSorter = new ShuffleInMemorySorter(
   this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", 
true));
 this.peakMemoryUsedBytes = getMemoryUsage();
+this.diskWriteBufferSize = (int) (long) 
conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
   }
 
   /**
@@ -155,7 +160,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 // be an API to directly transfer bytes from managed memory to the disk 
writer, we buffer
 // data through a byte array. This array does not need to be large enough 
to hold a single
 // record;
-final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+final byte[] writeBuffer = new byte[diskWriteBufferSize];
 
 // Because this output will be read during shuffle, its compression codec 

spark git commit: [SPARK-21273][SQL][FOLLOW-UP] Add missing test cases back and revise code style

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master b8e4d567a -> d540dfbff


[SPARK-21273][SQL][FOLLOW-UP] Add missing test cases back and revise code style

## What changes were proposed in this pull request?

Add missing test cases back and revise code style

Follow up the previous PR: https://github.com/apache/spark/pull/18479

## How was this patch tested?

Unit test

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Wang Gengliang 

Closes #18548 from gengliangwang/stat_propagation_revise.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d540dfbf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d540dfbf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d540dfbf

Branch: refs/heads/master
Commit: d540dfbff33aa2f8571e0de149dfa3f4e7321113
Parents: b8e4d56
Author: Wang Gengliang 
Authored: Thu Jul 6 19:12:15 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 19:12:15 2017 +0800

--
 .../plans/logical/LogicalPlanVisitor.scala  |  2 +-
 .../BasicStatsEstimationSuite.scala | 45 
 2 files changed, 46 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d540dfbf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
index b230458..2652f6d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
@@ -38,10 +38,10 @@ trait LogicalPlanVisitor[T] {
 case p: Range => visitRange(p)
 case p: Repartition => visitRepartition(p)
 case p: RepartitionByExpression => visitRepartitionByExpr(p)
+case p: ResolvedHint => visitHint(p)
 case p: Sample => visitSample(p)
 case p: ScriptTransformation => visitScriptTransform(p)
 case p: Union => visitUnion(p)
-case p: ResolvedHint => visitHint(p)
 case p: LogicalPlan => default(p)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d540dfbf/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 5fd21a0..913be6d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -78,6 +78,37 @@ class BasicStatsEstimationSuite extends PlanTest with 
StatsEstimationTestBase {
 checkStats(globalLimit, stats)
   }
 
+  test("sample estimation") {
+val sample = Sample(0.0, 0.5, withReplacement = false, (math.random * 
1000).toLong, plan)
+checkStats(sample, Statistics(sizeInBytes = 60, rowCount = Some(5)))
+
+// Child doesn't have rowCount in stats
+val childStats = Statistics(sizeInBytes = 120)
+val childPlan = DummyLogicalPlan(childStats, childStats)
+val sample2 =
+  Sample(0.0, 0.11, withReplacement = false, (math.random * 1000).toLong, 
childPlan)
+checkStats(sample2, Statistics(sizeInBytes = 14))
+  }
+
+  test("estimate statistics when the conf changes") {
+val expectedDefaultStats =
+  Statistics(
+sizeInBytes = 40,
+rowCount = Some(10),
+attributeStats = AttributeMap(Seq(
+  AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), 
Some(10), 0, 4, 4
+val expectedCboStats =
+  Statistics(
+sizeInBytes = 4,
+rowCount = Some(1),
+attributeStats = AttributeMap(Seq(
+  AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), 
Some(5), 0, 4, 4
+
+val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats 
= expectedCboStats)
+checkStats(
+  plan, expectedStatsCboOn = expectedCboStats, expectedStatsCboOff = 
expectedDefaultStats)
+  }
+
   /** Check estimated stats when cbo is turned on/off. */
   private def checkStats(
   plan: LogicalPlan,
@@ -99,3 +130,17 @@ class BasicStatsEstimationSuite extends PlanTest with 
StatsEstimationTestBase {
   private def checkStats(plan: LogicalPlan, 

spark-website git commit: Update Sandy.

2017-07-06 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 8a9ae6b7d -> 878dcfd84


Update Sandy.


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/878dcfd8
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/878dcfd8
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/878dcfd8

Branch: refs/heads/asf-site
Commit: 878dcfd8422f92842a016933d7ce2c8dfb25f46e
Parents: 8a9ae6b
Author: Dongjoon Hyun 
Authored: Wed Jul 5 00:08:01 2017 -0700
Committer: Dongjoon Hyun 
Committed: Wed Jul 5 00:08:01 2017 -0700

--
 committers.md| 2 +-
 site/committers.html | 8 
 2 files changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/878dcfd8/committers.md
--
diff --git a/committers.md b/committers.md
index c7bcca1..e850f8b 100644
--- a/committers.md
+++ b/committers.md
@@ -46,7 +46,7 @@ navigation:
 |Imran Rashid|Cloudera|
 |Charles Reiss|UC Berkeley|
 |Josh Rosen|Databricks|
-|Sandy Ryza|Clover Health|
+|Sandy Ryza|Remix|
 |Kousuke Saruta|NTT Data|
 |Prashant Sharma|IBM|
 |Ram Sriharsha|Databricks|

http://git-wip-us.apache.org/repos/asf/spark-website/blob/878dcfd8/site/committers.html
--
diff --git a/site/committers.html b/site/committers.html
index 2999465..4136677 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -214,7 +214,7 @@
 
 
   Felix Cheung
-  Automattic
+  Microsoft
 
 
   Mosharaf Chowdhury
@@ -294,7 +294,7 @@
 
 
   Xiao Li
-  IBM
+  Databricks
 
 
   Davies Liu
@@ -350,7 +350,7 @@
 
 
   Sandy Ryza
-  Clover Health
+  Remix
 
 
   Kousuke Saruta
@@ -370,7 +370,7 @@
 
 
   Takuya Ueshin
-  
+  Databricks
 
 
   Marcelo Vanzin


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark-website git commit: Update committer page.

2017-07-06 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 9749c8e2f -> 8a9ae6b7d


Update committer page.


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/8a9ae6b7
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/8a9ae6b7
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/8a9ae6b7

Branch: refs/heads/asf-site
Commit: 8a9ae6b7d17d3a8e8879a37f8fca1fe9a28edc12
Parents: 9749c8e
Author: Dongjoon Hyun 
Authored: Tue Jul 4 23:44:01 2017 -0700
Committer: Dongjoon Hyun 
Committed: Tue Jul 4 23:44:01 2017 -0700

--
 committers.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/8a9ae6b7/committers.md
--
diff --git a/committers.md b/committers.md
index a97bb72..c7bcca1 100644
--- a/committers.md
+++ b/committers.md
@@ -12,7 +12,7 @@ navigation:
 |||
 |Michael Armbrust|Databricks|
 |Joseph Bradley|Databricks|
-|Felix Cheung|Automattic|
+|Felix Cheung|Microsoft|
 |Mosharaf Chowdhury|University of Michigan, Ann Arbor|
 |Jason Dai|Intel|
 |Tathagata Das|Databricks|
@@ -32,7 +32,7 @@ navigation:
 |Andy Konwinski|Databricks|
 |Ryan LeCompte|Quantifind|
 |Haoyuan Li|Alluxio, UC Berkeley|
-|Xiao Li|IBM|
+|Xiao Li|Databricks|
 |Davies Liu|Databricks|
 |Cheng Lian|Databricks|
 |Yanbo Liang|Hortonworks|
@@ -51,7 +51,7 @@ navigation:
 |Prashant Sharma|IBM|
 |Ram Sriharsha|Databricks|
 |DB Tsai|Netflix|
-|Takuya Ueshin||
+|Takuya Ueshin|Databricks|
 |Marcelo Vanzin|Cloudera|
 |Shivaram Venkataraman|UC Berkeley|
 |Patrick Wendell|Databricks|


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21324][TEST] Improve statistics test suites

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 6ff05a66f -> b8e4d567a


[SPARK-21324][TEST] Improve statistics test suites

## What changes were proposed in this pull request?

1. move `StatisticsCollectionTestBase` to a separate file.
2. move some test cases to `StatisticsCollectionSuite` so that 
`hive/StatisticsSuite` only keeps tests that need hive support.
3. clear up some test cases.

## How was this patch tested?

Existing tests.

Author: wangzhenhua 
Author: Zhenhua Wang 

Closes #18545 from wzhfy/cleanStatSuites.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8e4d567
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8e4d567
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8e4d567

Branch: refs/heads/master
Commit: b8e4d567a7d6c2ff277700d4e7707e57e87c7808
Parents: 6ff05a66
Author: wangzhenhua 
Authored: Thu Jul 6 16:00:31 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 16:00:31 2017 +0800

--
 .../spark/sql/StatisticsCollectionSuite.scala   | 193 ---
 .../sql/StatisticsCollectionTestBase.scala  | 192 ++
 .../apache/spark/sql/hive/StatisticsSuite.scala | 124 
 3 files changed, 258 insertions(+), 251 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b8e4d567/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index d9392de..843ced7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,19 +17,12 @@
 
 package org.apache.spark.sql
 
-import java.{lang => jl}
-import java.sql.{Date, Timestamp}
-
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogStatistics}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
 
@@ -58,6 +51,37 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
 }
   }
 
+  test("analyzing views is not supported") {
+def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
+  val err = intercept[AnalysisException] {
+sql(analyzeCommand)
+  }
+  assert(err.message.contains("ANALYZE TABLE is not supported"))
+}
+
+val tableName = "tbl"
+withTable(tableName) {
+  spark.range(10).write.saveAsTable(tableName)
+  val viewName = "view"
+  withView(viewName) {
+sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
+assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS 
FOR COLUMNS id")
+  }
+}
+  }
+
+  test("statistics collection of a table with zero column") {
+val table_no_cols = "table_no_cols"
+withTable(table_no_cols) {
+  val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
+  val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
+  dfNoCols.write.format("json").saveAsTable(table_no_cols)
+  sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
+  checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts 
= Some(10))
+}
+  }
+
   test("analyze column command - unsupported types and invalid columns") {
 val tableName = "column_stats_test1"
 withTable(tableName) {
@@ -239,154 +263,3 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 
 }
-
-
-/**
- * The base for test cases that we want to include in both the hive module 
(for verifying behavior
- * when using the Hive external catalog) as well as in the sql/core module.
- */
-abstract class StatisticsCollectionTestBase extends QueryTest with 
SQLTestUtils {
-  import testImplicits._
-
-  private val dec1 = new java.math.BigDecimal("1.00")
-  private val dec2 = new java.math.BigDecimal("8.00")
-  private val d1 = 

spark git commit: [SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter operations

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 5800144a5 -> 6ff05a66f


[SPARK-20703][SQL] Associate metrics with data writes onto DataFrameWriter 
operations

## What changes were proposed in this pull request?

Right now in the UI, after SPARK-20213, we can show the operations that write data out. However, there is no way to associate metrics with data writes. We should show the relevant metrics on those operations.

#### Supported commands

This change supports updating metrics for file-based data writing operations, including `InsertIntoHadoopFsRelationCommand` and `InsertIntoHiveTable`.

Supported metrics:

* number of written files
* number of dynamic partitions
* total bytes of written data
* total number of output rows
* average writing data out time (ms)
* (TODO) min/med/max number of output rows per file/partition
* (TODO) min/med/max bytes of written data per file/partition

#### Commands not supported

`InsertIntoDataSourceCommand`, `SaveIntoDataSourceCommand`:

These two commands use DataSource APIs to write data out, i.e., the logic of writing data out is delegated to the DataSource implementations, such as `InsertableRelation.insert` and `CreatableRelationProvider.createRelation`. So we cannot obtain metrics from those delegated methods for now; a hypothetical sketch of why follows below.
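
A minimal, hypothetical sketch (not code from this patch; `ExampleRelation` is an illustrative name) of why such writes are opaque: the data source implementation owns the entire write path, so no file or row counters flow back to Spark.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.sources.InsertableRelation

// A real source would typically mix this into a BaseRelation as well; this is
// just enough to show where the write logic lives.
class ExampleRelation extends InsertableRelation {
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // The source writes `data` with its own client here; Spark only sees this
    // call return, so it cannot collect per-file or per-row write metrics.
  }
}
```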

`CreateHiveTableAsSelectCommand`, `CreateDataSourceTableAsSelectCommand`:

These two commands invoke other commands to write data out. The invoked commands can even write to non-file-based data sources. We leave them as a future TODO.

#### How to update metrics of writing files out

A `RunnableCommand` that wants to update metrics needs to override its `metrics` and provide that metrics data structure to `ExecutedCommandExec`.

The metrics are prepared during the execution of `FileFormatWriter`. The callback function passed to `FileFormatWriter` accepts the metrics and updates them accordingly.

There is a metrics-updating function in `RunnableCommand`. At runtime, the function is bound to the Spark context and the `metrics` of `ExecutedCommandExec`, and is passed to `FileFormatWriter`. A minimal sketch of this wiring is shown below.
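
A minimal sketch of that wiring, under assumed names (`MetricsAwareCommand` and `WriteSummary` are illustrative stand-ins, not the classes added by this patch):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative stand-in for whatever per-write summary the file writer reports back.
case class WriteSummary(numFiles: Int, numRows: Long)

trait MetricsAwareCommand {
  // Metrics exposed to the physical command node so they show up in the UI.
  lazy val writeMetrics: Map[String, SQLMetric] = {
    val sc = SparkContext.getOrCreate()
    Map(
      "numFiles" -> SQLMetrics.createMetric(sc, "number of written files"),
      "numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"))
  }

  // Callback bound at execution time and handed to the file writer, which
  // invokes it with the collected summaries once the write finishes.
  def updateWriteMetrics(summaries: Seq[WriteSummary]): Unit = {
    writeMetrics("numFiles").add(summaries.map(_.numFiles.toLong).sum)
    writeMetrics("numOutputRows").add(summaries.map(_.numRows).sum)
  }
}
```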

## How was this patch tested?

Updated unit tests.

Author: Liang-Chi Hsieh 

Closes #18159 from viirya/SPARK-20703-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ff05a66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ff05a66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ff05a66

Branch: refs/heads/master
Commit: 6ff05a66fe83e721063efe5c28d2ffeb850fecc7
Parents: 5800144
Author: Liang-Chi Hsieh 
Authored: Thu Jul 6 15:47:09 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 15:47:09 2017 +0800

--
 .../scala/org/apache/spark/util/Utils.scala |   9 ++
 .../execution/command/DataWritingCommand.scala  |  75 ++
 .../spark/sql/execution/command/commands.scala  |  12 ++
 .../datasources/FileFormatWriter.scala  | 121 +---
 .../InsertIntoHadoopFsRelationCommand.scala |  18 ++-
 .../sql/sources/PartitionedWriteSuite.scala |  21 +--
 .../hive/execution/InsertIntoHiveTable.scala|   8 +-
 .../sql/hive/execution/SQLMetricsSuite.scala| 139 +++
 8 files changed, 362 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 26f61e2..b4caf68 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1003,6 +1003,15 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Lists files recursively.
+   */
+  def recursiveList(f: File): Array[File] = {
+require(f.isDirectory)
+val current = f.listFiles
+current ++ current.filter(_.isDirectory).flatMap(recursiveList)
+  }
+
+  /**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
* Throws an exception if deletion is unsuccessful.

http://git-wip-us.apache.org/repos/asf/spark/blob/6ff05a66/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
new file mode 100644
index 000..0c381a2
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache 

spark git commit: [SPARK-21012][SUBMIT] Add glob support for resources adding to Spark

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 60043f224 -> 5800144a5


[SPARK-21012][SUBMIT] Add glob support for resources adding to Spark

Current "--jars (spark.jars)", "--files (spark.files)", "--py-files 
(spark.submit.pyFiles)" and "--archives (spark.yarn.dist.archives)" only 
support non-glob path. This is OK for most of the cases, but when user requires 
to add more jars, files into Spark, it is too verbose to list one by one. So 
here propose to add glob path support for resources.

Also improving the code of downloading resources.
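
A minimal sketch of the glob expansion idea using Hadoop's `FileSystem.globStatus` (a standalone helper written for illustration, not the actual SparkSubmit code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Expand a comma-separated list of possibly-globbed paths into concrete paths.
def expandGlobPaths(paths: String, hadoopConf: Configuration): String = {
  paths.split(",").filter(_.nonEmpty).flatMap { p =>
    val path = new Path(p)
    val fs = path.getFileSystem(hadoopConf)
    // globStatus returns null when nothing matches; fall back to the raw string.
    Option(fs.globStatus(path)).filter(_.nonEmpty) match {
      case Some(statuses) => statuses.map(_.getPath.toString).toSeq
      case None => Seq(p)
    }
  }.mkString(",")
}
```

With such expansion in place, an argument like `--jars "hdfs:///libs/*.jar"` could resolve to every matching jar before the resources are distributed.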

## How was this patch tested?

UT added, also verified manually in local cluster.

Author: jerryshao 

Closes #18235 from jerryshao/SPARK-21012.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5800144a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5800144a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5800144a

Branch: refs/heads/master
Commit: 5800144a54f5c0180ccf67392f32c3e8a51119b1
Parents: 60043f2
Author: jerryshao 
Authored: Thu Jul 6 15:32:49 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 15:32:49 2017 +0800

--
 .../org/apache/spark/deploy/SparkSubmit.scala   | 166 +++
 .../spark/deploy/SparkSubmitArguments.scala |   2 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  68 +++-
 docs/configuration.md   |   6 +-
 4 files changed, 196 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5800144a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d13fb41..abde040 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,17 +17,21 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, IOException}
+import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, 
UndeclaredThrowableException}
 import java.net.URL
 import java.nio.file.Files
-import java.security.PrivilegedExceptionAction
+import java.security.{KeyStore, PrivilegedExceptionAction}
+import java.security.cert.X509Certificate
 import java.text.ParseException
+import javax.net.ssl._
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Properties
 
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -310,33 +314,33 @@ object SparkSubmit extends CommandLineUtils {
   RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
 }
 
-// In client mode, download remote files.
-if (deployMode == CLIENT) {
-  val hadoopConf = new HadoopConfiguration()
-  args.primaryResource = Option(args.primaryResource).map(downloadFile(_, 
hadoopConf)).orNull
-  args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull
-  args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, 
hadoopConf)).orNull
-  args.files = Option(args.files).map(downloadFileList(_, 
hadoopConf)).orNull
-}
-
-// Require all python files to be local, so we can add them to the 
PYTHONPATH
-// In YARN cluster mode, python files are distributed as regular files, 
which can be non-local.
-// In Mesos cluster mode, non-local python files are automatically 
downloaded by Mesos.
-if (args.isPython && !isYarnCluster && !isMesosCluster) {
-  if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
-printErrorAndExit(s"Only local python files are supported: 
${args.primaryResource}")
+val hadoopConf = new HadoopConfiguration()
+val targetDir = Files.createTempDirectory("tmp").toFile
+// scalastyle:off runtimeaddshutdownhook
+Runtime.getRuntime.addShutdownHook(new Thread() {
+  override def run(): Unit = {
+FileUtils.deleteQuietly(targetDir)
   }
-  val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
-  if (nonLocalPyFiles.nonEmpty) {
-printErrorAndExit(s"Only local additional python files are supported: 
$nonLocalPyFiles")
-  }
-}
+})
+// scalastyle:on runtimeaddshutdownhook
 
-// Require all R files to be local
-if (args.isR && !isYarnCluster && !isMesosCluster) {
-  if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
-printErrorAndExit(s"Only local R files are 

spark git commit: [SS][MINOR] Fix flaky test in DatastreamReaderWriterSuite. temp checkpoint dir should be deleted

2017-07-06 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 6e1081cbe -> 4e53a4edd


[SS][MINOR] Fix flaky test in DatastreamReaderWriterSuite. temp checkpoint dir 
should be deleted

## What changes were proposed in this pull request?

Stopping a query while it is being initialized can throw an interrupt exception, in which case temporary checkpoint directories will not be deleted and the test will fail.

Author: Tathagata Das 

Closes #18442 from tdas/DatastreamReaderWriterSuite-fix.

(cherry picked from commit 60043f22458668ac7ecba94fa78953f23a6bdcec)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e53a4ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e53a4ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e53a4ed

Branch: refs/heads/branch-2.2
Commit: 4e53a4edd72e372583f243c660bbcc0572205716
Parents: 6e1081c
Author: Tathagata Das 
Authored: Thu Jul 6 00:20:26 2017 -0700
Committer: Tathagata Das 
Committed: Thu Jul 6 00:20:40 2017 -0700

--
 .../spark/sql/streaming/test/DataStreamReaderWriterSuite.scala  | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4e53a4ed/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index dc2506a..bae9d81 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -641,6 +641,7 @@ class DataStreamReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
   test("temp checkpoint dir should be deleted if a query is stopped without 
errors") {
 import testImplicits._
 val query = MemoryStream[Int].toDS.writeStream.format("console").start()
+query.processAllAvailable()
 val checkpointDir = new Path(
   query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
 val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())





spark git commit: [SS][MINOR] Fix flaky test in DatastreamReaderWriterSuite. temp checkpoint dir should be deleted

2017-07-06 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 14a3bb3a0 -> 60043f224


[SS][MINOR] Fix flaky test in DatastreamReaderWriterSuite. temp checkpoint dir 
should be deleted

## What changes were proposed in this pull request?

Stopping a query while it is being initialized can throw an interrupt exception, in which case temporary checkpoint directories will not be deleted and the test will fail.

Author: Tathagata Das 

Closes #18442 from tdas/DatastreamReaderWriterSuite-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60043f22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60043f22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60043f22

Branch: refs/heads/master
Commit: 60043f22458668ac7ecba94fa78953f23a6bdcec
Parents: 14a3bb3
Author: Tathagata Das 
Authored: Thu Jul 6 00:20:26 2017 -0700
Committer: Tathagata Das 
Committed: Thu Jul 6 00:20:26 2017 -0700

--
 .../spark/sql/streaming/test/DataStreamReaderWriterSuite.scala  | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/60043f22/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 3de0ae6..e8a6202 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -641,6 +641,7 @@ class DataStreamReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
   test("temp checkpoint dir should be deleted if a query is stopped without 
errors") {
 import testImplicits._
 val query = MemoryStream[Int].toDS.writeStream.format("console").start()
+query.processAllAvailable()
 val checkpointDir = new Path(
   
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
 val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())





spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8f1ca6957 -> 7f7b63bb6


[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

## What changes were proposed in this pull request?

Corrects the offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have their own SparkPlan implementations and cause an EXCHANGE in writes.
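
For intuition, a small worked example of the corrected arithmetic (the concrete offset values are hypothetical):

```scala
// Platform.BYTE_ARRAY_OFFSET is the JVM's base offset of byte[] contents (say 16).
// For an UnsafeRow whose data starts 100 bytes into its backing array:
//   baseOffset = Platform.BYTE_ARRAY_OFFSET + 100              // = 116
//   baseOffset - Platform.BYTE_ARRAY_OFFSET                    // = 100, correct start position
//   Platform.BYTE_ARRAY_OFFSET - baseOffset                    // = -100, the old (wrong) value
```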

## How was this patch tested?

Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over a byte array with a non-zero offset.

Author: Sumedh Wale 

Closes #18535 from sumwale/SPARK-21312.

(cherry picked from commit 14a3bb3a008c302aac908d7deaf0942a98c63be7)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f7b63bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f7b63bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f7b63bb

Branch: refs/heads/branch-2.1
Commit: 7f7b63bb634c3b89db80cee99848ee94f9dca6ba
Parents: 8f1ca69
Author: Sumedh Wale 
Authored: Thu Jul 6 14:47:22 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 14:48:12 2017 +0800

--
 .../spark/sql/catalyst/expressions/UnsafeRow.java  |  2 +-
 .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f7b63bb/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index d205547..b8e9388 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements 
Externalizable, Kryo
*/
   public void writeToStream(OutputStream out, byte[] writeBuffer) throws 
IOException {
 if (baseObject instanceof byte[]) {
-  int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
+  int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
   out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
 } else {
   int dataRemaining = sizeInBytes;

http://git-wip-us.apache.org/repos/asf/spark/blob/7f7b63bb/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index a32763d..a5f904c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
 MemoryAllocator.UNSAFE.free(offheapRowPage)
   }
 }
+val (bytesFromArrayBackedRowWithOffset, 
field0StringFromArrayBackedRowWithOffset) = {
+  val baos = new ByteArrayOutputStream()
+  val numBytes = arrayBackedUnsafeRow.getSizeInBytes
+  val bytesWithOffset = new Array[Byte](numBytes + 100)
+  
System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 
0,
+bytesWithOffset, 100, numBytes)
+  val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
+  arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 
100, numBytes)
+  arrayBackedRow.writeToStream(baos, null)
+  (baos.toByteArray, arrayBackedRow.getString(0))
+}
 
 assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
 assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
+assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
+assert(field0StringFromArrayBackedRow === 
field0StringFromArrayBackedRowWithOffset)
   }
 
   test("calling getDouble() and getFloat() on null columns") {





spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 770fd2a23 -> 6e1081cbe


[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

## What changes were proposed in this pull request?

Corrects the offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have their own SparkPlan implementations and cause an EXCHANGE in writes.

## How was this patch tested?

Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over a byte array with a non-zero offset.

Author: Sumedh Wale 

Closes #18535 from sumwale/SPARK-21312.

(cherry picked from commit 14a3bb3a008c302aac908d7deaf0942a98c63be7)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e1081cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e1081cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e1081cb

Branch: refs/heads/branch-2.2
Commit: 6e1081cbeac58826526b6ff7f2938a556b31ca9e
Parents: 770fd2a
Author: Sumedh Wale 
Authored: Thu Jul 6 14:47:22 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 14:47:43 2017 +0800

--
 .../spark/sql/catalyst/expressions/UnsafeRow.java  |  2 +-
 .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e1081cb/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 86de909..56994fa 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements 
Externalizable, Kryo
*/
   public void writeToStream(OutputStream out, byte[] writeBuffer) throws 
IOException {
 if (baseObject instanceof byte[]) {
-  int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
+  int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
   out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
 } else {
   int dataRemaining = sizeInBytes;

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1081cb/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index a32763d..a5f904c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
 MemoryAllocator.UNSAFE.free(offheapRowPage)
   }
 }
+val (bytesFromArrayBackedRowWithOffset, 
field0StringFromArrayBackedRowWithOffset) = {
+  val baos = new ByteArrayOutputStream()
+  val numBytes = arrayBackedUnsafeRow.getSizeInBytes
+  val bytesWithOffset = new Array[Byte](numBytes + 100)
+  
System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 
0,
+bytesWithOffset, 100, numBytes)
+  val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
+  arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 
100, numBytes)
+  arrayBackedRow.writeToStream(baos, null)
+  (baos.toByteArray, arrayBackedRow.getString(0))
+}
 
 assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
 assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
+assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
+assert(field0StringFromArrayBackedRow === 
field0StringFromArrayBackedRowWithOffset)
   }
 
   test("calling getDouble() and getFloat() on null columns") {





spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 75b168fd3 -> 14a3bb3a0


[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

## What changes were proposed in this pull request?

Corrects the offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have their own SparkPlan implementations and cause an EXCHANGE in writes.

## How was this patch tested?

Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over a byte array with a non-zero offset.

Author: Sumedh Wale 

Closes #18535 from sumwale/SPARK-21312.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14a3bb3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14a3bb3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14a3bb3a

Branch: refs/heads/master
Commit: 14a3bb3a008c302aac908d7deaf0942a98c63be7
Parents: 75b168f
Author: Sumedh Wale 
Authored: Thu Jul 6 14:47:22 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 14:47:22 2017 +0800

--
 .../spark/sql/catalyst/expressions/UnsafeRow.java  |  2 +-
 .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 86de909..56994fa 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements 
Externalizable, Kryo
*/
   public void writeToStream(OutputStream out, byte[] writeBuffer) throws 
IOException {
 if (baseObject instanceof byte[]) {
-  int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
+  int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
   out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
 } else {
   int dataRemaining = sizeInBytes;

http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index a32763d..a5f904c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
 MemoryAllocator.UNSAFE.free(offheapRowPage)
   }
 }
+val (bytesFromArrayBackedRowWithOffset, 
field0StringFromArrayBackedRowWithOffset) = {
+  val baos = new ByteArrayOutputStream()
+  val numBytes = arrayBackedUnsafeRow.getSizeInBytes
+  val bytesWithOffset = new Array[Byte](numBytes + 100)
+  
System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 
0,
+bytesWithOffset, 100, numBytes)
+  val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
+  arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 
100, numBytes)
+  arrayBackedRow.writeToStream(baos, null)
+  (baos.toByteArray, arrayBackedRow.getString(0))
+}
 
 assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
 assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
+assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
+assert(field0StringFromArrayBackedRow === 
field0StringFromArrayBackedRowWithOffset)
   }
 
   test("calling getDouble() and getFloat() on null columns") {





spark git commit: [SPARK-21308][SQL] Remove SQLConf parameters from the optimizer

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master ab866f117 -> 75b168fd3


[SPARK-21308][SQL] Remove SQLConf parameters from the optimizer

### What changes were proposed in this pull request?
This PR removes the SQLConf parameters from the optimizer rules, so that rules read the active `SQLConf` via `SQLConf.get` instead of taking it as a constructor parameter (see the sketch below).
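
A minimal sketch of the pattern the rules move to, using a hypothetical rule (not one touched by this PR):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

// A rule defined as an object: no conf constructor parameter to thread through.
object ExampleConfAwareRule extends Rule[LogicalPlan] {
  // Resolved on each call, so the rule sees the active session's settings.
  private def conf = SQLConf.get

  def apply(plan: LogicalPlan): LogicalPlan = {
    if (!conf.cboEnabled) plan
    else plan // a real rule would transform the plan here
  }
}
```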

### How was this patch tested?
The existing test cases

Author: gatorsmile 

Closes #18533 from gatorsmile/rmSQLConfOptimizer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75b168fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75b168fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75b168fd

Branch: refs/heads/master
Commit: 75b168fd30bb9a52ae223b6f1df73da4b1316f2e
Parents: ab866f1
Author: gatorsmile 
Authored: Thu Jul 6 14:18:50 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 14:18:50 2017 +0800

--
 .../optimizer/CostBasedJoinReorder.scala|  7 ++--
 .../sql/catalyst/optimizer/Optimizer.scala  | 36 +---
 .../optimizer/StarSchemaDetection.scala |  4 ++-
 .../sql/catalyst/optimizer/expressions.scala| 14 
 .../spark/sql/catalyst/optimizer/joins.scala|  6 ++--
 .../BinaryComparisonSimplificationSuite.scala   |  2 +-
 .../optimizer/BooleanSimplificationSuite.scala  |  2 +-
 .../optimizer/CombiningLimitsSuite.scala|  2 +-
 .../optimizer/ConstantFoldingSuite.scala|  2 +-
 .../optimizer/DecimalAggregatesSuite.scala  |  2 +-
 .../optimizer/EliminateMapObjectsSuite.scala|  2 +-
 .../optimizer/JoinOptimizationSuite.scala   |  2 +-
 .../catalyst/optimizer/JoinReorderSuite.scala   | 27 ---
 .../catalyst/optimizer/LimitPushdownSuite.scala |  2 +-
 .../optimizer/OptimizeCodegenSuite.scala|  2 +-
 .../catalyst/optimizer/OptimizeInSuite.scala| 24 +++--
 .../StarJoinCostBasedReorderSuite.scala | 36 +++-
 .../optimizer/StarJoinReorderSuite.scala| 25 +++---
 .../catalyst/optimizer/complexTypesSuite.scala  |  2 +-
 .../spark/sql/catalyst/plans/PlanTest.scala |  4 +--
 .../execution/OptimizeMetadataOnlyQuery.scala   |  8 ++---
 .../spark/sql/execution/SparkOptimizer.scala|  6 ++--
 .../sql/internal/BaseSessionStateBuilder.scala  |  2 +-
 23 files changed, 137 insertions(+), 82 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/75b168fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 3a7543e..db7baf6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -32,7 +32,10 @@ import org.apache.spark.sql.internal.SQLConf
  * We may have several join reorder algorithms in the future. This class is 
the entry of these
  * algorithms, and chooses which one to use.
  */
-case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with 
PredicateHelper {
+object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper {
+
+  private def conf = SQLConf.get
+
   def apply(plan: LogicalPlan): LogicalPlan = {
 if (!conf.cboEnabled || !conf.joinReorderEnabled) {
   plan
@@ -379,7 +382,7 @@ object JoinReorderDPFilters extends PredicateHelper {
 
 if (conf.joinReorderDPStarFilter) {
   // Compute the tables in a star-schema relationship.
-  val starJoin = StarSchemaDetection(conf).findStarJoins(items, 
conditions.toSeq)
+  val starJoin = StarSchemaDetection.findStarJoins(items, conditions.toSeq)
   val nonStarJoin = items.filterNot(starJoin.contains(_))
 
   if (starJoin.nonEmpty && nonStarJoin.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/75b168fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 946fa7b..d82af94 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -34,10 +34,10 @@ import org.apache.spark.sql.types._
  * Abstract class all optimizers should inherit of, contains the standard 
batches (extending
  * Optimizers can