[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19611#discussion_r147824694 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit /** Load the required version of the map data from the backing files */ private def loadMap(version: Long): MapType = { -if (version <= 0) return new MapType -synchronized { loadedMaps.get(version) }.getOrElse { - val mapFromFile = readSnapshotFile(version).getOrElse { -val prevMap = loadMap(version - 1) -val newMap = new MapType(prevMap) -updateFromDeltaFile(version, newMap) -newMap + +// Shortcut if the map for this version is already there to avoid a redundant put. +val currentVersionMap = + synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version)) --- End diff -- we should put the map read from snapshot file into `loadedMaps`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19611#discussion_r147784280 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit /** Load the required version of the map data from the backing files */ private def loadMap(version: Long): MapType = { -if (version <= 0) return new MapType -synchronized { loadedMaps.get(version) }.getOrElse { - val mapFromFile = readSnapshotFile(version).getOrElse { -val prevMap = loadMap(version - 1) -val newMap = new MapType(prevMap) -updateFromDeltaFile(version, newMap) -newMap + +// Shortcut if the map for this version is already there to avoid a redundant put. +val currentVersionMap = + synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version)) +if (currentVersionMap.isDefined) { + return currentVersionMap.get +} + --- End diff -- nit: extra empty line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19611#discussion_r147824364 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit /** Load the required version of the map data from the backing files */ private def loadMap(version: Long): MapType = { -if (version <= 0) return new MapType -synchronized { loadedMaps.get(version) }.getOrElse { - val mapFromFile = readSnapshotFile(version).getOrElse { -val prevMap = loadMap(version - 1) -val newMap = new MapType(prevMap) -updateFromDeltaFile(version, newMap) -newMap + +// Shortcut if the map for this version is already there to avoid a redundant put. +val currentVersionMap = + synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version)) +if (currentVersionMap.isDefined) { + return currentVersionMap.get +} + + +// Find the most recent map before this version that we can. +// [SPARK-22305] This must be done iteratively to avoid stack overflow. +var lastAvailableVersion = version +var lastAvailableMap: Option[MapType] = None +while (lastAvailableMap.isEmpty) { + lastAvailableVersion -= 1 + + if (lastAvailableVersion <= 0) { +// Use an empty map for versions 0 or less. +lastAvailableMap = Some(new MapType) + } else { +lastAvailableMap = + synchronized { loadedMaps.get(lastAvailableVersion) } +.orElse(readSnapshotFile(lastAvailableVersion)) } - loadedMaps.put(version, mapFromFile) - mapFromFile } + +// Load all the deltas from the version after the last available one up to the target version. +// The last available version is the one with a full snapshot, so it doesn't need deltas. +val resultMap = lastAvailableMap.get --- End diff -- We should create a new map here to not change the previous map in case the maintenance task is using it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19611#discussion_r147826631 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit /** Load the required version of the map data from the backing files */ private def loadMap(version: Long): MapType = { -if (version <= 0) return new MapType -synchronized { loadedMaps.get(version) }.getOrElse { - val mapFromFile = readSnapshotFile(version).getOrElse { -val prevMap = loadMap(version - 1) -val newMap = new MapType(prevMap) -updateFromDeltaFile(version, newMap) -newMap + +// Shortcut if the map for this version is already there to avoid a redundant put. +val currentVersionMap = + synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version)) +if (currentVersionMap.isDefined) { + return currentVersionMap.get +} + + +// Find the most recent map before this version that we can. +// [SPARK-22305] This must be done iteratively to avoid stack overflow. +var lastAvailableVersion = version +var lastAvailableMap: Option[MapType] = None +while (lastAvailableMap.isEmpty) { + lastAvailableVersion -= 1 + + if (lastAvailableVersion <= 0) { +// Use an empty map for versions 0 or less. +lastAvailableMap = Some(new MapType) + } else { +lastAvailableMap = + synchronized { loadedMaps.get(lastAvailableVersion) } +.orElse(readSnapshotFile(lastAvailableVersion)) } - loadedMaps.put(version, mapFromFile) - mapFromFile } + +// Load all the deltas from the version after the last available one up to the target version. +// The last available version is the one with a full snapshot, so it doesn't need deltas. +val resultMap = lastAvailableMap.get +for (deltaVersion <- lastAvailableVersion + 1 to version) { + updateFromDeltaFile(deltaVersion, resultMap) +} + +loadedMaps.put(version, resultMap) --- End diff -- `loadedMaps.put(version, resultMap)` -> `synchronized { loadedMaps.put(version, resultMap) }` This is a different issue but since you are touching this, it's better to fix it as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19611 I think we can remove the unit test. It's obviously that `loadMap` is not recursive and will not cause StackOverflowError. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19469: [SPARK-22243][DStreams]spark.yarn.jars reload from confi...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19469 Sorry for the delay. I think it's not worth to design a new feature that's only for DStream. Instead, I would encourage people to use Structured Streaming in the new Spark versions. I'm okey to just merge this PR and future minor PRs (I don't think it will be a lot) for the similar issues. @ChenjunZou could you reopen this PR, please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19581: [SPARK-22366] Support ignoring missing files
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19581 Thanks! Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19581: [SPARK-22366] Support ignoring missing files
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19581 LGTM pending tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19435: [MINOR][SS] "keyWithIndexToNumValues" -> "keyWithIndexTo...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19435 Thanks! Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19469: [SPARK-22243][DStreams]spark.yarn.jars reload from confi...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19469 LGTM. cc @jerryshao to take a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19469: [SPARK-22243][DStreams]spark.yarn.jars reload from confi...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19469 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19465: [SPARK-21988]Implement StreamingRelation.computeS...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/19465 [SPARK-21988]Implement StreamingRelation.computeStats to fix explain ## What changes were proposed in this pull request? Implement StreamingRelation.computeStats to fix explain ## How was this patch tested? - unit tests: `StreamingRelation.computeStats` and `StreamingExecutionRelation.computeStats`. - regression tests: `explain join with a normal source` and `explain join with MemoryStream`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-21988 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19465.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19465 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19465: [SPARK-21988]Implement StreamingRelation.computeStats to...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19465 cc @joseph-torres --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19461: [SPARK-22230] Swap per-row order in state store restore.
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19461 Discussed offline. We don't need to backport to branch-2.2. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19461: [SPARK-22230] Swap per-row order in state store restore.
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19461 Oh, there are some conflicts with 2.2. @joseph-torres could you submit a backport PR, please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19461: [SPARK-22230] Swap per-row order in state store restore.
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19461 Thanks! Merging to master and 2.2. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19336: [SPARK-21947][SS] Check and report error when monotonica...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19336 LGTM. Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19432: [SPARK-22203][SQL]Add job description for file listing S...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19432 Thanks! Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19432: [SPARK-22203][SQL]Add job description for file listing S...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19432 cc @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19432: [SPARK-22203][SQL]Add job description for file li...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/19432 [SPARK-22203][SQL]Add job description for file listing Spark jobs ## What changes were proposed in this pull request? The user may be confused about some 1-tasks jobs. We can add a job description for these jobs so that the user can figure it out. ## How was this patch tested? The new unit test. Before: https://user-images.githubusercontent.com/1000778/31202567-f78d15c0-a917-11e7-841e-11b8bf8f0032.png;> After: https://user-images.githubusercontent.com/1000778/31202576-fc01e356-a917-11e7-9c2b-7bf80b153adb.png;> You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-22203 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19432.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19432 commit aaf41dd02b8f2109a84d36768fdf1802f7817961 Author: Shixiong Zhu <zsxw...@gmail.com> Date: 2017-10-04T21:58:32Z Add job description for file listing Spark jobs --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19416 LGTM pending tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19416#discussion_r142583033 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow} +import org.apache.spark.sql.execution.ObjectOperator +import org.apache.spark.sql.execution.streaming.GroupStateImpl +import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP +import org.apache.spark.sql.types.{IntegerType, LongType, StructType} + + +class FlatMapGroupsWithState_StateManager( +stateEncoder: ExpressionEncoder[Any], +shouldStoreTimestamp: Boolean) extends Serializable { + + val stateSchema = { +val schema = new StructType().add("groupState", stateEncoder.schema, nullable = true) +if (shouldStoreTimestamp) schema.add("timeoutTimestamp", LongType) else schema + } + + def getState(store: StateStore, keyRow: UnsafeRow): FlatMapGroupsWithState_StateData = { +val stateRow = store.get(keyRow) +stateDataForGets.withNew( + keyRow, stateRow, getStateObj(stateRow), getTimestamp(stateRow)) + } + + def putState(store: StateStore, keyRow: UnsafeRow, state: Any, timestamp: Long): Unit = { +val stateRow = getStateRow(state) +setTimestamp(stateRow, timestamp) +store.put(keyRow, stateRow) + } + + def removeState(store: StateStore, keyRow: UnsafeRow): Unit = { +store.remove(keyRow) + } + + def getAllState(store: StateStore): Iterator[FlatMapGroupsWithState_StateData] = { +val stateDataForGetAllState = FlatMapGroupsWithState_StateData() +store.getRange(None, None).map { pair => + stateDataForGetAllState.withNew( +pair.key, pair.value, getStateObjFromRow(pair.value), getTimestamp(pair.value)) +} + } + + private val stateAttributes: Seq[Attribute] = stateSchema.toAttributes + + // Get the serializer for the state, taking into account whether we need to save timestamps + private val stateSerializer = { +val nestedStateExpr = CreateNamedStruct( + stateEncoder.namedExpressions.flatMap(e => Seq(Literal(e.name), e))) +if (shouldStoreTimestamp) { + Seq(nestedStateExpr, Literal(GroupStateImpl.NO_TIMESTAMP)) +} else { + Seq(nestedStateExpr) +} + } + + // Get the deserializer for the state. Note that this must be done in the driver, as + // resolving and binding of deserializer expressions to the encoded type can be safely done + // only in the driver. + private val stateDeserializer = { +val boundRefToNestedState = BoundReference(nestedStateOrdinal, stateEncoder.schema, true) +val deser = stateEncoder.resolveAndBind().deserializer.transformUp { + case BoundReference(ordinal, _, _) => GetStructField(boundRefToNestedState, ordinal) +} +CaseWhen(Seq(IsNull(boundRefToNestedState) -> Literal(null)), elseValue = deser).toCodegen() + } + + private lazy val nestedStateOrdinal = 0 + private lazy val timeoutTimestampOrdinal = 1 + + // Converters for translating state between rows and Java objects + private lazy val getStateObjFromRow = ObjectOperator.deserializeRowToObject( +stateDeserializer, stateAttributes) + private lazy val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer) + + private lazy val stateDataForGets = FlatMapGroupsWithState_StateData() + + /** Returns the state as Java object if defined */
[GitHub] spark issue #16568: [SPARK-18971][Core]Upgrade Netty to 4.0.43.Final
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/16568 @srowen sounds good to me. 2.2.0 was out several months ago and I didn't see any issue caused by the new netty version. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19314: [SPARK-22094][SS]processAllAvailable should check the qu...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19314 Thanks! Merging to master and branch-2.2 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19314: [SPARK-22094][SS]processAllAvailable should check the qu...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19314 cc @marmbrus --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19314: [SPARK-22094][SS]processAllAvailable should check...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/19314 [SPARK-22094][SS]processAllAvailable should check the query state ## What changes were proposed in this pull request? `processAllAvailable` should also check the query state and if the query is stopped, it should return. ## How was this patch tested? The new unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-22094 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19314.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19314 commit b222aef2179a36a8e2b0c3556ec60b2bfb53e228 Author: Shixiong Zhu <zsxw...@gmail.com> Date: 2017-09-22T00:02:02Z processAllAvailable should check the query state --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19271 LGTM pending tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140050970 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to find the join key that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConf Hadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s. + * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that do match the predicate, + * delete from key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex), and removing [(key, maxIndex), + * decrement corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +val inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } --- End diff -- The task completion listener may be called without initializing `keyToNumValues` or `keyWithIndexToValue`.
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140055717 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to generate rows that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConfHadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s. + * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that do match the predicate, + * delete from key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex), and removing [(key, maxIndex), + * decrement corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } + + /* + =
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140042806 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -222,16 +222,17 @@ object UnsupportedOperationChecker { joinType match { case _: InnerLike => - if (left.isStreaming && right.isStreaming) { -throwError("Inner join between two streaming DataFrames/Datasets is not supported") + if (left.isStreaming && right.isStreaming && --- End diff -- What if the join is not equality join? UnsupportedOperationChecker should disallow this case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140044239 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala --- @@ -27,9 +27,10 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.{SparkContext, SparkEnv, TaskContext} --- End diff -- nit: `TaskContext` is not used --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19170 cc @vanzin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19170 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19212 Thanks! Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19212 LGTM pending tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18317 I opened https://github.com/sitalkedia/spark/pull/1 in your repo. Could you take a look at it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138502374 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,313 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" + +"value is " + readAheadThresholdInBytes ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); +
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138430322 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138225592 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138201871 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138207519 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138193555 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138225438 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138198506 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuf
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138207456 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138195259 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); --- End diff -- nit: could you add the `bufferSizeInBytes` value to the error message? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194732 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138204793 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,292 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean isReadInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean isReadAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private final byte[] oneByte = new byte[1]; + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 && endOfStream) { + return true; +} +return false; + } + + + private void read
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138200321 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,317 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private final byte[] oneByte = new byte[1]; + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean hasRemaining() { +if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 && endOfStream) {
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194101 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138189593 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java --- @@ -72,10 +72,15 @@ public UnsafeSorterSpillReader( bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES; } +final Double readAheadFraction = +SparkEnv.get() == null ? 0.5 : + SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5); + final InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes); try { - this.in = serializerManager.wrapStream(blockId, bs); + this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs), --- End diff -- Could you add an internal conf to disable it? It will allow the user to disable it when the new feature causes a regression. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138189733 --- Diff: core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java --- @@ -50,17 +52,16 @@ public void tearDown() { inputFile.delete(); } + --- End diff -- nit: extra empty line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138201264 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138188910 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java --- @@ -72,10 +72,15 @@ public UnsafeSorterSpillReader( bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES; } +final Double readAheadFraction = --- End diff -- nit: Double -> double --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138195292 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138200201 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194341 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138208239 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAhead
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138198579 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffe
[GitHub] spark issue #19112: [SPARK-21901][SS] Define toString for StateOperatorProgr...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19112 LGTM. Thanks! Merging to master and 2.2. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19096: [SPARK-21869][SS] A cached Kafka producer should ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19096#discussion_r137409030 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala --- @@ -43,8 +43,10 @@ private[kafka010] class KafkaWriteTask( * Writes key value data out to topics. */ def execute(iterator: Iterator[InternalRow]): Unit = { -producer = CachedKafkaProducer.getOrCreate(producerConfiguration) +val paramsSeq = CachedKafkaProducer.paramsToSeq(producerConfiguration) while (iterator.hasNext && failedWrite == null) { + // Prevent producer to get expired/evicted from guava cache.(SPARK-21869) + producer = CachedKafkaProducer.getOrCreate(paramsSeq) --- End diff -- This is really hacky. I'm wondering if we can track if a producer is using or not. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18935 Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18935 Just realized the last build is 18 days ago. Triggered a new now. Will merge after passing tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18935 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18935 LGTM since enableVerboseMetrics is off by default. Merging to master. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19093: [SPARK-21880][web UI]In the SQL table page, modify jobs ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19093 Thanks! Merging to master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18883: [SPARK-21276][CORE] Update lz4-java to the latest...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18883#discussion_r136451595 --- Diff: project/MimaExcludes.scala --- @@ -41,7 +41,10 @@ object MimaExcludes { // [SPARK-19937] Add remote bytes read to disk. ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"), + +// [SPARK-21276] Update lz4-java to the latest (v1.4.0) + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream") --- End diff -- By the way, I'm not sure if we want to pursue strictly compatible. Just to point out the issue here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18883: [SPARK-21276][CORE] Update lz4-java to the latest...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18883#discussion_r136450775 --- Diff: project/MimaExcludes.scala --- @@ -41,7 +41,10 @@ object MimaExcludes { // [SPARK-19937] Add remote bytes read to disk. ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"), + +// [SPARK-21276] Update lz4-java to the latest (v1.4.0) + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream") --- End diff -- But the user may write some codes to run different logics according to the InputStream types. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18883: [SPARK-21276][CORE] Update lz4-java to the latest...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18883#discussion_r136443138 --- Diff: project/MimaExcludes.scala --- @@ -41,7 +41,10 @@ object MimaExcludes { // [SPARK-19937] Add remote bytes read to disk. ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"), + +// [SPARK-21276] Update lz4-java to the latest (v1.4.0) + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream") --- End diff -- @srowen This is a breaking change. We should not remove a public class that is in the api docs: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.io.LZ4BlockInputStream --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19093: [SPARK-21880][web UI]In the SQL table page, modify jobs ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19093 LGTM pending tests --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19093: [SPARK-21880][web UI]In the SQL table page, modify jobs ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19093 ok to test --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18928: [SPARK-21696][SS]Fix a potential issue that may generate...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18928 @YuvalItzchakov there are many cases: power outage, the process is killed by oom killer, or the user calls `System.exit`... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18964 Thanks. Merging to master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18964 @neoremind that's an interesting project. However, Spark RPC is not designed for high-performance and general RPC. In general, Spark just needs a good enough RPC system. That's why it's using Java serialization. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18964 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18964 ok to test --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18964 > I also notice that Spark RPC by default uses java native serialization, even a verifying endpoint exist or not request would cost 1K of payload size, not to mention some other real logic endpoint, so in the real world it might be useful to profile this @neoremind did you see any performance issue caused by Spark RPC? Spark doesn't send a lot of RPC messages. I don't see it's a bottleneck even when we tried to optimize the latency in Structured Streaming. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18997: [SPARK-21788][SS]Handle more exceptions when stop...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/18997 [SPARK-21788][SS]Handle more exceptions when stopping a streaming query ## What changes were proposed in this pull request? Add more cases we should view as a normal query stop rather than a failure. ## How was this patch tested? The new unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-21788 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18997.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18997 commit 8aa3a2933225313988b50377b26f680151488535 Author: Shixiong Zhu <zsxw...@gmail.com> Date: 2017-08-18T20:45:23Z Handle more exceptions when stopping a streaming query --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18935 Could you bump the netty version to use its new APIs rather than reflection? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18923: [SPARK-21710][StSt] Fix OOM on ConsoleSink with l...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18923#discussion_r15831 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala --- @@ -49,7 +49,7 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging { println("---") // scalastyle:off println data.sparkSession.createDataFrame( - data.sparkSession.sparkContext.parallelize(data.collect()), data.schema) --- End diff -- I think we also need to consume all data to change the internal states in stateful operators. How about this: ```Scala val encoder = data.exprEnc.resolveAndBind( data.logicalPlan.output, data.sparkSession.sessionState.analyzer) val numRowsToFetch = numRowsToShow + 1 val takeResult = data.queryExecution.toRdd.mapPartitions { iter => var numFetched = 0 val v = ArrayBuffer[Row]() while (numFetched < numRowsToFetch && iter.hasNext) { v += encoder.fromRow(iter.next()) numFetched += 1 } // Consume all data to update internal states in stateful operators. while (iter.hasNext) { iter.next() } v.iterator }.collect().toSeq.take(numRowsToFetch) data.sparkSession.createDataFrame( data.sparkSession.sparkContext.parallelize(takeResult), data.schema).show(numRowsToShow, isTruncated) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18944: [SPARK-21732][SQL]Lazily init hive metastore client
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18944 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18944: [SPARK-21732][SQL]Lazily init hive metastore clie...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18944#discussion_r133096415 --- Diff: sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala --- @@ -0,0 +1,57 @@ +/* --- End diff -- This test is in hive-thriftserver is because tests in the hive project use a shared singleton HiveContext and I cannot create a new one in the same project. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18944: [SPARK-21732][SQL]Lazily init hive metastore client
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18944 cc @yhuai --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18944: [SPARK-21732][SQL]Lazily init hive metastore clie...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/18944 [SPARK-21732][SQL]Lazily init hive metastore client ## What changes were proposed in this pull request? This PR changes the codes to lazily init hive metastore client so that we can create SparkSession without talking to the hive metastore sever. It's pretty helpful when you set a hive metastore server but it's down. You can still start the Spark shell to debug. ## How was this patch tested? The new unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark hive-lazy-init Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18944.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18944 commit 9eb91493aef7d4ce0e0f4cccd5597a4815d6fd58 Author: Shixiong Zhu <shixi...@databricks.com> Date: 2017-08-15T00:17:39Z lazily init hive metastore client --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18928: [SPARK-21696][SS]Fix a potential issue that may generate...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18928 @tdas --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18928: [SPARK-21696][SS]Fix a potential issue that may g...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/18928 [SPARK-21696][SS]Fix a potential issue that may generate partial snapshot files ## What changes were proposed in this pull request? Directly writing a snapshot file may generate a partial file. This PR changes it to write to a temp file then rename to the target file. ## How was this patch tested? Jenkins. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-21696 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18928.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18928 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18846: [SPARK-21642][CORE] Use FQDN for DRIVER_HOST_ADDRESS ins...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18846 @thide I think DRIVER_HOST_ADDRESS will be used to generate the driver url. Could you check if this line still works after your change? https://github.com/apache/spark/blob/b35660dd0e930f4b484a079d9e2516b0a7dacf1d/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L131 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18890: [SPARK-21596][SS] Ensure places calling HDFSMetadataLog....
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18890 Merged. Could you close the PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18890: [SPARK-21596][SS] Ensure places calling HDFSMetadataLog....
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18890 Thanks! Merging to branch-2.2. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18890: [SPARK-21596][SS] Ensure places calling HDFSMetadataLog....
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18890 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18799: [SPARK-21596][SS]Ensure places calling HDFSMetadataLog.g...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18799 @tdas addressed your comments --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18799: [SPARK-21596][SS]Ensure places calling HDFSMetada...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18799#discussion_r132067907 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala --- @@ -1314,6 +1314,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val metadataLog = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath) assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0 + assert(metadataLog.add(1, Array(FileEntry(s"$scheme:///file2", 200L, 0 --- End diff -- `newSource.getBatch(None, FileStreamSourceOffset(1))` will fail without this because batch `1` doesn't exist. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18881: [SPARK-20433][BUILD] Bump jackson from 2.6.5 to 2.6.7.1
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18881 retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18881: [SPARK-20433][BUILD] Bump jackson from 2.6.5 to 2.6.7.1
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18881 LGTM pending tests --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18179: [SPARK-20894][SS] Resolve the checkpoint location in dri...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18179 @markgrover I'm going to close this one. This is just an improvement and not need to go into 2.2.1. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18179: [SPARK-20894][SS] Resolve the checkpoint location...
Github user zsxwing closed the pull request at: https://github.com/apache/spark/pull/18179 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18799: [SPARK-21596][SS]Ensure places calling HDFSMetada...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18799#discussion_r131793251 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala --- @@ -123,7 +123,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: serialize(metadata, output) return Some(tempPath) } finally { - IOUtils.closeQuietly(output) + output.close() --- End diff -- The output stream may fail to close (e.g., fail to flush the internal buffer), if it happens, we should fail the query rather than ignoring it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18840: [SPARK-21565][SS] Propagate metadata in attribute replac...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18840 Thanks! Merging to master and branch-2.2. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18848: [SPARK-21374][CORE] Fix reading globbed paths from S3 in...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18848 Also merged to branch-2.2. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18846: [SPARK-21642][CORE] Use FQDN for DRIVER_HOST_ADDRESS ins...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/18846 Does this affect how we detect driver/executor connection lost? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org