spark git commit: [SPARK-24861][SS][TEST] create corrected temp directories in RateSourceSuite
Repository: spark Updated Branches: refs/heads/master 7db81ac8a -> 1462b1766 [SPARK-24861][SS][TEST] create corrected temp directories in RateSourceSuite ## What changes were proposed in this pull request? `RateSourceSuite` may leave garbage files under `sql/core/dummy`, we should use a corrected temp directory ## How was this patch tested? test only Author: Wenchen Fan Closes #21817 from cloud-fan/minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1462b176 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1462b176 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1462b176 Branch: refs/heads/master Commit: 1462b17666729cd6c9e8dfa2a1fe9c2020d3f25b Parents: 7db81ac Author: Wenchen Fan Authored: Fri Jul 20 13:40:26 2018 +0800 Committer: Wenchen Fan Committed: Fri Jul 20 13:40:26 2018 +0800 -- .../sources/RateStreamProviderSuite.scala | 127 ++- 1 file changed, 67 insertions(+), 60 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1462b176/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index bf72e5c..9115a38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -17,15 +17,13 @@ package org.apache.spark.sql.execution.streaming.sources -import java.nio.file.Files import java.util.Optional import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ @@ -54,12 +52,15 @@ class RateSourceSuite extends StreamTest { } test("microbatch in registry") { -DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match { - case ds: MicroBatchReadSupport => -val reader = ds.createMicroBatchReader(Optional.empty(), "dummy", DataSourceOptions.empty()) -assert(reader.isInstanceOf[RateStreamMicroBatchReader]) - case _ => -throw new IllegalStateException("Could not find read support for rate") +withTempDir { temp => + DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match { +case ds: MicroBatchReadSupport => + val reader = ds.createMicroBatchReader( +Optional.empty(), temp.getCanonicalPath, DataSourceOptions.empty()) + assert(reader.isInstanceOf[RateStreamMicroBatchReader]) +case _ => + throw new IllegalStateException("Could not find read support for rate") + } } } @@ -108,69 +109,75 @@ class RateSourceSuite extends StreamTest { } test("microbatch - set offset") { -val temp = Files.createTempDirectory("dummy").toString -val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty(), temp) -val startOffset = LongOffset(0L) -val endOffset = LongOffset(1L) -reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)) -assert(reader.getStartOffset() == startOffset) -assert(reader.getEndOffset() == 
endOffset) +withTempDir { temp => + val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty(), temp.getCanonicalPath) + val startOffset = LongOffset(0L) + val endOffset = LongOffset(1L) + reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)) + assert(reader.getStartOffset() == startOffset) + assert(reader.getEndOffset() == endOffset) +} } test("microbatch - infer offsets") { -val tempFolder = Files.createTempDirectory("dummy").toString -val reader = new RateStreamMicroBatchReader( - new DataSourceOptions( -Map("numPartitions" -> "1", "rowsPerSecond" -> "100", "useManualClock" -> "true").asJava), - tempFolder) -reader.clock.asInstanceOf[ManualClock].advance(10) -reader.setOffsetRange(Optional.empty(), Optional.empty()) -reader.getStartOffset() match { - case r: LongOffset => assert(r.offset === 0L) - case _ => throw new IllegalStateException("unexpected offset type") -}
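The rewritten tests above rely on a `withTempDir` fixture instead of `Files.createTempDirectory`. As a rough illustration of what such a fixture does (a minimal sketch with hypothetical names, not Spark's actual test-utility code), the helper below creates the directory before the test body runs and deletes it recursively afterwards, which is why the suite no longer leaves garbage directories such as `sql/core/dummy` behind:

```scala
import java.io.File
import java.nio.file.Files

object WithTempDirSketch extends App {
  // Minimal sketch of a withTempDir-style fixture (illustrative, not Spark's
  // actual test-utility code): create the directory up front and remove it
  // recursively even if the body throws, so no garbage directory is left behind.
  def withTempDir(body: File => Unit): Unit = {
    val dir = Files.createTempDirectory("spark-test").toFile
    def deleteRecursively(f: File): Unit = {
      if (f.isDirectory) Option(f.listFiles).toSeq.flatten.foreach(deleteRecursively)
      f.delete()
    }
    try body(dir) finally deleteRecursively(dir)
  }

  // The test body receives a path that is guaranteed to be cleaned up afterwards.
  withTempDir { temp => println(temp.getCanonicalPath) }
}
```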
spark git commit: [SPARK-24195][CORE] Ignore the files with "local" scheme in SparkContext.addFile
Repository: spark Updated Branches: refs/heads/master 7e847646d -> 7db81ac8a [SPARK-24195][CORE] Ignore the files with "local" scheme in SparkContext.addFile ## What changes were proposed in this pull request? In Spark "local" scheme means resources are already on the driver/executor nodes, this pr ignore the files with "local" scheme in `SparkContext.addFile` for fixing potential bug. ## How was this patch tested? Existing tests. Author: Yuanjian Li Closes #21533 from xuanyuanking/SPARK-24195. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7db81ac8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7db81ac8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7db81ac8 Branch: refs/heads/master Commit: 7db81ac8a2d6c3c19db387d3d25053750b1404dd Parents: 7e84764 Author: Yuanjian Li Authored: Fri Jul 20 11:25:51 2018 +0800 Committer: jerryshao Committed: Fri Jul 20 11:25:51 2018 +0800 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7db81ac8/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 531384a..78ba0b3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1524,7 +1524,11 @@ class SparkContext(config: SparkConf) extends Logging { def addFile(path: String, recursive: Boolean): Unit = { val uri = new Path(path).toUri val schemeCorrectedPath = uri.getScheme match { - case null | "local" => new File(path).getCanonicalFile.toURI.toString + case null => new File(path).getCanonicalFile.toURI.toString + case "local" => +logWarning("File with 'local' scheme is not supported to add to file server, since " + + "it is already available on every node.") +return case _ => path } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
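As a self-contained sketch of the scheme handling introduced above (using `java.net.URI` instead of Hadoop's `Path`, with a hypothetical helper name rather than Spark's actual code): a scheme-less path is resolved to a canonical local file URI, a `local:` path is skipped with a warning because it is already present on every node, and any other URI is passed through unchanged.

```scala
import java.io.File
import java.net.URI

object AddFileSchemeSketch {
  // Illustrative sketch of the scheme handling above (java.net.URI instead of
  // Hadoop's Path, hypothetical helper name -- not Spark's actual code):
  //   no scheme      -> resolve to a canonical local file URI
  //   "local" scheme -> already available on every node, warn and skip
  //   anything else  -> pass the path through unchanged
  def resolveForFileServer(path: String): Option[String] =
    Option(new URI(path).getScheme) match {
      case None =>
        Some(new File(path).getCanonicalFile.toURI.toString)
      case Some("local") =>
        println(s"Skipping $path: files with 'local' scheme are already on every node")
        None
      case Some(_) =>
        Some(path)
    }

  // resolveForFileServer("/tmp/data.txt")            // Some(file:/tmp/data.txt)
  // resolveForFileServer("local:///opt/libs/a.jar")  // None, after the warning
  // resolveForFileServer("hdfs://nn:8020/a.txt")     // Some(hdfs://nn:8020/a.txt)
}
```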
spark git commit: [SPARK-24307][CORE] Support reading remote cached partitions > 2gb
Repository: spark Updated Branches: refs/heads/master 67e108daa -> 7e847646d [SPARK-24307][CORE] Support reading remote cached partitions > 2gb (1) Netty's ByteBuf cannot support data > 2gb. So to transfer data from a ChunkedByteBuffer over the network, we use a custom version of FileRegion which is backed by the ChunkedByteBuffer. (2) On the receiving end, we need to expose all the data in a FileSegmentManagedBuffer as a ChunkedByteBuffer. We do that by memory mapping the entire file in chunks. Added unit tests. Ran the randomized test a couple of hundred times on my laptop. Tests cover the equivalent of SPARK-24107 for the ChunkedByteBufferFileRegion. Also tested on a cluster with remote cache reads >2gb (in memory and on disk). Author: Imran Rashid Closes #21440 from squito/chunked_bb_file_region. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e847646 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e847646 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e847646 Branch: refs/heads/master Commit: 7e847646d1f377f46dc3154dea37148d4e557a03 Parents: 67e108d Author: Imran Rashid Authored: Fri Jul 20 11:16:53 2018 +0800 Committer: jerryshao Committed: Fri Jul 20 11:16:53 2018 +0800 -- .../org/apache/spark/storage/BlockManager.scala | 11 +- .../spark/util/io/ChunkedByteBuffer.scala | 44 +- .../util/io/ChunkedByteBufferFileRegion.scala | 86 +++ .../io/ChunkedByteBufferFileRegionSuite.scala | 152 +++ .../spark/io/ChunkedByteBufferSuite.scala | 2 +- 5 files changed, 286 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7e847646/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0e1c7d5..1db0327 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -130,6 +130,8 @@ private[spark] class BlockManager( private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) + private val chunkSize = +conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt val diskBlockManager = { // Only perform cleanup if an external service is not serving our shuffle files. @@ -660,6 +662,11 @@ private[spark] class BlockManager( * Get block from remote block managers as serialized bytes. */ def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { +// TODO if we change this method to return the ManagedBuffer, then getRemoteValues +// could just use the inputStream on the temp file, rather than memory-mapping the file. +// Until then, replication can cause the process to use too much memory and get killed +// by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though +// we've read the data to disk. 
logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") var runningFailureCount = 0 @@ -690,7 +697,7 @@ private[spark] class BlockManager( logDebug(s"Getting remote block $blockId from $loc") val data = try { blockTransferService.fetchBlockSync( - loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer() + loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager) } catch { case NonFatal(e) => runningFailureCount += 1 @@ -724,7 +731,7 @@ private[spark] class BlockManager( } if (data != null) { -return Some(new ChunkedByteBuffer(data)) +return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize)) } logDebug(s"The value of block $blockId is null") } http://git-wip-us.apache.org/repos/asf/spark/blob/7e847646/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 700ce56..efed90c 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -17,17 +17,21 @@ package org.apache.spark.util.io -import java.io.InputStream +import java.io.{File, FileInputStream, InputStream} import java.nio.ByteBuffer -import java.nio.channels.WritableByteChannel +import
svn commit: r28240 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_19_16_01-67e108d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jul 19 23:15:58 2018 New Revision: 28240 Log: Apache Spark 2.4.0-SNAPSHOT-2018_07_19_16_01-67e108d docs [This commit notification would consist of 1468 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24846][SQL] Made hashCode ExprId independent of jvmId
Repository: spark Updated Branches: refs/heads/master b3d88ac02 -> 67e108daa [SPARK-24846][SQL] Made hashCode ExprId independent of jvmId ## What changes were proposed in this pull request? Made ExprId hashCode independent of jvmId to make canonicalization independent of JVM, by overriding hashCode (and necessarily also equality) to depend on id only ## How was this patch tested? Created a unit test ExprIdSuite Ran all unit tests of sql/catalyst Author: Ger van Rossum Closes #21806 from gvr/spark24846-canonicalization. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67e108da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67e108da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67e108da Branch: refs/heads/master Commit: 67e108daa6f324e7f4f7db2bda980a9945b59396 Parents: b3d88ac Author: Ger van Rossum Authored: Thu Jul 19 23:28:16 2018 +0200 Committer: Herman van Hovell Committed: Thu Jul 19 23:28:16 2018 +0200 -- .../catalyst/expressions/namedExpressions.scala | 11 - .../sql/catalyst/expressions/ExprIdSuite.scala | 50 2 files changed, 60 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/67e108da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 8df8704..ce5c280 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -40,7 +40,16 @@ object NamedExpression { * * The `id` field is unique within a given JVM, while the `uuid` is used to uniquely identify JVMs. */ -case class ExprId(id: Long, jvmId: UUID) +case class ExprId(id: Long, jvmId: UUID) { + + override def equals(other: Any): Boolean = other match { +case ExprId(id, jvmId) => this.id == id && this.jvmId == jvmId +case _ => false + } + + override def hashCode(): Int = id.hashCode() + +} object ExprId { def apply(id: Long): ExprId = ExprId(id, NamedExpression.jvmId) http://git-wip-us.apache.org/repos/asf/spark/blob/67e108da/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExprIdSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExprIdSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExprIdSuite.scala new file mode 100644 index 000..2352db4 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExprIdSuite.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.UUID + +import org.apache.spark.SparkFunSuite + +class ExprIdSuite extends SparkFunSuite { + + private val jvmId = UUID.randomUUID() + private val otherJvmId = UUID.randomUUID() + + test("hashcode independent of jvmId") { +val exprId1 = ExprId(12, jvmId) +val exprId2 = ExprId(12, otherJvmId) +assert(exprId1 != exprId2) +assert(exprId1.hashCode() == exprId2.hashCode()) + } + + test("equality should depend on both id and jvmId") { +val exprId1 = ExprId(1, jvmId) +val exprId2 = ExprId(1, jvmId) +assert(exprId1 == exprId2) + +val exprId3 = ExprId(1, jvmId) +val exprId4 = ExprId(2, jvmId) +assert(exprId3 != exprId4) + +val exprId5 = ExprId(1, jvmId) +val exprId6 = ExprId(1, otherJvmId) +assert(exprId5 != exprId6) + } + +} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail:
spark git commit: [SPARK-22187][SS] Update unsaferow format for saved state in flatMapGroupsWithState to allow timeouts with deleted state
Repository: spark Updated Branches: refs/heads/master 8d707b060 -> b3d88ac02 [SPARK-22187][SS] Update unsaferow format for saved state in flatMapGroupsWithState to allow timeouts with deleted state ## What changes were proposed in this pull request? Currently, the group state of user-defined-type is encoded as top-level columns in the UnsafeRows stores in the state store. The timeout timestamp is also saved as (when needed) as the last top-level column. Since the group state is serialized to top-level columns, you cannot save "null" as a value of state (setting null in all the top-level columns is not equivalent). So we don't let the user set the timeout without initializing the state for a key. Based on user experience, this leads to confusion. This PR is to change the row format such that the state is saved as nested columns. This would allow the state to be set to null, and avoid these confusing corner cases. However, queries recovering from existing checkpoint will use the previous format to maintain compatibility with existing production queries. ## How was this patch tested? Refactored existing end-to-end tests and added new tests for explicitly testing obj-to-row conversion for both state formats. Author: Tathagata Das Closes #21739 from tdas/SPARK-22187-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3d88ac0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3d88ac0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3d88ac0 Branch: refs/heads/master Commit: b3d88ac02940eff4c867d3acb79fe5ff9d724e83 Parents: 8d707b0 Author: Tathagata Das Authored: Thu Jul 19 13:17:28 2018 -0700 Committer: Tathagata Das Committed: Thu Jul 19 13:17:28 2018 -0700 -- .../sql/catalyst/expressions/Expression.scala | 3 +- .../org/apache/spark/sql/internal/SQLConf.scala | 8 + .../spark/sql/execution/SparkStrategies.scala | 5 +- .../streaming/FlatMapGroupsWithStateExec.scala | 136 +++--- .../sql/execution/streaming/OffsetSeq.scala | 10 +- .../FlatMapGroupsWithStateExecHelper.scala | 247 ++ .../commits/0 | 2 + .../commits/1 | 2 + .../metadata| 1 + .../offsets/0 | 3 + .../offsets/1 | 3 + .../state/0/0/1.delta | Bin 0 -> 84 bytes .../state/0/0/2.delta | Bin 0 -> 46 bytes .../state/0/1/1.delta | Bin 0 -> 46 bytes .../state/0/1/2.delta | Bin 0 -> 46 bytes .../state/0/2/1.delta | Bin 0 -> 46 bytes .../state/0/2/2.delta | Bin 0 -> 46 bytes .../state/0/3/1.delta | Bin 0 -> 46 bytes .../state/0/3/2.delta | Bin 0 -> 46 bytes .../state/0/4/1.delta | Bin 0 -> 46 bytes .../state/0/4/2.delta | Bin 0 -> 46 bytes .../FlatMapGroupsWithStateExecHelperSuite.scala | 218 .../streaming/FlatMapGroupsWithStateSuite.scala | 250 ++- 23 files changed, 708 insertions(+), 180 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b3d88ac0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index f7d1b10..a69b804 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -715,7 +715,8 @@ trait ComplexTypeMergingExpression extends Expression { "The collection of input data types must not be empty.") require( TypeCoercion.haveSameType(inputTypesForMerging), - "All input types must be the 
same except nullable, containsNull, valueContainsNull flags.") + "All input types must be the same except nullable, containsNull, valueContainsNull flags." + +s" The input types found are\n\t${inputTypesForMerging.mkString("\n\t")}") inputTypesForMerging.reduceLeft(TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(_, _).get) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b3d88ac0/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git
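The row-format change described above can be pictured with two schemas (field names here are illustrative, not the exact columns Spark writes; the real layouts live in the new `FlatMapGroupsWithStateExecHelper`): in the old format the user state's fields are flattened into top-level columns, so a key cannot carry a timeout with no state, while in the new format the state is a single nullable struct column, so the state can be null while the timeout column is still populated.

```scala
import org.apache.spark.sql.types._

object StateRowFormatsSketch {
  // Conceptual sketch of the two layouts (field names are illustrative, not the
  // exact columns Spark writes). The user-defined state has two fields here.
  val userStateType: StructType =
    new StructType().add("count", LongType).add("label", StringType)

  // Old format: state fields flattened into top-level columns, plus the timeout.
  // There is no way to represent "timeout set, but no state" for a key.
  val oldFormat: StructType = userStateType.add("timeoutTimestamp", LongType)

  // New format: the whole state is one nullable struct column, so it can be
  // null while the timeout column is still populated.
  val newFormat: StructType = new StructType()
    .add("groupState", userStateType, nullable = true)
    .add("timeoutTimestamp", LongType)
}
```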
spark git commit: [SPARK-24858][SQL] Avoid unnecessary parquet footer reads
Repository: spark Updated Branches: refs/heads/master 8b7d4f842 -> 6a9a058e0 [SPARK-24858][SQL] Avoid unnecessary parquet footer reads ## What changes were proposed in this pull request? Currently the same Parquet footer is read twice in the function `buildReaderWithPartitionValues` of ParquetFileFormat if filter push down is enabled. Fix it with simple changes. ## How was this patch tested? Unit test Author: Gengliang Wang Closes #21814 from gengliangwang/parquetFooter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a9a058e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a9a058e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a9a058e Branch: refs/heads/master Commit: 6a9a058e09abb1b629680a546c3d6358b49f723a Parents: 8b7d4f8 Author: Gengliang Wang Authored: Thu Jul 19 22:24:53 2018 +0800 Committer: hyukjinkwon Committed: Thu Jul 19 22:24:53 2018 +0800 -- .../datasources/parquet/ParquetFileFormat.scala | 15 --- 1 file changed, 8 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6a9a058e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 295960b..2d4ac76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -364,10 +364,11 @@ class ParquetFileFormat val sharedConf = broadcastedHadoopConf.value.value + lazy val footerFileMetaData = +ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { -val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS) - .getFileMetaData.getSchema +val parquetSchema = footerFileMetaData.getSchema val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold) filters @@ -384,12 +385,12 @@ class ParquetFileFormat // *only* if the file was created by something other than "parquet-mr", so check the actual // writer here for this file. We have to do this per-file, as each file in the table may // have different writers. - def isCreatedByParquetMr(): Boolean = { -val footer = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS) -footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr") - } + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = +footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + val convertTz = -if (timestampConversion && !isCreatedByParquetMr()) { +if (timestampConversion && !isCreatedByParquetMr) { Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) } else { None - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
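The core of the fix above is ordinary `lazy val` memoization. A minimal, self-contained illustration, with a counter standing in for the expensive `ParquetFileReader.readFooter` call:

```scala
object FooterReadOnce extends App {
  var footerReads = 0

  // Stands in for the expensive ParquetFileReader.readFooter(...) call; a lazy
  // val is evaluated at most once, and not at all if nothing touches it (e.g.
  // filter push-down disabled and no timestamp conversion needed).
  lazy val footerFileMetaData: String = {
    footerReads += 1
    "file-meta-data"
  }

  val schema    = footerFileMetaData // first access performs the single "read"
  val createdBy = footerFileMetaData // second access reuses the cached value
  assert(footerReads == 1)
}
```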
svn commit: r28223 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_19_04_02-8b7d4f8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jul 19 11:22:25 2018 New Revision: 28223 Log: Apache Spark 2.4.0-SNAPSHOT-2018_07_19_04_02-8b7d4f8 docs [This commit notification would consist of 1468 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r28214 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_19_00_01-d05a926-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Jul 19 07:17:53 2018 New Revision: 28214 Log: Apache Spark 2.4.0-SNAPSHOT-2018_07_19_00_01-d05a926 docs [This commit notification would consist of 1468 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24717][SS] Split out max retain version of state for memory in HDFSBackedStateStoreProvider
Repository: spark Updated Branches: refs/heads/master d05a926e7 -> 8b7d4f842 [SPARK-24717][SS] Split out max retain version of state for memory in HDFSBackedStateStoreProvider ## What changes were proposed in this pull request? This patch proposes breaking down configuration of retaining batch size on state into two pieces: files and in memory (cache). While this patch reuses existing configuration for files, it introduces new configuration, "spark.sql.streaming.maxBatchesToRetainInMemory" to configure max count of batch to retain in memory. ## How was this patch tested? Apply this patch on top of SPARK-24441 (https://github.com/apache/spark/pull/21469), and manually tested in various workloads to ensure overall size of states in memory is around 2x or less of the size of latest version of state, while it was 10x ~ 80x before applying the patch. Author: Jungtaek Lim Closes #21700 from HeartSaVioR/SPARK-24717. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b7d4f84 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b7d4f84 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b7d4f84 Branch: refs/heads/master Commit: 8b7d4f842fdc90b8d1c37080bdd9b5e1d070f5c0 Parents: d05a926 Author: Jungtaek Lim Authored: Thu Jul 19 00:07:35 2018 -0700 Committer: Tathagata Das Committed: Thu Jul 19 00:07:35 2018 -0700 -- .../org/apache/spark/sql/internal/SQLConf.scala | 11 ++ .../state/HDFSBackedStateStoreProvider.scala| 57 +-- .../streaming/state/StateStoreConf.scala| 3 + .../streaming/state/StateStoreSuite.scala | 150 +-- 4 files changed, 196 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b7d4f84/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 41fe0c3..9239d4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -854,6 +854,15 @@ object SQLConf { .intConf .createWithDefault(100) + val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory") +.internal() +.doc("The maximum number of batches which will be retained in memory to avoid " + + "loading from files. 
The value adjusts a trade-off between memory usage vs cache miss: " + + "'2' covers both success and direct failure cases, '1' covers only success case, " + + "and '0' covers extreme case - disable cache to maximize memory size of executors.") +.intConf +.createWithDefault(2) + val UNSUPPORTED_OPERATION_CHECK_ENABLED = buildConf("spark.sql.streaming.unsupportedOperationCheck") .internal() @@ -1507,6 +1516,8 @@ class SQLConf extends Serializable with Logging { def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) + def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY) + def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED) def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED) http://git-wip-us.apache.org/repos/asf/spark/blob/8b7d4f84/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 118c82a..523acef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming.state import java.io._ +import java.util import java.util.Locale import scala.collection.JavaConverters._ @@ -203,6 +204,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit this.valueSchema = valueSchema this.storeConf = storeConf this.hadoopConf = hadoopConf +this.numberOfVersionsToRetainInMemory = storeConf.maxVersionsToRetainInMemory fm.mkdirs(baseDir) } @@ -220,7 +222,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } override def close(): Unit = { -loadedMaps.values.foreach(_.clear()) +
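A rough sketch of the in-memory half of this split (hypothetical names and data structures, not the provider's actual code): only the newest `maxVersionsToRetainInMemory` state versions stay cached, and anything older must be reloaded from the delta/snapshot files, which is the memory-versus-cache-miss trade-off the new configuration controls.

```scala
import scala.collection.mutable

// Illustrative sketch (hypothetical names, not HDFSBackedStateStoreProvider's
// actual data structures): keep only the newest maxVersionsToRetainInMemory
// state versions cached; older versions must be reloaded from delta/snapshot
// files. A value of 0 disables the cache entirely.
class VersionCacheSketch[T](maxVersionsToRetainInMemory: Int) {
  // Newest version first, so trimming drops the oldest entries.
  private val loadedMaps = mutable.TreeMap.empty[Long, T](Ordering[Long].reverse)

  def put(version: Long, state: T): Unit = {
    if (maxVersionsToRetainInMemory > 0) {
      loadedMaps.put(version, state)
      loadedMaps.keys.toList.drop(maxVersionsToRetainInMemory).foreach(loadedMaps.remove)
    }
  }

  def get(version: Long): Option[T] = loadedMaps.get(version)
}
```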