spark git commit: [SPARK-24861][SS][TEST] create corrected temp directories in RateSourceSuite

2018-07-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 7db81ac8a -> 1462b1766


[SPARK-24861][SS][TEST] create corrected temp directories in RateSourceSuite

## What changes were proposed in this pull request?

`RateSourceSuite` may leave garbage files under `sql/core/dummy`, so it should use 
a real temp directory instead.
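
For reference, the fix relies on the standard "with temp dir" test idiom: create a 
fresh directory, run the test body, and always delete the directory afterwards. A 
minimal self-contained sketch of that idiom (the helper below is illustrative, not 
Spark's exact test utility):

```scala
import java.io.File
import java.nio.file.Files

object TempDirDemo {
  // Run `f` against a fresh temp directory and always clean it up afterwards,
  // so a failing test cannot leave garbage behind in the source tree.
  def withTempDir(f: File => Unit): Unit = {
    val dir = Files.createTempDirectory("spark-test").toFile
    try f(dir) finally deleteRecursively(dir)
  }

  private def deleteRecursively(file: File): Unit = {
    Option(file.listFiles()).toSeq.flatten.foreach(deleteRecursively)
    file.delete()
  }

  def main(args: Array[String]): Unit = {
    withTempDir { dir =>
      // The suite points the reader at this path instead of a literal "dummy" directory.
      println(s"working in ${dir.getCanonicalPath}")
    }
  }
}
```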

## How was this patch tested?

test only

Author: Wenchen Fan 

Closes #21817 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1462b176
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1462b176
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1462b176

Branch: refs/heads/master
Commit: 1462b17666729cd6c9e8dfa2a1fe9c2020d3f25b
Parents: 7db81ac
Author: Wenchen Fan 
Authored: Fri Jul 20 13:40:26 2018 +0800
Committer: Wenchen Fan 
Committed: Fri Jul 20 13:40:26 2018 +0800

--
 .../sources/RateStreamProviderSuite.scala   | 127 ++-
 1 file changed, 67 insertions(+), 60 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1462b176/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index bf72e5c..9115a38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.sql.execution.streaming.sources
 
-import java.nio.file.Files
 import java.util.Optional
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
@@ -54,12 +52,15 @@ class RateSourceSuite extends StreamTest {
   }
 
   test("microbatch in registry") {
-    DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
-      case ds: MicroBatchReadSupport =>
-        val reader = ds.createMicroBatchReader(Optional.empty(), "dummy", DataSourceOptions.empty())
-        assert(reader.isInstanceOf[RateStreamMicroBatchReader])
-      case _ =>
-        throw new IllegalStateException("Could not find read support for rate")
+    withTempDir { temp =>
+      DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
+        case ds: MicroBatchReadSupport =>
+          val reader = ds.createMicroBatchReader(
+            Optional.empty(), temp.getCanonicalPath, DataSourceOptions.empty())
+          assert(reader.isInstanceOf[RateStreamMicroBatchReader])
+        case _ =>
+          throw new IllegalStateException("Could not find read support for rate")
+      }
     }
   }
 
@@ -108,69 +109,75 @@ class RateSourceSuite extends StreamTest {
   }
 
   test("microbatch - set offset") {
-val temp = Files.createTempDirectory("dummy").toString
-val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty(), 
temp)
-val startOffset = LongOffset(0L)
-val endOffset = LongOffset(1L)
-reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-assert(reader.getStartOffset() == startOffset)
-assert(reader.getEndOffset() == endOffset)
+withTempDir { temp =>
+  val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty(), 
temp.getCanonicalPath)
+  val startOffset = LongOffset(0L)
+  val endOffset = LongOffset(1L)
+  reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
+  assert(reader.getStartOffset() == startOffset)
+  assert(reader.getEndOffset() == endOffset)
+}
   }
 
   test("microbatch - infer offsets") {
-val tempFolder = Files.createTempDirectory("dummy").toString
-val reader = new RateStreamMicroBatchReader(
-  new DataSourceOptions(
-Map("numPartitions" -> "1", "rowsPerSecond" -> "100", "useManualClock" 
-> "true").asJava),
-  tempFolder)
-reader.clock.asInstanceOf[ManualClock].advance(10)
-reader.setOffsetRange(Optional.empty(), Optional.empty())
-reader.getStartOffset() match {
-  case r: LongOffset => assert(r.offset === 0L)
-  case _ => throw new IllegalStateException("unexpected offset type")
-}

spark git commit: [SPARK-24195][CORE] Ignore the files with "local" scheme in SparkContext.addFile

2018-07-19 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 7e847646d -> 7db81ac8a


[SPARK-24195][CORE] Ignore the files with "local" scheme in SparkContext.addFile

## What changes were proposed in this pull request?

In Spark "local" scheme means resources are already on the driver/executor 
nodes, this pr ignore the files with "local" scheme in `SparkContext.addFile` 
for fixing potential bug.
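
As a standalone illustration of the scheme check (simplified: this sketch uses 
`java.net.URI` rather than Hadoop's `Path`, and a hypothetical `resolve` helper 
rather than the real `addFile` body):

```scala
import java.io.File
import java.net.URI

object AddFileSchemeDemo {
  // Mirrors the decision in addFile: null scheme -> canonical file URI,
  // "local" scheme -> skip (already on every node), anything else -> pass through.
  def resolve(path: String): Option[String] = new URI(path).getScheme match {
    case null    => Some(new File(path).getCanonicalFile.toURI.toString)
    case "local" => None // addFile would log a warning and return here
    case _       => Some(path)
  }

  def main(args: Array[String]): Unit = {
    println(resolve("/tmp/data.txt"))          // e.g. Some(file:/tmp/data.txt)
    println(resolve("local:/opt/libs/a.jar"))  // None: already available on every node
    println(resolve("hdfs://nn:8020/a.txt"))   // Some(hdfs://nn:8020/a.txt)
  }
}
```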

## How was this patch tested?

Existing tests.

Author: Yuanjian Li 

Closes #21533 from xuanyuanking/SPARK-24195.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7db81ac8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7db81ac8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7db81ac8

Branch: refs/heads/master
Commit: 7db81ac8a2d6c3c19db387d3d25053750b1404dd
Parents: 7e84764
Author: Yuanjian Li 
Authored: Fri Jul 20 11:25:51 2018 +0800
Committer: jerryshao 
Committed: Fri Jul 20 11:25:51 2018 +0800

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7db81ac8/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 531384a..78ba0b3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1524,7 +1524,11 @@ class SparkContext(config: SparkConf) extends Logging {
   def addFile(path: String, recursive: Boolean): Unit = {
     val uri = new Path(path).toUri
     val schemeCorrectedPath = uri.getScheme match {
-      case null | "local" => new File(path).getCanonicalFile.toURI.toString
+      case null => new File(path).getCanonicalFile.toURI.toString
+      case "local" =>
+        logWarning("File with 'local' scheme is not supported to add to file server, since " +
+          "it is already available on every node.")
+        return
       case _ => path
     }
 





spark git commit: [SPARK-24307][CORE] Support reading remote cached partitions > 2gb

2018-07-19 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 67e108daa -> 7e847646d


[SPARK-24307][CORE] Support reading remote cached partitions > 2gb

(1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.

(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
mapping the entire file in chunks.
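
A rough sketch of the receiving-end idea, i.e. exposing a file larger than 2 GB as a 
sequence of mapped chunks because a single `MappedByteBuffer` is capped at 
`Int.MaxValue` bytes (chunk size and helper names here are illustrative):

```scala
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

object ChunkedMapDemo {
  // Map `file` as read-only MappedByteBuffers of at most `maxChunk` bytes each.
  def mapInChunks(file: File, maxChunk: Int = Int.MaxValue): Seq[ByteBuffer] = {
    val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
    try {
      val size = channel.size()
      (0L until size by maxChunk.toLong).map { offset =>
        val length = math.min(maxChunk.toLong, size - offset)
        channel.map(FileChannel.MapMode.READ_ONLY, offset, length)
      }
    } finally channel.close() // mapped buffers stay valid after the channel is closed
  }

  def main(args: Array[String]): Unit = {
    val tmp = File.createTempFile("chunked", ".bin")
    java.nio.file.Files.write(tmp.toPath, Array.fill[Byte](1024)(1.toByte))
    val chunks = mapInChunks(tmp, maxChunk = 256) // tiny chunk size just for the demo
    println(s"${chunks.size} chunks, ${chunks.map(_.remaining()).sum} bytes") // 4 chunks, 1024 bytes
    tmp.delete()
  }
}
```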

Added unit tests.  Ran the randomized test a couple of hundred times on my 
laptop.  Tests cover the equivalent of SPARK-24107 for the 
ChunkedByteBufferFileRegion.  Also tested on a cluster with remote cache reads 
>2gb (in memory and on disk).

Author: Imran Rashid 

Closes #21440 from squito/chunked_bb_file_region.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e847646
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e847646
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e847646

Branch: refs/heads/master
Commit: 7e847646d1f377f46dc3154dea37148d4e557a03
Parents: 67e108d
Author: Imran Rashid 
Authored: Fri Jul 20 11:16:53 2018 +0800
Committer: jerryshao 
Committed: Fri Jul 20 11:16:53 2018 +0800

--
 .../org/apache/spark/storage/BlockManager.scala |  11 +-
 .../spark/util/io/ChunkedByteBuffer.scala   |  44 +-
 .../util/io/ChunkedByteBufferFileRegion.scala   |  86 +++
 .../io/ChunkedByteBufferFileRegionSuite.scala   | 152 +++
 .../spark/io/ChunkedByteBufferSuite.scala   |   2 +-
 5 files changed, 286 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7e847646/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0e1c7d5..1db0327 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -130,6 +130,8 @@ private[spark] class BlockManager(
 
   private[spark] val externalShuffleServiceEnabled =
     conf.getBoolean("spark.shuffle.service.enabled", false)
+  private val chunkSize =
+    conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
 
   val diskBlockManager = {
     // Only perform cleanup if an external service is not serving our shuffle files.
@@ -660,6 +662,11 @@ private[spark] class BlockManager(
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
+    // could just use the inputStream on the temp file, rather than memory-mapping the file.
+    // Until then, replication can cause the process to use too much memory and get killed
+    // by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
+    // we've read the data to disk.
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0
@@ -690,7 +697,7 @@ private[spark] class BlockManager(
       logDebug(s"Getting remote block $blockId from $loc")
       val data = try {
         blockTransferService.fetchBlockSync(
-          loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
+          loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager)
       } catch {
         case NonFatal(e) =>
           runningFailureCount += 1
@@ -724,7 +731,7 @@ private[spark] class BlockManager(
       }
 
       if (data != null) {
-        return Some(new ChunkedByteBuffer(data))
+        return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
       }
       logDebug(s"The value of block $blockId is null")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7e847646/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 700ce56..efed90c 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -17,17 +17,21 @@
 
 package org.apache.spark.util.io
 
-import java.io.InputStream
+import java.io.{File, FileInputStream, InputStream}
 import java.nio.ByteBuffer
-import java.nio.channels.WritableByteChannel
+import 

svn commit: r28240 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_19_16_01-67e108d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-07-19 Thread pwendell
Author: pwendell
Date: Thu Jul 19 23:15:58 2018
New Revision: 28240

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_07_19_16_01-67e108d docs


[This commit notification would consist of 1468 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-24846][SQL] Made hashCode ExprId independent of jvmId

2018-07-19 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master b3d88ac02 -> 67e108daa


[SPARK-24846][SQL] Made hashCode ExprId independent of jvmId

## What changes were proposed in this pull request?
Made `ExprId`'s `hashCode` independent of `jvmId`, so that canonicalization does not 
depend on the JVM. This is done by overriding `hashCode` to use only `id`, and 
overriding `equals` alongside it to keep the two consistent.
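
The issue being fixed is that a case class's default `hashCode` mixes in every field, 
including `jvmId`, so the "same" ID produced in two different JVMs hashes differently. 
A tiny standalone demonstration with a hypothetical stand-in for `ExprId`:

```scala
import java.util.UUID

// Hypothetical stand-in for ExprId, only to show the default case-class behaviour.
case class Id(id: Long, jvmId: UUID)

object HashDemo {
  def main(args: Array[String]): Unit = {
    val a = Id(42L, UUID.randomUUID()) // "same" expression ID, JVM 1
    val b = Id(42L, UUID.randomUUID()) // "same" expression ID, JVM 2
    // The default hashCode mixes in jvmId, so the two hashes differ:
    println(a.hashCode() == b.hashCode()) // almost certainly false
    // After this change, ExprId.hashCode depends on id only, so they would match.
  }
}
```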

## How was this patch tested?
Created a unit test ExprIdSuite
Ran all unit tests of sql/catalyst

Author: Ger van Rossum 

Closes #21806 from gvr/spark24846-canonicalization.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67e108da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67e108da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67e108da

Branch: refs/heads/master
Commit: 67e108daa6f324e7f4f7db2bda980a9945b59396
Parents: b3d88ac
Author: Ger van Rossum 
Authored: Thu Jul 19 23:28:16 2018 +0200
Committer: Herman van Hovell 
Committed: Thu Jul 19 23:28:16 2018 +0200

--
 .../catalyst/expressions/namedExpressions.scala | 11 -
 .../sql/catalyst/expressions/ExprIdSuite.scala  | 50 
 2 files changed, 60 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/67e108da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 8df8704..ce5c280 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -40,7 +40,16 @@ object NamedExpression {
  *
  * The `id` field is unique within a given JVM, while the `uuid` is used to uniquely identify JVMs.
  */
-case class ExprId(id: Long, jvmId: UUID)
+case class ExprId(id: Long, jvmId: UUID) {
+
+  override def equals(other: Any): Boolean = other match {
+    case ExprId(id, jvmId) => this.id == id && this.jvmId == jvmId
+    case _ => false
+  }
+
+  override def hashCode(): Int = id.hashCode()
+
+}
 
 object ExprId {
   def apply(id: Long): ExprId = ExprId(id, NamedExpression.jvmId)

http://git-wip-us.apache.org/repos/asf/spark/blob/67e108da/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExprIdSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExprIdSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExprIdSuite.scala
new file mode 100644
index 000..2352db4
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExprIdSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.UUID
+
+import org.apache.spark.SparkFunSuite
+
+class ExprIdSuite extends SparkFunSuite {
+
+  private val jvmId = UUID.randomUUID()
+  private val otherJvmId = UUID.randomUUID()
+
+  test("hashcode independent of jvmId") {
+    val exprId1 = ExprId(12, jvmId)
+    val exprId2 = ExprId(12, otherJvmId)
+    assert(exprId1 != exprId2)
+    assert(exprId1.hashCode() == exprId2.hashCode())
+  }
+
+  test("equality should depend on both id and jvmId") {
+    val exprId1 = ExprId(1, jvmId)
+    val exprId2 = ExprId(1, jvmId)
+    assert(exprId1 == exprId2)
+
+    val exprId3 = ExprId(1, jvmId)
+    val exprId4 = ExprId(2, jvmId)
+    assert(exprId3 != exprId4)
+
+    val exprId5 = ExprId(1, jvmId)
+    val exprId6 = ExprId(1, otherJvmId)
+    assert(exprId5 != exprId6)
+  }
+
+}



spark git commit: [SPARK-22187][SS] Update unsaferow format for saved state in flatMapGroupsWithState to allow timeouts with deleted state

2018-07-19 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 8d707b060 -> b3d88ac02


[SPARK-22187][SS] Update unsaferow format for saved state in 
flatMapGroupsWithState to allow timeouts with deleted state

## What changes were proposed in this pull request?

Currently, the group state of a user-defined type is encoded as top-level columns 
in the UnsafeRows stored in the state store. The timeout timestamp is also saved 
(when needed) as the last top-level column. Since the group state is serialized to 
top-level columns, you cannot save "null" as the state value (setting null in all 
the top-level columns is not equivalent). So we don't let the user set a timeout 
without initializing the state for a key, which, based on user experience, leads 
to confusion.

This PR changes the row format so that the state is saved as a nested column. This 
allows the state to be set to null and avoids these confusing corner cases. However, 
queries recovering from an existing checkpoint will keep using the previous format 
to maintain compatibility with existing production queries.
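
Roughly, the layout change can be pictured as follows; the schemas and field names 
below are illustrative only, not the exact format written by the state store:

```scala
import org.apache.spark.sql.types._

object StateFormatSketch {
  // Suppose the user's state type serializes to these columns.
  val stateSchema: StructType = new StructType()
    .add("count", LongType)
    .add("lastEvent", StringType)

  // Old format: state columns flattened to the top level, plus the timeout column.
  // A "null" state cannot be represented, since nulling every column is not the same thing.
  val formatV1: StructType =
    StructType(stateSchema.fields :+ StructField("timeoutTimestamp", LongType))

  // New format: the whole state is a single nested, nullable column,
  // so the state can be null while a timeout is still set.
  val formatV2: StructType = new StructType()
    .add("groupState", stateSchema, nullable = true)
    .add("timeoutTimestamp", LongType)

  def main(args: Array[String]): Unit = {
    println(formatV1.treeString)
    println(formatV2.treeString)
  }
}
```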

## How was this patch tested?
Refactored existing end-to-end tests and added new tests for explicitly testing 
obj-to-row conversion for both state formats.

Author: Tathagata Das 

Closes #21739 from tdas/SPARK-22187-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3d88ac0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3d88ac0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3d88ac0

Branch: refs/heads/master
Commit: b3d88ac02940eff4c867d3acb79fe5ff9d724e83
Parents: 8d707b0
Author: Tathagata Das 
Authored: Thu Jul 19 13:17:28 2018 -0700
Committer: Tathagata Das 
Committed: Thu Jul 19 13:17:28 2018 -0700

--
 .../sql/catalyst/expressions/Expression.scala   |   3 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../spark/sql/execution/SparkStrategies.scala   |   5 +-
 .../streaming/FlatMapGroupsWithStateExec.scala  | 136 +++---
 .../sql/execution/streaming/OffsetSeq.scala |  10 +-
 .../FlatMapGroupsWithStateExecHelper.scala  | 247 ++
 .../commits/0   |   2 +
 .../commits/1   |   2 +
 .../metadata|   1 +
 .../offsets/0   |   3 +
 .../offsets/1   |   3 +
 .../state/0/0/1.delta   | Bin 0 -> 84 bytes
 .../state/0/0/2.delta   | Bin 0 -> 46 bytes
 .../state/0/1/1.delta   | Bin 0 -> 46 bytes
 .../state/0/1/2.delta   | Bin 0 -> 46 bytes
 .../state/0/2/1.delta   | Bin 0 -> 46 bytes
 .../state/0/2/2.delta   | Bin 0 -> 46 bytes
 .../state/0/3/1.delta   | Bin 0 -> 46 bytes
 .../state/0/3/2.delta   | Bin 0 -> 46 bytes
 .../state/0/4/1.delta   | Bin 0 -> 46 bytes
 .../state/0/4/2.delta   | Bin 0 -> 46 bytes
 .../FlatMapGroupsWithStateExecHelperSuite.scala | 218 
 .../streaming/FlatMapGroupsWithStateSuite.scala | 250 ++-
 23 files changed, 708 insertions(+), 180 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b3d88ac0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index f7d1b10..a69b804 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -715,7 +715,8 @@ trait ComplexTypeMergingExpression extends Expression {
       "The collection of input data types must not be empty.")
     require(
       TypeCoercion.haveSameType(inputTypesForMerging),
-      "All input types must be the same except nullable, containsNull, valueContainsNull flags.")
+      "All input types must be the same except nullable, containsNull, valueContainsNull flags." +
+        s" The input types found are\n\t${inputTypesForMerging.mkString("\n\t")}")
     inputTypesForMerging.reduceLeft(TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(_, _).get)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b3d88ac0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 

spark git commit: [SPARK-24858][SQL] Avoid unnecessary parquet footer reads

2018-07-19 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 8b7d4f842 -> 6a9a058e0


[SPARK-24858][SQL] Avoid unnecessary parquet footer reads

## What changes were proposed in this pull request?

Currently, the same Parquet footer is read twice in 
`ParquetFileFormat.buildReaderWithPartitionValues` when filter push-down is enabled.

Fix it with simple changes.
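
The fix is essentially the "compute at most once, and only if needed" idiom via a 
`lazy val`; a minimal sketch with a stand-in for the footer read:

```scala
object LazyFooterDemo {
  def main(args: Array[String]): Unit = {
    var reads = 0
    // Stand-in for the expensive footer read that hits the file system.
    def readFooter(): String = { reads += 1; "footer-metadata" }

    // Before: both the filter push-down path and the parquet-mr check read the footer.
    val schema = readFooter()
    val createdBy = readFooter()
    println(s"eager: $reads reads ($schema, $createdBy)")       // eager: 2 reads

    reads = 0
    // After: a lazy val is evaluated at most once, and only if some path needs it.
    lazy val footer = readFooter()
    val schemaAgain = footer
    val createdByAgain = footer
    println(s"lazy: $reads read ($schemaAgain, $createdByAgain)") // lazy: 1 read
  }
}
```
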
## How was this patch tested?

Unit test

Author: Gengliang Wang 

Closes #21814 from gengliangwang/parquetFooter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a9a058e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a9a058e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a9a058e

Branch: refs/heads/master
Commit: 6a9a058e09abb1b629680a546c3d6358b49f723a
Parents: 8b7d4f8
Author: Gengliang Wang 
Authored: Thu Jul 19 22:24:53 2018 +0800
Committer: hyukjinkwon 
Committed: Thu Jul 19 22:24:53 2018 +0800

--
 .../datasources/parquet/ParquetFileFormat.scala  | 15 ---
 1 file changed, 8 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6a9a058e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 295960b..2d4ac76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -364,10 +364,11 @@ class ParquetFileFormat
 
       val sharedConf = broadcastedHadoopConf.value.value
 
+      lazy val footerFileMetaData =
+        ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
-        val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
-          .getFileMetaData.getSchema
+        val parquetSchema = footerFileMetaData.getSchema
         val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
           pushDownStringStartWith, pushDownInFilterThreshold)
         filters
@@ -384,12 +385,12 @@ class ParquetFileFormat
       // *only* if the file was created by something other than "parquet-mr", so check the actual
       // writer here for this file.  We have to do this per-file, as each file in the table may
       // have different writers.
-      def isCreatedByParquetMr(): Boolean = {
-        val footer = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
-        footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr")
-      }
+      // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+      def isCreatedByParquetMr: Boolean =
+        footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
       val convertTz =
-        if (timestampConversion && !isCreatedByParquetMr()) {
+        if (timestampConversion && !isCreatedByParquetMr) {
           Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
         } else {
           None





svn commit: r28223 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_19_04_02-8b7d4f8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-07-19 Thread pwendell
Author: pwendell
Date: Thu Jul 19 11:22:25 2018
New Revision: 28223

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_07_19_04_02-8b7d4f8 docs


[This commit notification would consist of 1468 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




svn commit: r28214 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_19_00_01-d05a926-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-07-19 Thread pwendell
Author: pwendell
Date: Thu Jul 19 07:17:53 2018
New Revision: 28214

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_07_19_00_01-d05a926 docs


[This commit notification would consist of 1468 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-24717][SS] Split out max retain version of state for memory in HDFSBackedStateStoreProvider

2018-07-19 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master d05a926e7 -> 8b7d4f842


[SPARK-24717][SS] Split out max retain version of state for memory in 
HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch proposes breaking the configuration of how many batches of state to 
retain into two pieces: files and in-memory cache. It reuses the existing 
configuration for files and introduces a new configuration, 
"spark.sql.streaming.maxBatchesToRetainInMemory", for the maximum number of batches 
to retain in memory.
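
A rough sketch of the "retain only the newest N versions in memory" idea; the class 
and eviction policy below are illustrative, not the provider's actual code:

```scala
import java.util.TreeMap

object VersionCacheSketch {
  // Keep at most `maxVersionsInMemory` of the newest versions in memory; anything
  // older has to be reloaded from the delta/snapshot files on access.
  class VersionCache[V](maxVersionsInMemory: Int) {
    private val loaded = new TreeMap[java.lang.Long, V]()

    def put(version: Long, state: V): Unit = {
      if (maxVersionsInMemory <= 0) return   // "0" disables the cache entirely
      loaded.put(version, state)
      while (loaded.size() > maxVersionsInMemory) {
        loaded.remove(loaded.firstKey())     // evict the oldest retained version
      }
    }

    def get(version: Long): Option[V] = Option(loaded.get(version))
  }

  def main(args: Array[String]): Unit = {
    val cache = new VersionCache[String](maxVersionsInMemory = 2)
    (1L to 4L).foreach(v => cache.put(v, s"state-v$v"))
    println(cache.get(4L)) // Some(state-v4)
    println(cache.get(2L)) // None: evicted from memory, would be reloaded from files
  }
}
```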

## How was this patch tested?

Applied this patch on top of SPARK-24441 
(https://github.com/apache/spark/pull/21469) and manually tested various workloads 
to verify that the overall size of state held in memory is around 2x or less of the 
size of the latest version of state, compared with 10x-80x before the patch.

Author: Jungtaek Lim 

Closes #21700 from HeartSaVioR/SPARK-24717.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b7d4f84
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b7d4f84
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b7d4f84

Branch: refs/heads/master
Commit: 8b7d4f842fdc90b8d1c37080bdd9b5e1d070f5c0
Parents: d05a926
Author: Jungtaek Lim 
Authored: Thu Jul 19 00:07:35 2018 -0700
Committer: Tathagata Das 
Committed: Thu Jul 19 00:07:35 2018 -0700

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  11 ++
 .../state/HDFSBackedStateStoreProvider.scala|  57 +--
 .../streaming/state/StateStoreConf.scala|   3 +
 .../streaming/state/StateStoreSuite.scala   | 150 +--
 4 files changed, 196 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8b7d4f84/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 41fe0c3..9239d4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -854,6 +854,15 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
+  val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
+    .internal()
+    .doc("The maximum number of batches which will be retained in memory to avoid " +
+      "loading from files. The value adjusts a trade-off between memory usage vs cache miss: " +
+      "'2' covers both success and direct failure cases, '1' covers only success case, " +
+      "and '0' covers extreme case - disable cache to maximize memory size of executors.")
+    .intConf
+    .createWithDefault(2)
+
   val UNSUPPORTED_OPERATION_CHECK_ENABLED =
     buildConf("spark.sql.streaming.unsupportedOperationCheck")
       .internal()
@@ -1507,6 +1516,8 @@ class SQLConf extends Serializable with Logging {
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
+  def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
+
   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
   def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/8b7d4f84/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 118c82a..523acef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
+import java.util
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -203,6 +204,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     this.valueSchema = valueSchema
     this.storeConf = storeConf
     this.hadoopConf = hadoopConf
+    this.numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory
     fm.mkdirs(baseDir)
   }
 
@@ -220,7 +222,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   override def close(): Unit = {
-    loadedMaps.values.foreach(_.clear())
+