[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147824694
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
--- End diff --

We should put the map read from the snapshot file into `loadedMaps`.
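
A minimal sketch of the suggested change, reusing the names from the diff above (placement in the final patch may differ):

    // Cache the snapshot-loaded map so later loadMap(version) calls are served
    // from memory instead of re-reading the snapshot file.
    val currentVersionMap =
      synchronized { loadedMaps.get(version) }.orElse {
        readSnapshotFile(version).map { map =>
          synchronized { loadedMaps.put(version, map) }
          map
        }
      }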


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147784280
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
--- End diff --

nit: extra empty line


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147824364
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = lastAvailableMap.get
--- End diff --

We should create a new map here so that we don't mutate the previous map, in case the maintenance task is using it.
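
In other words, something along these lines (a sketch; `MapType`'s copy constructor is the one already used elsewhere in this file):

    // Copy-on-load: build the result in a fresh map so the cached
    // lastAvailableMap is never mutated while the maintenance task reads it.
    val resultMap = new MapType(lastAvailableMap.get)
    for (deltaVersion <- lastAvailableVersion + 1 to version) {
      updateFromDeltaFile(deltaVersion, resultMap)
    }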


---




[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....

2017-10-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19611#discussion_r147826631
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val currentVersionMap =
+      synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version))
+    if (currentVersionMap.isDefined) {
+      return currentVersionMap.get
+    }
+
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = lastAvailableMap.get
+    for (deltaVersion <- lastAvailableVersion + 1 to version) {
+      updateFromDeltaFile(deltaVersion, resultMap)
+    }
+
+    loadedMaps.put(version, resultMap)
--- End diff --

`loadedMaps.put(version, resultMap)` -> `synchronized { loadedMaps.put(version, resultMap) }`

This is a different issue, but since you are touching this code, it's better to fix it as well.
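
Putting the review comments together, the tail of `loadMap` would look roughly like this (a sketch, not the final patch):

    // Copy the last available map rather than mutating a cached one, apply the
    // deltas, then publish the result under the lock that guards loadedMaps.
    val resultMap = new MapType(lastAvailableMap.get)
    for (deltaVersion <- lastAvailableVersion + 1 to version) {
      updateFromDeltaFile(deltaVersion, resultMap)
    }
    synchronized { loadedMaps.put(version, resultMap) }
    resultMap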


---




[GitHub] spark issue #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap...

2017-10-30 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19611
  
I think we can remove the unit test. It's obvious that `loadMap` is no longer recursive and will not cause a StackOverflowError.


---




[GitHub] spark issue #19469: [SPARK-22243][DStreams]spark.yarn.jars reload from confi...

2017-10-27 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19469
  
Sorry for the delay. I think it's not worth designing a new feature that's only for DStream. Instead, I would encourage people to use Structured Streaming in new Spark versions.

I'm okay with just merging this PR and future minor PRs for similar issues (I don't think there will be many). @ChenjunZou could you reopen this PR, please?


---




[GitHub] spark issue #19581: [SPARK-22366] Support ignoring missing files

2017-10-26 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19581
  
Thanks! Merging to master.


---




[GitHub] spark issue #19581: [SPARK-22366] Support ignoring missing files

2017-10-26 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19581
  
LGTM pending tests


---




[GitHub] spark issue #19435: [MINOR][SS] "keyWithIndexToNumValues" -> "keyWithIndexTo...

2017-10-13 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19435
  
Thanks! Merging to master.


---




[GitHub] spark issue #19469: [SPARK-22243][DStreams]spark.yarn.jars reload from confi...

2017-10-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19469
  
LGTM. cc @jerryshao to take a look.


---




[GitHub] spark issue #19469: [SPARK-22243][DStreams]spark.yarn.jars reload from confi...

2017-10-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19469
  
ok to test


---




[GitHub] spark pull request #19465: [SPARK-21988]Implement StreamingRelation.computeS...

2017-10-10 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/19465

[SPARK-21988]Implement StreamingRelation.computeStats to fix explain

## What changes were proposed in this pull request?

Implement StreamingRelation.computeStats to fix explain

## How was this patch tested?

- unit tests: `StreamingRelation.computeStats` and `StreamingExecutionRelation.computeStats`.
- regression tests: `explain join with a normal source` and `explain join with MemoryStream`.
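
For context, the shape of the fix is roughly the following (a sketch based on the PR description; the actual code in the PR may differ):

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
    import org.apache.spark.sql.execution.datasources.DataSource

    // Sketch: the streaming leaf node reports the session's default size so that
    // `explain` (which asks every plan node for statistics) no longer fails.
    case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
      extends LeafNode {
      override def isStreaming: Boolean = true
      override def computeStats(): Statistics =
        Statistics(sizeInBytes = BigInt(dataSource.sparkSession.sessionState.conf.defaultSizeInBytes))
    }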


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-21988

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19465.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19465






---




[GitHub] spark issue #19465: [SPARK-21988]Implement StreamingRelation.computeStats to...

2017-10-10 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19465
  
cc @joseph-torres 


---




[GitHub] spark issue #19461: [SPARK-22230] Swap per-row order in state store restore.

2017-10-09 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19461
  
Discussed offline. We don't need to backport to branch-2.2.


---




[GitHub] spark issue #19461: [SPARK-22230] Swap per-row order in state store restore.

2017-10-09 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19461
  
Oh, there are some conflicts with 2.2. @joseph-torres could you submit a backport PR, please?


---




[GitHub] spark issue #19461: [SPARK-22230] Swap per-row order in state store restore.

2017-10-09 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19461
  
Thanks! Merging to master and 2.2.


---




[GitHub] spark issue #19336: [SPARK-21947][SS] Check and report error when monotonica...

2017-10-06 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19336
  
LGTM. Merging to master.


---




[GitHub] spark issue #19432: [SPARK-22203][SQL]Add job description for file listing S...

2017-10-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19432
  
Thanks! Merging to master.


---




[GitHub] spark issue #19432: [SPARK-22203][SQL]Add job description for file listing S...

2017-10-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19432
  
cc @gatorsmile 


---




[GitHub] spark pull request #19432: [SPARK-22203][SQL]Add job description for file li...

2017-10-04 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/19432

[SPARK-22203][SQL]Add job description for file listing Spark jobs

## What changes were proposed in this pull request?

The user may be confused by some one-task jobs. We can add a job description to these jobs so that the user can figure out what they do.

## How was this patch tested?

The new unit test.
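
The mechanism is roughly the following (a sketch; the helper name is hypothetical, not necessarily what the PR uses):

    import org.apache.spark.SparkContext

    // Run `body` with a human-readable description attached, so the resulting
    // one-task job is labeled in the Spark UI instead of showing a bare call site.
    def withJobDescription[T](sc: SparkContext, desc: String)(body: => T): T = {
      val previous = sc.getLocalProperty("spark.job.description")
      sc.setJobDescription(desc)
      try body finally sc.setJobDescription(previous)
    }

Since the description is a thread-local property, it only affects jobs submitted by this thread, and restoring the previous value keeps callers unaffected.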

Before:
https://user-images.githubusercontent.com/1000778/31202567-f78d15c0-a917-11e7-841e-11b8bf8f0032.png

After:
https://user-images.githubusercontent.com/1000778/31202576-fc01e356-a917-11e7-9c2b-7bf80b153adb.png



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-22203

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19432.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19432


commit aaf41dd02b8f2109a84d36768fdf1802f7817961
Author: Shixiong Zhu <zsxw...@gmail.com>
Date:   2017-10-04T21:58:32Z

Add job description for file listing Spark jobs




---




[GitHub] spark issue #19416: [SPARK-22187][SS] Update unsaferow format for saved stat...

2017-10-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19416
  
LGTM pending tests


---




[GitHub] spark pull request #19416: [SPARK-22187][SS] Update unsaferow format for sav...

2017-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19416#discussion_r142583033
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithState_StateManager.scala ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, CaseWhen, CreateNamedStruct, GetStructField, IsNull, Literal, UnsafeRow}
+import org.apache.spark.sql.execution.ObjectOperator
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
+
+
+class FlatMapGroupsWithState_StateManager(
+    stateEncoder: ExpressionEncoder[Any],
+    shouldStoreTimestamp: Boolean) extends Serializable {
+
+  val stateSchema = {
+    val schema = new StructType().add("groupState", stateEncoder.schema, nullable = true)
+    if (shouldStoreTimestamp) schema.add("timeoutTimestamp", LongType) else schema
+  }
+
+  def getState(store: StateStore, keyRow: UnsafeRow): FlatMapGroupsWithState_StateData = {
+    val stateRow = store.get(keyRow)
+    stateDataForGets.withNew(
+      keyRow, stateRow, getStateObj(stateRow), getTimestamp(stateRow))
+  }
+
+  def putState(store: StateStore, keyRow: UnsafeRow, state: Any, timestamp: Long): Unit = {
+    val stateRow = getStateRow(state)
+    setTimestamp(stateRow, timestamp)
+    store.put(keyRow, stateRow)
+  }
+
+  def removeState(store: StateStore, keyRow: UnsafeRow): Unit = {
+    store.remove(keyRow)
+  }
+
+  def getAllState(store: StateStore): Iterator[FlatMapGroupsWithState_StateData] = {
+    val stateDataForGetAllState = FlatMapGroupsWithState_StateData()
+    store.getRange(None, None).map { pair =>
+      stateDataForGetAllState.withNew(
+        pair.key, pair.value, getStateObjFromRow(pair.value), getTimestamp(pair.value))
+    }
+  }
+
+  private val stateAttributes: Seq[Attribute] = stateSchema.toAttributes
+
+  // Get the serializer for the state, taking into account whether we need to save timestamps
+  private val stateSerializer = {
+    val nestedStateExpr = CreateNamedStruct(
+      stateEncoder.namedExpressions.flatMap(e => Seq(Literal(e.name), e)))
+    if (shouldStoreTimestamp) {
+      Seq(nestedStateExpr, Literal(GroupStateImpl.NO_TIMESTAMP))
+    } else {
+      Seq(nestedStateExpr)
+    }
+  }
+
+  // Get the deserializer for the state. Note that this must be done in the driver, as
+  // resolving and binding of deserializer expressions to the encoded type can be safely done
+  // only in the driver.
+  private val stateDeserializer = {
+    val boundRefToNestedState = BoundReference(nestedStateOrdinal, stateEncoder.schema, true)
+    val deser = stateEncoder.resolveAndBind().deserializer.transformUp {
+      case BoundReference(ordinal, _, _) => GetStructField(boundRefToNestedState, ordinal)
+    }
+    CaseWhen(Seq(IsNull(boundRefToNestedState) -> Literal(null)), elseValue = deser).toCodegen()
+  }
+
+  private lazy val nestedStateOrdinal = 0
+  private lazy val timeoutTimestampOrdinal = 1
+
+  // Converters for translating state between rows and Java objects
+  private lazy val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
+    stateDeserializer, stateAttributes)
+  private lazy val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
+
+  private lazy val stateDataForGets = FlatMapGroupsWithState_StateData()
+
+  /** Returns the state as Java object if defined */

[GitHub] spark issue #16568: [SPARK-18971][Core]Upgrade Netty to 4.0.43.Final

2017-10-02 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/16568
  
@srowen sounds good to me. 2.2.0 came out several months ago, and I haven't seen any issues caused by the new Netty version.


---




[GitHub] spark issue #19314: [SPARK-22094][SS]processAllAvailable should check the qu...

2017-09-21 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19314
  
Thanks! Merging to master and branch-2.2


---




[GitHub] spark issue #19314: [SPARK-22094][SS]processAllAvailable should check the qu...

2017-09-21 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19314
  
cc @marmbrus 


---




[GitHub] spark pull request #19314: [SPARK-22094][SS]processAllAvailable should check...

2017-09-21 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/19314

[SPARK-22094][SS]processAllAvailable should check the query state

## What changes were proposed in this pull request?

`processAllAvailable` should also check the query state, and if the query is stopped, it should return.

## How was this patch tested?

The new unit test.
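
A sketch of the fix; the lock and field names here (`awaitProgressLock`, `noNewData`, `streamDeathCause`) are assumptions, not necessarily the actual members of `StreamExecution`:

    import java.util.concurrent.TimeUnit

    // Wait in bounded intervals and re-check liveness, so a caller blocked in
    // processAllAvailable() returns once the query has been stopped.
    override def processAllAvailable(): Unit = {
      awaitProgressLock.lock()
      try {
        noNewData = false
        while (true) {
          awaitProgressLockCondition.await(10, TimeUnit.SECONDS)
          if (streamDeathCause != null) throw streamDeathCause
          if (noNewData || !isActive) return  // checking isActive is the new part
        }
      } finally {
        awaitProgressLock.unlock()
      }
    }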

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-22094

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19314.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19314


commit b222aef2179a36a8e2b0c3556ec60b2bfb53e228
Author: Shixiong Zhu <zsxw...@gmail.com>
Date:   2017-09-22T00:02:02Z

processAllAvailable should check the query state




---




[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-20 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19271
  
LGTM pending tests


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r140050970
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.Predicate
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec}
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+import org.apache.spark.util.NextIterator
+
+/**
+ * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]].
+ * The interface of this class is basically that of a multi-map:
+ * - Get: Returns an iterator of multiple values for given key
+ * - Append: Append a new value to the given key
+ * - Remove Data by predicate: Drop any state using a predicate condition on keys or values
+ *
+ * @param joinSide              Defines the join side
+ * @param inputValueAttributes  Attributes of the input row which will be stored as value
+ * @param joinKeys              Expressions to find the join key that will be used to key the value rows
+ * @param stateInfo             Information about how to retrieve the correct version of state
+ * @param storeConf             Configuration for the state store.
+ * @param hadoopConf            Hadoop configuration for reading state data from storage
+ *
+ * Internally, the key -> multiple values is stored in two [[StateStore]]s.
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values
+ * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value
+ * - Put:   update count in KeyToNumValuesStore,
+ *          insert new (key, count) -> value in KeyWithIndexToValueStore
+ * - Get:   read count from KeyToNumValuesStore,
+ *          read each of the n values in KeyWithIndexToValueStore
+ * - Remove state by predicate on keys:
+ *          scan all keys in KeyToNumValuesStore to find keys that do match the predicate,
+ *          delete the key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore
+ * - Remove state by condition on values:
+ *          scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match
+ *          the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore
+ *          by overwriting with the value of (key, maxIndex), and removing [(key, maxIndex)],
+ *          decrement corresponding num values in KeyToNumValuesStore
+ */
+class SymmetricHashJoinStateManager(
+    val joinSide: JoinSide,
+    val inputValueAttributes: Seq[Attribute],
+    joinKeys: Seq[Expression],
+    stateInfo: Option[StatefulOperatorStateInfo],
+    storeConf: StateStoreConf,
+    hadoopConf: Configuration) extends Logging {
+
+  import SymmetricHashJoinStateManager._
+
+  // Clean up any state store resources if necessary at the end of the task
+  Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } }
--- End diff --

The task completion listener may be called without initializing `keyToNumValues` or `keyWithIndexToValue`.
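
Under that reading, the listener body needs a guard along these lines (a sketch; the `abortIfNeeded` methods on the two stores are assumed):

    // Tolerate firing before the two stores exist: the listener is registered
    // in the constructor, ahead of keyToNumValues and keyWithIndexToValue.
    private def abortIfNeeded(): Unit = {
      if (keyToNumValues != null) keyToNumValues.abortIfNeeded()
      if (keyWithIndexToValue != null) keyWithIndexToValue.abortIfNeeded()
    }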

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r140055717
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.Predicate
+import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec}
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
+import org.apache.spark.util.NextIterator
+
+/**
+ * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]].
+ * The interface of this class is basically that of a multi-map:
+ * - Get: Returns an iterator of multiple values for given key
+ * - Append: Append a new value to the given key
+ * - Remove Data by predicate: Drop any state using a predicate condition on keys or values
+ *
+ * @param joinSide              Defines the join side
+ * @param inputValueAttributes  Attributes of the input row which will be stored as value
+ * @param joinKeys              Expressions to generate rows that will be used to key the value rows
+ * @param stateInfo             Information about how to retrieve the correct version of state
+ * @param storeConf             Configuration for the state store.
+ * @param hadoopConf            Hadoop configuration for reading state data from storage
+ *
+ * Internally, the key -> multiple values is stored in two [[StateStore]]s.
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values
+ * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value
+ * - Put:   update count in KeyToNumValuesStore,
+ *          insert new (key, count) -> value in KeyWithIndexToValueStore
+ * - Get:   read count from KeyToNumValuesStore,
+ *          read each of the n values in KeyWithIndexToValueStore
+ * - Remove state by predicate on keys:
+ *          scan all keys in KeyToNumValuesStore to find keys that do match the predicate,
+ *          delete the key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore
+ * - Remove state by condition on values:
+ *          scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match
+ *          the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore
+ *          by overwriting with the value of (key, maxIndex), and removing [(key, maxIndex)],
+ *          decrement corresponding num values in KeyToNumValuesStore
+ */
+class SymmetricHashJoinStateManager(
+    val joinSide: JoinSide,
+    inputValueAttributes: Seq[Attribute],
+    joinKeys: Seq[Expression],
+    stateInfo: Option[StatefulOperatorStateInfo],
+    storeConf: StateStoreConf,
+    hadoopConf: Configuration) extends Logging {
+
+  import SymmetricHashJoinStateManager._
+
+  // Clean up any state store resources if necessary at the end of the task
+  Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } }
+
+  /*
+  =

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r140042806
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -222,16 +222,17 @@ object UnsupportedOperationChecker {
       joinType match {
 
         case _: InnerLike =>
-          if (left.isStreaming && right.isStreaming) {
-            throwError("Inner join between two streaming DataFrames/Datasets is not supported")
+          if (left.isStreaming && right.isStreaming &&
--- End diff --

What if the join is not an equality join? Should UnsupportedOperationChecker disallow this case?
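
A sketch of the kind of guard being asked for (the helper is illustrative only, not code from the PR):

    import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression}

    // A stream-stream inner join needs at least one equality predicate, since
    // the symmetric hash join uses join keys to partition and look up state.
    def hasEquiCondition(condition: Option[Expression]): Boolean =
      condition.exists(_.collectFirst { case e: EqualTo => e }.isDefined)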


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-20 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r140044239
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -27,9 +27,10 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.{SparkContext, SparkEnv, TaskContext}
--- End diff --

nit: `TaskContext` is not used


---




[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...

2017-09-19 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19170
  
cc @vanzin 


---




[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...

2017-09-19 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19170
  
ok to test


---




[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...

2017-09-14 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19212
  
Thanks! Merging to master.


---




[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...

2017-09-13 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19212
  
LGTM pending tests


---




[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-12 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18317
  
I opened https://github.com/sitalkedia/spark/pull/1 in your repo. Could you take a look at it?


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138502374
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
+ * stream when a specified amount of data has been read from the current buffer. It does so by
+ * maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer contains
+ * data which should be returned when a read() call is issued. The read-ahead buffer is used to
+ * asynchronously read from the underlying input stream, and once the current active buffer is
+ * exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer
+ * without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer size and read-ahead threshold.
+   *
+   * @param inputStream               The underlying input stream.
+   * @param bufferSizeInBytes         The buffer size.
+   * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
+   *                                  threshold, an async read is triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
+    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+        readAheadThresholdInBytes < bufferSizeInBytes,
+        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes," +
+        " but the value is " + readAheadThresholdInBytes);
+    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+    this.underlyingInputStream = inputStream;
+    activeBuffer.flip();
+    readAheadBuffer.flip();
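
For reference, using the class quoted above looks roughly like this (a usage sketch; the file path and buffer sizes are arbitrary, and the constructor signature is taken from the diff):

    import java.io.FileInputStream
    import org.apache.spark.io.ReadAheadInputStream

    // 1 MB buffers; trigger the async read-ahead once less than 512 KB remains
    // in the active buffer, so the flip rarely has to wait on disk I/O.
    val in = new ReadAheadInputStream(new FileInputStream("/tmp/spill.bin"), 1 << 20, 1 << 19)
    val firstByte = in.read()
    in.close()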

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138430322
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
+ * stream when a specified amount of data has been read from the current buffer. It does so by
+ * maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer contains
+ * data which should be returned when a read() call is issued. The read-ahead buffer is used to
+ * asynchronously read from the underlying input stream, and once the current active buffer is
+ * exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer
+ * without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer size and read-ahead threshold.
+   *
+   * @param inputStream               The underlying input stream.
+   * @param bufferSizeInBytes         The buffer size.
+   * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
+   *                                  threshold, an async read is triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0");
+    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+        readAheadThresholdInBytes < bufferSizeInBytes,
+        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
+    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+    this.underlyingInputStream = inputStream;
+    activeBuffer.flip();
+    readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+    if (!activeBuffer.hasRemaining() && !readAhead

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138225592
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138201871
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138207519
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService =
+      ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer size and read-ahead threshold.
+   *
+   * @param inputStream               The underlying input stream.
+   * @param bufferSizeInBytes         The buffer size.
+   * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
+   *                                  threshold, an async read is triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes,
+      int readAheadThresholdInBytes) {
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0");
+    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+        readAheadThresholdInBytes < bufferSizeInBytes,
+        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
+    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+    this.underlyingInputStream = inputStream;
+    activeBuffer.flip();
+    readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+    if (!activeBuffer.hasRemaining() && !readAhead
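
The flip-the-buffers idea the javadoc describes can be illustrated with a minimal, single-threaded sketch. The class and method names below are illustrative only, not from the PR; the real ReadAheadInputStream refills the spare buffer on a background executor thread under stateChangeLock rather than synchronously:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    public class DoubleBufferReader {
      private ByteBuffer active;     // drained by read()
      private ByteBuffer readAhead;  // refilled from the underlying stream
      private final InputStream in;
      private boolean eof;

      public DoubleBufferReader(InputStream in, int bufferSize) throws IOException {
        this.in = in;
        active = ByteBuffer.allocate(bufferSize);
        readAhead = ByteBuffer.allocate(bufferSize);
        active.flip();      // start both buffers empty (position == limit)
        readAhead.flip();
        refill(readAhead);  // prime the spare buffer
      }

      // Fill the given buffer from the underlying stream. Synchronous here,
      // whereas the PR hands this work to a background thread.
      private void refill(ByteBuffer buf) throws IOException {
        buf.clear();
        int n = in.read(buf.array(), 0, buf.capacity());
        if (n < 0) { eof = true; n = 0; }
        buf.position(n);
        buf.flip();
      }

      public int read() throws IOException {
        if (!active.hasRemaining()) {
          // Active buffer exhausted: swap in the pre-filled buffer
          // and refill the old one.
          ByteBuffer tmp = active;
          active = readAhead;
          readAhead = tmp;
          if (!active.hasRemaining()) return -1;  // nothing left anywhere
          if (!eof) refill(readAhead);
        }
        return active.get() & 0xFF;
      }

      public static void main(String[] args) throws IOException {
        DoubleBufferReader r = new DoubleBufferReader(
            new ByteArrayInputStream("hello read-ahead".getBytes()), 4);
        int b;
        while ((b = r.read()) != -1) System.out.print((char) b);
      }
    }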

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138193555
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138225438
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138198506
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138207456
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138195259
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
+  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes,
+      int readAheadThresholdInBytes) {
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0");
--- End diff --

nit: could you add the `bufferSizeInBytes` value to the error message?
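
Guava's Preconditions.checkArgument accepts a printf-style message template with %s placeholders, so this is a one-line change; a sketch (exact wording is only a suggestion):

    Preconditions.checkArgument(bufferSizeInBytes > 0,
        "bufferSizeInBytes should be greater than 0, but was %s", bufferSizeInBytes);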


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194732
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138204793
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138200321
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194101
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138189593
  
--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java ---
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
       bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
     }
 
+    final Double readAheadFraction =
+        SparkEnv.get() == null ? 0.5 :
+            SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);
+
     final InputStream bs =
         new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
     try {
-      this.in = serializerManager.wrapStream(blockId, bs);
+      this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
--- End diff --

Could you add an internal conf to disable it? It will allow the user to 
disable it when the new feature causes a regression.
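
One way to wire that in, as a sketch only (the conf key, its default, and the threshold arithmetic below are assumptions for illustration, not what the PR finally used):

    // Hypothetical kill switch for the read-ahead wrapper.
    final boolean readAheadEnabled = SparkEnv.get() == null ||
        SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
    final InputStream wrapped = serializerManager.wrapStream(blockId, bs);
    this.in = readAheadEnabled
        ? new ReadAheadInputStream(wrapped, (int) bufferSizeBytes,
            (int) (bufferSizeBytes * readAheadFraction))
        : wrapped;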


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138189733
  
--- Diff: core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java ---
@@ -50,17 +52,16 @@ public void tearDown() {
 inputFile.delete();
   }
 
+
--- End diff --

nit: extra empty line


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138201264
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138188910
  
--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java ---
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
       bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
     }
 
+    final Double readAheadFraction =
--- End diff --

nit: Double -> double
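
i.e. use the primitive to avoid needless autoboxing, since SparkConf.getDouble already yields a double:

    final double readAheadFraction = SparkEnv.get() == null ? 0.5 :
        SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);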


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138195292
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
+  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes,
+      int readAheadThresholdInBytes) {
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0");
+    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+        readAheadThresholdInBytes < bufferSizeInBytes,
+        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
--- End diff --

ditto: include the actual `readAheadThresholdInBytes` and `bufferSizeInBytes` values in this error message as well.
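
As above, with Guava's message template this could read (wording illustrative):

    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
        readAheadThresholdInBytes < bufferSizeInBytes,
        "readAheadThresholdInBytes (%s) should be greater than 0 and less than bufferSizeInBytes (%s)",
        readAheadThresholdInBytes, bufferSizeInBytes);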


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138200201
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194341
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream once a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers: an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the active
+ * buffer is exhausted, we flip the two buffers so that reading can continue from the
+ * read-ahead buffer without blocking on disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  // True if an async read is in progress.
+  @GuardedBy("stateChangeLock")
+  private boolean readInProgress;
+
+  // True if the read was aborted due to an exception while reading from the
+  // underlying input stream.
+  @GuardedBy("stateChangeLock")
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService =
+      ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer size and read-ahead threshold.
+   *
+   * @param inputStream               The underlying input stream.
+   * @param bufferSizeInBytes         The buffer size.
+   * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
+   *                                  threshold, an async read is triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes,
+      int readAheadThresholdInBytes) {
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0");
+    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+        readAheadThresholdInBytes < bufferSizeInBytes,
+        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
+    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+    this.underlyingInputStream = inputStream;
+    // Both buffers start empty; flip() sets limit = 0 so the first read() triggers a fill.
+    activeBuffer.flip();
+    readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+    if (!activeBuffer.hasRemaining() && !readAhead
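
For context, here is a minimal caller-side sketch (Scala) of how such a stream would be used. It assumes only the constructor shown in the truncated diff above; the file path and sizes are invented, and the note about close() stopping the read-ahead thread is an assumption based on the `executorService` field, not code visible here:

```Scala
import java.io.FileInputStream
import org.apache.spark.io.ReadAheadInputStream

// Wrap a plain file stream: 1 MB buffers, and trigger the async refill once
// fewer than 64 KB remain in the active buffer.
val in = new ReadAheadInputStream(
  new FileInputStream("/tmp/spill.data"),  // hypothetical path
  1024 * 1024,
  64 * 1024)
try {
  val chunk = new Array[Byte](8192)
  var n = in.read(chunk)
  while (n != -1) {
    // process(chunk, 0, n) -- placeholder for real work
    n = in.read(chunk)
  }
} finally {
  in.close()  // assumed to also stop the internal read-ahead thread
}
```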

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138208239
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138198579
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---

[GitHub] spark issue #19112: [SPARK-21901][SS] Define toString for StateOperatorProgr...

2017-09-06 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19112
  
LGTM. Thanks! Merging to master and 2.2.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19096: [SPARK-21869][SS] A cached Kafka producer should ...

2017-09-06 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/19096#discussion_r137409030
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 ---
@@ -43,8 +43,10 @@ private[kafka010] class KafkaWriteTask(
    * Writes key value data out to topics.
    */
   def execute(iterator: Iterator[InternalRow]): Unit = {
-    producer = CachedKafkaProducer.getOrCreate(producerConfiguration)
+    val paramsSeq = CachedKafkaProducer.paramsToSeq(producerConfiguration)
     while (iterator.hasNext && failedWrite == null) {
+      // Prevent the producer from being expired/evicted from the guava cache. (SPARK-21869)
+      producer = CachedKafkaProducer.getOrCreate(paramsSeq)
--- End diff --

This is really hacky. I'm wondering if we can track whether a producer is in
use or not.
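
One concrete form that tracking could take is reference counting. Below is a minimal sketch (Scala); the names (`TrackedProducer`, `acquire`, `release`, `markEvicted`) are hypothetical, not anything in the PR: tasks acquire/release the producer, and the cache's removal listener only closes it once the last user has released it.

```Scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import org.apache.kafka.clients.producer.KafkaProducer

// Hypothetical wrapper: the guava cache's removal listener would call
// markEvicted() instead of close(), so a producer evicted while a task is
// still writing survives until that task calls release().
class TrackedProducer(val producer: KafkaProducer[Array[Byte], Array[Byte]]) {
  private val inUse = new AtomicInteger(0)
  private val evicted = new AtomicBoolean(false)
  private val closed = new AtomicBoolean(false)

  private def closeOnce(): Unit =
    if (closed.compareAndSet(false, true)) producer.close()

  def acquire(): Unit = inUse.incrementAndGet()

  def release(): Unit =
    if (inUse.decrementAndGet() == 0 && evicted.get()) closeOnce()

  // Called from the cache's removal listener on eviction/expiry.
  def markEvicted(): Unit = {
    evicted.set(true)
    if (inUse.get() == 0) closeOnce()
  }
}
```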


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark

2017-09-05 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18935
  
Merging to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark

2017-09-05 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18935
  
Just realized the last build was 18 days ago. Triggered a new one now. Will
merge after the tests pass.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark

2017-09-05 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18935
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark

2017-09-05 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18935
  
LGTM since enableVerboseMetrics is off by default. Merging to master. 
Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19093: [SPARK-21880][web UI]In the SQL table page, modify jobs ...

2017-09-01 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19093
  
Thanks! Merging to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18883: [SPARK-21276][CORE] Update lz4-java to the latest...

2017-08-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18883#discussion_r136451595
  
--- Diff: project/MimaExcludes.scala ---
@@ -41,7 +41,10 @@ object MimaExcludes {
 
     // [SPARK-19937] Add remote bytes read to disk.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
+
+    // [SPARK-21276] Update lz4-java to the latest (v1.4.0)
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream")
--- End diff --

By the way, I'm not sure we want to pursue strict compatibility here. Just
pointing out the issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18883: [SPARK-21276][CORE] Update lz4-java to the latest...

2017-08-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18883#discussion_r136450775
  
--- Diff: project/MimaExcludes.scala ---
@@ -41,7 +41,10 @@ object MimaExcludes {
 
     // [SPARK-19937] Add remote bytes read to disk.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
+
+    // [SPARK-21276] Update lz4-java to the latest (v1.4.0)
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream")
--- End diff --

But users may write code that runs different logic depending on the
InputStream type.
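
For example (an illustrative snippet, not from any real codebase), user code that dispatches on the concrete stream type would stop compiling or linking once the public class is removed, even though Spark's own behavior is unchanged:

```Scala
import java.io.InputStream

// Compiled against Spark's public org.apache.spark.io.LZ4BlockInputStream;
// removing that class breaks this match at compile/link time.
def describe(in: InputStream): String = in match {
  case _: org.apache.spark.io.LZ4BlockInputStream => "spark-lz4"
  case other => other.getClass.getSimpleName
}
```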


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18883: [SPARK-21276][CORE] Update lz4-java to the latest...

2017-08-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18883#discussion_r136443138
  
--- Diff: project/MimaExcludes.scala ---
@@ -41,7 +41,10 @@ object MimaExcludes {
 
     // [SPARK-19937] Add remote bytes read to disk.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
+
+    // [SPARK-21276] Update lz4-java to the latest (v1.4.0)
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream")
--- End diff --

@srowen This is a breaking change. We should not remove a public class that 
is in the api docs: 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.io.LZ4BlockInputStream


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19093: [SPARK-21880][web UI]In the SQL table page, modify jobs ...

2017-08-31 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19093
  
LGTM pending tests


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19093: [SPARK-21880][web UI]In the SQL table page, modify jobs ...

2017-08-31 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19093
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18928: [SPARK-21696][SS]Fix a potential issue that may generate...

2017-08-27 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18928
  
@YuvalItzchakov there are many cases: a power outage, the process being killed
by the OOM killer, the user calling `System.exit`...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...

2017-08-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18964
  
Thanks. Merging to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...

2017-08-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18964
  
@neoremind that's an interesting project. However, Spark RPC is not designed to
be a high-performance, general-purpose RPC framework; Spark just needs a
good-enough RPC system. That's why it uses Java serialization.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...

2017-08-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18964
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...

2017-08-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18964
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18964: [SPARK-21701][CORE] Enable RPC client to use ` SO_RCVBUF...

2017-08-23 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18964
  
> I also notice that Spark RPC by default uses java native serialization; even
a request that just verifies whether an endpoint exists costs about 1K of
payload, not to mention endpoints with real logic, so in the real world it
might be useful to profile this

@neoremind did you see any performance issue caused by Spark RPC? Spark
doesn't send a lot of RPC messages. I didn't see it become a bottleneck even
when we tried to optimize the latency in Structured Streaming.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18997: [SPARK-21788][SS]Handle more exceptions when stop...

2017-08-18 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/18997

[SPARK-21788][SS]Handle more exceptions when stopping a streaming query

## What changes were proposed in this pull request?

Add more cases we should view as a normal query stop rather than a failure.

## How was this patch tested?

The new unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-21788

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18997.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18997


commit 8aa3a2933225313988b50377b26f680151488535
Author: Shixiong Zhu <zsxw...@gmail.com>
Date:   2017-08-18T20:45:23Z

Handle more exceptions when stopping a streaming query




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark

2017-08-16 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18935
  
Could you bump the netty version to use its new APIs rather than reflection?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18923: [SPARK-21710][StSt] Fix OOM on ConsoleSink with l...

2017-08-15 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18923#discussion_r15831
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala 
---
@@ -49,7 +49,7 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
     println("---")
     // scalastyle:off println
     data.sparkSession.createDataFrame(
-      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
--- End diff --

I think we also need to consume all the data to update the internal state in
stateful operators. How about this:
```Scala
val encoder = data.exprEnc.resolveAndBind(
  data.logicalPlan.output,
  data.sparkSession.sessionState.analyzer)

val numRowsToFetch = numRowsToShow + 1
val takeResult = data.queryExecution.toRdd.mapPartitions { iter =>
  var numFetched = 0
  val v = ArrayBuffer[Row]()
  while (numFetched < numRowsToFetch && iter.hasNext) {
    v += encoder.fromRow(iter.next())
    numFetched += 1
  }
  // Consume all data to update internal states in stateful operators.
  while (iter.hasNext) {
    iter.next()
  }
  v.iterator
}.collect().toSeq.take(numRowsToFetch)

data.sparkSession.createDataFrame(
  data.sparkSession.sparkContext.parallelize(takeResult),
  data.schema).show(numRowsToShow, isTruncated)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18944: [SPARK-21732][SQL]Lazily init hive metastore client

2017-08-14 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18944
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18944: [SPARK-21732][SQL]Lazily init hive metastore clie...

2017-08-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18944#discussion_r133096415
  
--- Diff: 
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala
 ---
@@ -0,0 +1,57 @@
+/*
--- End diff --

This test is in hive-thriftserver because tests in the hive project use a
shared singleton HiveContext, and I cannot create a new one in the same
project.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18944: [SPARK-21732][SQL]Lazily init hive metastore client

2017-08-14 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18944
  
cc @yhuai 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18944: [SPARK-21732][SQL]Lazily init hive metastore clie...

2017-08-14 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/18944

[SPARK-21732][SQL]Lazily init hive metastore client

## What changes were proposed in this pull request?

This PR changes the code to lazily initialize the hive metastore client so that
we can create a SparkSession without talking to the hive metastore server.

It's pretty helpful when a hive metastore server is configured but currently
down: you can still start the Spark shell to debug.
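
The general shape of the change is the standard lazy-initialization pattern. A minimal sketch (Scala) with stand-in names (`MetastoreClient`, `LazyCatalog`), not Spark's actual classes:

```Scala
// Stand-in for the hive metastore client interface.
trait MetastoreClient { def listDatabases(): Seq[String] }

class LazyCatalog(newClient: () => MetastoreClient) {
  // No connection is attempted when the catalog (and hence the SparkSession)
  // is constructed; `lazy val` defers it to the first access.
  private lazy val client: MetastoreClient = newClient()

  // Only the first metastore-touching call can fail with a connection error.
  def listDatabases(): Seq[String] = client.listDatabases()
}
```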

## How was this patch tested?

The new unit test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark hive-lazy-init

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18944


commit 9eb91493aef7d4ce0e0f4cccd5597a4815d6fd58
Author: Shixiong Zhu <shixi...@databricks.com>
Date:   2017-08-15T00:17:39Z

lazily init hive metastore client




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18928: [SPARK-21696][SS]Fix a potential issue that may generate...

2017-08-11 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18928
  
@tdas 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18928: [SPARK-21696][SS]Fix a potential issue that may g...

2017-08-11 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/18928

[SPARK-21696][SS]Fix a potential issue that may generate partial snapshot 
files

## What changes were proposed in this pull request?

Directly writing a snapshot file may generate a partial file. This PR changes
it to write to a temp file and then rename it to the target file.
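
The fix follows the usual write-temp-then-rename idiom. A sketch of the pattern (Scala), assuming a Hadoop `FileSystem` handle; this is not the provider's exact code:

```Scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Write the snapshot to a temp file in the same directory, then rename it.
// Readers see either the old complete file or the new complete file, never
// a partially written one (rename is atomic on HDFS).
def writeSnapshotAtomically(finalPath: Path, bytes: Array[Byte]): Unit = {
  val fs: FileSystem = finalPath.getFileSystem(new Configuration())
  val tempPath = new Path(finalPath.getParent, s".${finalPath.getName}.tmp")
  val out = fs.create(tempPath, true)  // overwrite any stale temp file
  try {
    out.write(bytes)
  } finally {
    out.close()
  }
  if (!fs.rename(tempPath, finalPath)) {
    fs.delete(tempPath, false)
    throw new java.io.IOException(s"Failed to rename $tempPath to $finalPath")
  }
}
```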

## How was this patch tested?

Jenkins.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-21696

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18928.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18928






---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18846: [SPARK-21642][CORE] Use FQDN for DRIVER_HOST_ADDRESS ins...

2017-08-09 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18846
  
@thide I think DRIVER_HOST_ADDRESS will be used to generate the driver URL.
Could you check that this line still works after your change?
https://github.com/apache/spark/blob/b35660dd0e930f4b484a079d9e2516b0a7dacf1d/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L131


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18890: [SPARK-21596][SS] Ensure places calling HDFSMetadataLog....

2017-08-09 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18890
  
Merged. Could you close the PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18890: [SPARK-21596][SS] Ensure places calling HDFSMetadataLog....

2017-08-09 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18890
  
Thanks! Merging to branch-2.2.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18890: [SPARK-21596][SS] Ensure places calling HDFSMetadataLog....

2017-08-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18890
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18799: [SPARK-21596][SS]Ensure places calling HDFSMetadataLog.g...

2017-08-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18799
  
@tdas addressed your comments


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18799: [SPARK-21596][SS]Ensure places calling HDFSMetada...

2017-08-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18799#discussion_r132067907
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -1314,6 +1314,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val metadataLog =
       new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
     assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
+    assert(metadataLog.add(1, Array(FileEntry(s"$scheme:///file2", 200L, 0))))
--- End diff --

`newSource.getBatch(None, FileStreamSourceOffset(1))` will fail without 
this because batch `1` doesn't exist.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18881: [SPARK-20433][BUILD] Bump jackson from 2.6.5 to 2.6.7.1

2017-08-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18881
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18881: [SPARK-20433][BUILD] Bump jackson from 2.6.5 to 2.6.7.1

2017-08-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18881
  
LGTM pending tests


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18179: [SPARK-20894][SS] Resolve the checkpoint location in dri...

2017-08-07 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18179
  
@markgrover I'm going to close this one. This is just an improvement and does
not need to go into 2.2.1.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18179: [SPARK-20894][SS] Resolve the checkpoint location...

2017-08-07 Thread zsxwing
Github user zsxwing closed the pull request at:

https://github.com/apache/spark/pull/18179


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18799: [SPARK-21596][SS]Ensure places calling HDFSMetada...

2017-08-07 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18799#discussion_r131793251
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -123,7 +123,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       serialize(metadata, output)
       return Some(tempPath)
     } finally {
-      IOUtils.closeQuietly(output)
+      output.close()
--- End diff --

The output stream may fail to close (e.g., fail to flush its internal buffer);
if that happens, we should fail the query rather than ignore it.
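
The difference is easiest to see side by side (a generic illustration, not the PR's code): a closeQuietly-style wrapper turns a failed flush into silent truncation, while a bare close() in finally lets the exception propagate and fail the query.

```Scala
import java.io.{IOException, OutputStream}

// closeQuietly-style: a buffered stream that fails to flush its last bytes
// in close() still "succeeds", leaving a silently truncated file behind.
def writeSwallowingCloseErrors(out: OutputStream, data: Array[Byte]): Unit = {
  try out.write(data)
  finally {
    try out.close() catch { case _: IOException => /* error is lost */ }
  }
}

// Letting close() throw: the IOException propagates to the caller, which
// can fail the batch instead of committing a partial metadata file.
def writeStrict(out: OutputStream, data: Array[Byte]): Unit = {
  try out.write(data)
  finally out.close()
}
```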


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18840: [SPARK-21565][SS] Propagate metadata in attribute replac...

2017-08-07 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18840
  
Thanks! Merging to master and branch-2.2.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18848: [SPARK-21374][CORE] Fix reading globbed paths from S3 in...

2017-08-07 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18848
  
Also merged to branch-2.2.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18846: [SPARK-21642][CORE] Use FQDN for DRIVER_HOST_ADDRESS ins...

2017-08-07 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/18846
  
Does this affect how we detect driver/executor connection loss?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


