[GitHub] spark pull request: [SPARK-14620][SQL] Use/benchmark a better hash...

2016-04-15 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12379#issuecomment-210667378
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13745][SQL]Support columnar in memory r...

2016-04-15 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12397#issuecomment-210600346
  
Agreed. LGTM.





[GitHub] spark pull request: [SPARK-14620][SQL] Use/benchmark a better hash...

2016-04-15 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12379#discussion_r59921018
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---
@@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------
-codegen = F                              2219 / 2392          9.4         105.8        1.0X
-codegen = T hashmap = F                  1330 / 1466         15.8          63.4        1.7X
-codegen = T hashmap = T                   384 /  518         54.7          18.3        5.8X
+codegen = F                              2323 / 2567          9.0         110.8        1.0X
+codegen = T hashmap = F                  1182 / 1246         17.7          56.4        2.0X
+codegen = T hashmap = T                   381 /  489         55.0          18.2        6.1X
+*/
+  }
+
+  ignore("aggregate with randomized keys") {
+    val N = 20 << 20
+
+    val benchmark = new Benchmark("Aggregate w keys", N)
+    sqlContext.range(N).selectExpr("id", "floor(rand() * 1) as k").registerTempTable("test")
+
+    def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
--- End diff --

I feel you're spending a lot of time in rand
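For example (just a sketch, not necessarily how it should land): materializing the randomized keys once keeps rand() out of the measured path, so the benchmark times only the aggregation. The numKeys value below is an illustrative placeholder, not the cardinality used in the PR.

    // Sketch: pre-compute and cache the random keys so only the aggregate is timed.
    val N = 20 << 20
    val numKeys = 1024  // placeholder cardinality
    val keyed = sqlContext.range(N)
      .selectExpr("id", s"floor(rand() * $numKeys) as k")
      .cache()
    keyed.count()                       // force materialization of the random keys
    keyed.registerTempTable("test")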





[GitHub] spark pull request: [SPARK-14620][SQL] Use/benchmark a better hash...

2016-04-15 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12379#discussion_r59904543
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---
@@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------
-codegen = F                              2219 / 2392          9.4         105.8        1.0X
-codegen = T hashmap = F                  1330 / 1466         15.8          63.4        1.7X
-codegen = T hashmap = T                   384 /  518         54.7          18.3        5.8X
+codegen = F                              2323 / 2567          9.0         110.8        1.0X
+codegen = T hashmap = F                  1182 / 1246         17.7          56.4        2.0X
+codegen = T hashmap = T                   381 /  489         55.0          18.2        6.1X
+*/
+  }
+
+  ignore("aggregate with randomized keys") {
+    val N = 20 << 20
+
+    val benchmark = new Benchmark("Aggregate w keys", N)
+    sqlContext.range(N).selectExpr("id", "floor(rand() * 1) as k").registerTempTable("test")
+
+    def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
--- End diff --

what does this plan look like?
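For reference, the plan can be dumped with something like the following, assuming the test temp table from the snippet above has been registered:

    // Print the logical and physical plans for the benchmarked query.
    sqlContext.sql("select k, k, sum(id) from test group by k, k").explain(true)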





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-14 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12345#issuecomment-210249105
  
LGTM





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-14 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59814608
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 ---
@@ -261,7 +263,12 @@ case class TungstenAggregate(
 .map(_.asInstanceOf[DeclarativeAggregate])
   private val bufferSchema = 
StructType.fromAttributes(aggregateBufferAttributes)
 
-  // The name for HashMap
+  // The name for Vectorized HashMap
+  private var vectorizedHashMapTerm: String = _
+  private var isVectorizedHashMapEnabled: Boolean = 
sqlContext.conf.columnarAggregateMapEnabled &&
+(modes.contains(Partial) || modes.contains(PartialMerge))
--- End diff --

I don't think this is the correct logic. I think modes.forall(_ == Partial) is what we want.
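Roughly this (a sketch of the intended predicate, not the final code):

    import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, Partial}

    // Only enable the vectorized hash map when every aggregate runs in Partial mode,
    // rather than whenever any mode happens to be Partial or PartialMerge.
    def vectorizedHashMapAllowed(modes: Seq[AggregateMode]): Boolean =
      modes.forall(_ == Partial)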





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-14 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59814567
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 ---
@@ -437,17 +444,21 @@ case class TungstenAggregate(
 val initAgg = ctx.freshName("initAgg")
 ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
 
-// create AggregateHashMap
-val isAggregateHashMapEnabled: Boolean = false
-val isAggregateHashMapSupported: Boolean =
+// We currently only enable vectorized hashmap for long key/value types
+isVectorizedHashMapEnabled = isVectorizedHashMapEnabled &&
--- End diff --

I think it's weird that you have the check split between here and line 268. Maybe combine all these checks at line 268 and make isVectorizedHashMapEnabled a val.





[GitHub] spark pull request: [SPARK-13745][SQL]Support columnar in memory r...

2016-04-14 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12397#discussion_r59789608
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -31,6 +33,8 @@
   private byte[] buffer;
   private int offset;
   private int bitOffset; // Only used for booleans.
+  
+  private final static boolean bigEndianPlatform = 
ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
--- End diff --

I'm wondering whether wrapping the buffer with a ByteBuffer once (in initFromPage) behaves any differently than reversing the bytes for each integer.





[GitHub] spark pull request: [SPARK-13745][SQL]Support columnar in memory r...

2016-04-14 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12397#discussion_r59767603
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -31,6 +33,8 @@
   private byte[] buffer;
   private int offset;
   private int bitOffset; // Only used for booleans.
+  
+  private final static boolean bigEndianPlatform = 
ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
--- End diff --

How does this implementation compare to ByteBuffer.wrap(buffer).order(...)?
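To make the comparison concrete, here is a sketch of the two approaches being weighed (written in Scala with illustrative names; the actual reader is Java): wrapping the page buffer once in a little-endian ByteBuffer versus reversing the bytes of each value read in the buffer's default big-endian order.

    import java.nio.{ByteBuffer, ByteOrder}

    // Approach 1: wrap once (e.g. in initFromPage) and read through a little-endian view.
    def readIntsViaWrap(buffer: Array[Byte], offset: Int, count: Int): Array[Int] = {
      val bb = ByteBuffer.wrap(buffer, offset, count * 4).order(ByteOrder.LITTLE_ENDIAN)
      Array.fill(count)(bb.getInt())
    }

    // Approach 2: read in the default (big-endian) order and reverse each value.
    def readIntsViaReverse(buffer: Array[Byte], offset: Int, count: Int): Array[Int] = {
      val bb = ByteBuffer.wrap(buffer, offset, count * 4)
      Array.fill(count)(Integer.reverseBytes(bb.getInt()))
    }

Both produce the same decoded values; the question is only whether the one-time wrap is measurably cheaper than per-value reversal on a big-endian platform.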





[GitHub] spark pull request: [SPARK-14217][SQL] Fix bug if parquet data has...

2016-04-14 Thread nongli
Github user nongli closed the pull request at:

https://github.com/apache/spark/pull/12017





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59650089
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 ---
@@ -533,56 +578,104 @@ case class TungstenAggregate(
 
 val inputAttr = aggregateBufferAttributes ++ child.output
 ctx.currentVars = new 
Array[ExprCode](aggregateBufferAttributes.length) ++ input
+
+ctx.INPUT_ROW = aggregateRow
+// TODO: support subexpression elimination
+val aggregateRowEvals = updateExpr.map(BindReferences.bindReference(_, 
inputAttr).gen(ctx))
--- End diff --

This code needs some more comments to understand the high-level parts. It's not obvious why you do this twice, once with ctx.INPUT_ROW = aggregateRow and once again with buffer.





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59649964
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala
 ---
@@ -21,10 +21,10 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.types.StructType
 
 /**
- * This is a helper object to generate an append-only single-key/single 
value aggregate hash
- * map that can act as a 'cache' for extremely fast key-value lookups 
while evaluating aggregates
- * (and fall back to the `BytesToBytesMap` if a given key isn't found). 
This is 'codegened' in
- * TungstenAggregate to speed up aggregates w/ key.
+ * This is a helper class to generate an append-only aggregate hash map 
that can act as a 'cache'
+ * for extremely fast key-value lookups while evaluating aggregates (and 
fall back to the
+ * `BytesToBytesMap` if a given key isn't found). This is 'codegened' in 
TungstenAggregate to speed
+ * up aggregates w/ key.
--- End diff --

You chose to have this not handle null keys, right? Comment that.





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59649860
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 ---
@@ -533,56 +578,104 @@ case class TungstenAggregate(
 
 val inputAttr = aggregateBufferAttributes ++ child.output
 ctx.currentVars = new 
Array[ExprCode](aggregateBufferAttributes.length) ++ input
+
+ctx.INPUT_ROW = aggregateRow
+// TODO: support subexpression elimination
--- End diff --

let's remove this todo in both places, not sure what it means anymore.





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59649827
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 ---
@@ -513,8 +555,11 @@ case class TungstenAggregate(
 ctx.currentVars = input
 val keyCode = GenerateUnsafeProjection.createCode(
--- End diff --

I think we can do better with the variable names to improve readability.

keyCode, groupByKeys, and key all sound too similar. Add "row" or "unsafeRow" to the names of the keys that have been collected into an unsafe row, or differentiate them somehow.





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59649610
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 ---
@@ -484,13 +497,42 @@ case class TungstenAggregate(
 // so `copyResult` should be reset to `false`.
 ctx.copyResult = false
 
+def outputFromGeneratedMap: Option[String] = {
+  if (isAggregateHashMapEnabled) {
+val row = ctx.freshName("aggregateHashMapRow")
--- End diff --

Comment at a high level what this is doing. Something like: "Iterate over the aggregate rows and convert them from ColumnarBatch.Row to UnsafeRow."





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59649417
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala
 ---
@@ -65,27 +69,43 @@ class ColumnarAggMapCodeGenerator(
   .mkString("\n")};
   """.stripMargin
 
+val generatedAggBufferSchema: String =
+  s"""
+ |new org.apache.spark.sql.types.StructType()
+ |${bufferSchema.map(key =>
+s""".add("${key.name}", 
org.apache.spark.sql.types.DataTypes.${key.dataType})""")
+.mkString("\n")};
+  """.stripMargin
+
 s"""
|  private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
batch;
+   |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
aggregateBufferBatch;
|  private int[] buckets;
|  private int numBuckets;
|  private int maxSteps;
|  private int numRows = 0;
|  private org.apache.spark.sql.types.StructType schema = 
$generatedSchema
+   |  private org.apache.spark.sql.types.StructType 
aggregateBufferSchema =
+   |$generatedAggBufferSchema
|
-   |  public $generatedClassName(int capacity, double loadFactor, int 
maxSteps) {
-   |assert (capacity > 0 && ((capacity & (capacity - 1)) == 0));
-   |this.maxSteps = maxSteps;
-   |numBuckets = (int) (capacity / loadFactor);
+   |  public $generatedClassName() {
+   |// TODO: These should be generated based on the schema
+   |int DEFAULT_CAPACITY = 1 << 16;
+   |double DEFAULT_LOAD_FACTOR = 0.25;
+   |int DEFAULT_MAX_STEPS = 2;
+   |assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & 
(DEFAULT_CAPACITY - 1)) == 0));
+   |this.maxSteps = DEFAULT_MAX_STEPS;
+   |numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR);
|batch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-   |  org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
+   |  org.apache.spark.memory.MemoryMode.ON_HEAP, 
DEFAULT_CAPACITY);
+   |aggregateBufferBatch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
+   |  aggregateBufferSchema, 
org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
--- End diff --

Let's leave a TODO to fix this. There should be a nicer way to get a 
projection of a batch instead of this.





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12345#issuecomment-209695875
  
@sameeragarwal have you updated the generated code?





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12345#issuecomment-209290930
  
I don't think this works if we have NULL keys. This is kind of annoying, 
let's think about that tomorrow.





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12345#issuecomment-209290367
  

/* 120 */   batch.column(1).putLong(numRows, 0);
I don't think this is right. You should initialize the agg exprs to NULL
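Something along these lines (a sketch of the intent, not the generated code itself): nullable aggregate buffer columns should start out as NULL, e.g. via ColumnVector.putNull, rather than being seeded with 0. The helper and its bufferOrdinals parameter are illustrative names.

    import org.apache.spark.sql.execution.vectorized.ColumnarBatch

    // Sketch: initialize the aggregate buffer columns of a newly inserted row as NULL.
    def initBufferRow(batch: ColumnarBatch, rowId: Int, bufferOrdinals: Seq[Int]): Unit = {
      bufferOrdinals.foreach { ordinal =>
        batch.column(ordinal).putNull(rowId)   // e.g. sum(...) of no rows is NULL, not 0
      }
    }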





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59506077
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala
 ---
@@ -65,27 +65,43 @@ class ColumnarAggMapCodeGenerator(
   .mkString("\n")};
   """.stripMargin
 
+val generatedAggBufferSchema: String =
+  s"""
+ |new org.apache.spark.sql.types.StructType()
+ |${bufferSchema.map(key =>
+s""".add("${key.name}", 
org.apache.spark.sql.types.DataTypes.${key.dataType})""")
+.mkString("\n")};
+  """.stripMargin
+
 s"""
-   |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
batch;
+   |  public org.apache.spark.sql.execution.vectorized.ColumnarBatch 
batch;
--- End diff --

Can we avoid making either of these public? Also, add a comment that explains why you have two batches and their relation.





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59505993
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
 ---
@@ -153,16 +153,36 @@ class BenchmarkWholeStageCodegen extends 
SparkFunSuite {
   ignore("aggregate with keys") {
 val N = 20 << 20
 
-runBenchmark("Aggregate w keys", N) {
-  sqlContext.range(N).selectExpr("(id & 65535) as 
k").groupBy("k").sum().collect()
+val benchmark = new Benchmark("Aggregate w keys", N)
--- End diff --

Do we have tests that exercise the correctness here? They should group by a varying number of keys to test the paths you added.
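Something like the following rough sketch (not the PR's test, and assuming a SQLContext as in the benchmark code above): vary the key cardinality so that both the generated fast hash map and the BytesToBytesMap fallback paths get exercised, and check the results independently.

    // Sketch of a correctness check over several key cardinalities.
    Seq(1, 16, 65535, 1 << 20).foreach { numKeys =>
      val n = 1 << 20
      val df = sqlContext.range(n).selectExpr(s"id % $numKeys as k", "id")
      val grouped = df.groupBy("k").count().collect()
      assert(grouped.length == numKeys)              // one row per distinct key
      assert(grouped.map(_.getLong(1)).sum == n)     // counts add up to the input size
    }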





[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...

2016-04-13 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12345#discussion_r59505821
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala
 ---
@@ -103,7 +119,7 @@ class ColumnarAggMapCodeGenerator(
 s"""
|// TODO: Improve this hash function
|private long hash($groupingKeySignature) {
-   |  return ${groupingKeys.map(_._2).mkString(" ^ ")};
+   |  return ${groupingKeys.map(_._2).mkString(" | ")};
--- End diff --

Any reason not to implement the h = h * 37 + v hash function?
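For reference, the kind of hash meant here (a sketch; 37 is just a small odd multiplier, and the fold over the key columns is the point):

    // h = h * 37 + v, folded over the grouping-key columns (all longs here).
    // Unlike `k1 | k2`, this mixes every key into the result instead of
    // collapsing many distinct key combinations onto the same value.
    def hash(keys: Long*): Long = keys.foldLeft(0L)((h, v) => h * 37L + v)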





[GitHub] spark pull request: [SPARK-14547] Avoid DNS resolution for reusing...

2016-04-12 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12315#discussion_r59409060
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 ---
@@ -149,25 +148,31 @@ public TransportClient createClient(String 
remoteHost, int remotePort) throws IO
   }
 
   if (cachedClient.isActive()) {
-logger.trace("Returning cached connection to {}: {}", address, 
cachedClient);
+logger.trace("Returning cached connection to {}: {}",
+  cachedClient.getSocketAddress(), cachedClient);
 return cachedClient;
   }
 }
 
 // If we reach here, we don't have an existing connection open. Let's 
create a new one.
 // Multiple threads might race here to create new connections. Keep 
only one of them active.
+final long preResolveHost = System.nanoTime();
+final InetSocketAddress resolvedAddress = new 
InetSocketAddress(remoteHost, remotePort);
+final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 
100;
+logger.trace("Spent {} ms to resolve {}", hostResolveTimeMs, 
resolvedAddress);
--- End diff --

I think it's more useful to log a warning if this time is greater than, say, 2 seconds.
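Roughly like this (a sketch in Scala with an illustrative 2-second threshold; the actual factory is Java and uses its own slf4j logger):

    import java.net.InetSocketAddress
    import org.slf4j.LoggerFactory

    val logger = LoggerFactory.getLogger("TransportClientFactory")

    def resolve(remoteHost: String, remotePort: Int): InetSocketAddress = {
      val start = System.nanoTime()
      val resolved = new InetSocketAddress(remoteHost, remotePort)  // may block on DNS
      val elapsedMs = (System.nanoTime() - start) / 1000000
      if (elapsedMs > 2000) {
        logger.warn(s"DNS resolution for $resolved took $elapsedMs ms")
      } else {
        logger.trace(s"Spent $elapsedMs ms to resolve $resolved")
      }
      resolved
    }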





[GitHub] spark pull request: [SPARK-14547] Avoid DNS resolution for reusing...

2016-04-12 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12315#issuecomment-208998610
  
lgtm





[GitHub] spark pull request: [SPARK-14520][SQL] Use correct return type in ...

2016-04-11 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12292#issuecomment-208524105
  
LGTM





[GitHub] spark pull request: [SPARK-14482][SQL] Change default Parquet code...

2016-04-08 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12256#issuecomment-207523676
  
LGTM





[GitHub] spark pull request: [SPARK-14467][SQL] Interleave CPU and IO bette...

2016-04-08 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12243#discussion_r59044639
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 ---
@@ -46,37 +50,80 @@ case class PartitionedFile(
  */
 case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends 
Partition
 
+object FileScanRDD {
+  private val ioExecutionContext = ExecutionContext.fromExecutorService(
+ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16))
--- End diff --

It's difficult to model this as the total number of cores because what this is intended to do is background IO that uses very little CPU. The async IO will still use some CPU, but it is expected to be very low, a small fraction of a core.





[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...

2016-04-08 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12161#discussion_r59043603
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.types.StructType
+
+class TungstenAggregateHashMap(
+ctx: CodegenContext,
+generatedClassName: String,
+groupingKeySchema: StructType,
+bufferSchema: StructType) {
+  val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, 
ctx.freshName("key")))
+  val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), 
key.dataType.typeName))
+  val groupingKeySignature = 
groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ")
+
+  def generate(): String = {
+s"""
+   |public class $generatedClassName {
+   |${initializeAggregateHashMap()}
+   |
+   |${generateFindOrInsert()}
+   |
+   |${generateEquals()}
+   |
+   |${generateHashFunction()}
+   |}
+ """.stripMargin
+  }
+
+  def initializeAggregateHashMap(): String = {
+val generatedSchema: String =
--- End diff --

That was my initial thought too, but this generated class only works for one schema due to the specialized equals/hash/find signatures. It's not particularly useful to pass in a schema if only one works.





[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...

2016-04-07 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12161#issuecomment-207146943
  
LGTM





[GitHub] spark pull request: [SPARK-14467][SQL] Interleave CPU and IO bette...

2016-04-07 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12243#issuecomment-207136072
  
@holdenk I tried to simplify the logic. Let me know your thoughts.





[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...

2016-04-07 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12161#issuecomment-207134229
  
I think the old version makes more sense. The generated code only works for a particular schema, so there is no reason to pass it in.





[GitHub] spark pull request: [SPARK-14467][SQL] Interleave CPU and IO bette...

2016-04-07 Thread nongli
GitHub user nongli opened a pull request:

https://github.com/apache/spark/pull/12243

[SPARK-14467][SQL] Interleave CPU and IO better in FileScanRDD.

## What changes were proposed in this pull request?

This patch updates FileScanRDD to start reading from the next file while 
the current file
is being processed. The goal is to have better interleaving of CPU and IO. 
It does this
by launching a future which will asynchronously start preparing the next 
file to be read.
The expectation is that the async task is IO intensive and the current file 
(which
includes all the computation for the query plan) is CPU intensive. For some 
file formats,
this would just mean opening the file and the initial setup. For file 
formats like
parquet, this would mean doing all the IO for all the columns.

## How was this patch tested?

Good coverage from existing tests. Added a new one to test the flag. 
Cluster testing on tpcds queries.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nongli/spark interleave

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/12243.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #12243


commit cc6d98a17f6fa4249951802f981c2224d354e651
Author: Nong Li <n...@databricks.com>
Date:   2016-04-05T20:36:34Z

[SPARK-14467][SQL] Interleave CPU and IO better in FileScanRDD.

This patch updates FileScanRDD to start reading from the next file while 
the current file
is being processed. The goal is to have better interleaving of CPU and IO. 
It does this
by launching a future which will asynchronously start preparing the next 
file to be read.
The expectation is that the async task is IO intensive and the current file 
(which
includes all the computation for the query plan) is CPU intensive. For some 
file formats,
this would just mean opening the file and the initial setup. For file 
formats like
parquet, this would mean doing all the IO for all the columns.







[GitHub] spark pull request: [SPARK-14369][SQL] Locality support for FileSc...

2016-04-07 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12153#discussion_r58917033
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -137,10 +140,14 @@ private[sql] object FileSourceStrategy extends 
Strategy with Logging {
 
   val splitFiles = selectedPartitions.flatMap { partition =>
 partition.files.flatMap { file =>
+  val blockLocations = getBlockLocations(file)
   (0L to file.getLen by maxSplitBytes).map { offset =>
 val remaining = file.getLen - offset
 val size = if (remaining > maxSplitBytes) maxSplitBytes 
else remaining
-PartitionedFile(partition.values, 
file.getPath.toUri.toString, offset, size)
+// Finds out a list of location hosts where lives the 
first block that contains the
+// starting point the file block starts at `offset` with 
length `size`.
+val hosts = getBlockHosts(blockLocations, offset)
+PartitionedFile(partition.values, 
file.getPath.toUri.toString, offset, size, hosts)
--- End diff --

This is not likely to generate good locality, right?

Is it reasonable to repurpose this?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala#L124





[GitHub] spark pull request: [SPARK-14369][SQL] Locality support for FileSc...

2016-04-07 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12153#discussion_r58906143
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 ---
@@ -137,10 +140,14 @@ private[sql] object FileSourceStrategy extends 
Strategy with Logging {
 
   val splitFiles = selectedPartitions.flatMap { partition =>
 partition.files.flatMap { file =>
+  val blockLocations = getBlockLocations(file)
   (0L to file.getLen by maxSplitBytes).map { offset =>
 val remaining = file.getLen - offset
 val size = if (remaining > maxSplitBytes) maxSplitBytes 
else remaining
-PartitionedFile(partition.values, 
file.getPath.toUri.toString, offset, size)
+// Finds out a list of location hosts where lives the 
first block that contains the
+// starting point the file block starts at `offset` with 
length `size`.
+val hosts = getBlockHosts(blockLocations, offset)
+PartitionedFile(partition.values, 
file.getPath.toUri.toString, offset, size, hosts)
--- End diff --

How is this locality information maintained when we coalesce in the logic 
below?





[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...

2016-04-07 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12161#issuecomment-206949611
  
The generated code takes a schema in the ctor and creates one as a member 
var. Let's just use the member var one like you had originally. 





[GitHub] spark pull request: [SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL...

2016-04-06 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12047#issuecomment-206590169
  
LGTM





[GitHub] spark pull request: [SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL...

2016-04-06 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12047#discussion_r58789868
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala 
---
@@ -434,12 +435,20 @@ case class CollapseCodegenStages(conf: SQLConf) 
extends Rule[SparkPlan] {
 case _ => true
   }
 
+  private def numOfNestedFields(dataType: DataType): Int = dataType match {
+case dt: StructType => dt.fields.map(f => 
numOfNestedFields(f.dataType)).sum
+case m: MapType => numOfNestedFields(m.keyType) + 
numOfNestedFields(m.valueType)
+case a: ArrayType => numOfNestedFields(a.elementType) + 1
--- End diff --

why + 1





[GitHub] spark pull request: [SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL...

2016-04-06 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12047#discussion_r58789578
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -121,6 +109,7 @@ public void initialize(InputSplit inputSplit, 
TaskAttemptContext taskAttemptCont
   throws IOException, InterruptedException, 
UnsupportedOperationException {
 super.initialize(inputSplit, taskAttemptContext);
 initializeInternal();
+Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
--- End diff --

what does this do?





[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...

2016-04-06 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12161#discussion_r58788879
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.types.StructType
+
+class TungstenAggregateHashMap(
+ctx: CodegenContext,
+generatedClassName: String,
+groupingKeySchema: StructType,
+bufferSchema: StructType) {
+  val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, 
ctx.freshName("key")))
+  val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), 
key.dataType.typeName))
+  val groupingKeySignature = 
groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ")
+
+  def generate(): String = {
+s"""
+   |public class $generatedClassName {
+   |${initializeAggregateHashMap()}
+   |
+   |${generateFindOrInsert()}
+   |
+   |${generateEquals()}
+   |
+   |${generateHashFunction()}
+   |}
+ """.stripMargin
+  }
+
+  def initializeAggregateHashMap(): String = {
+val generatedSchema: String =
--- End diff --

I don't think this should be generated. I think the generated ctor should take a schema, and we should get that from the non-generated code if possible.





[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...

2016-04-06 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12161#discussion_r58788712
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.aggregate
+
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.types.StructType
+
+class TungstenAggregateHashMap(
+ctx: CodegenContext,
+generatedClassName: String,
+groupingKeySchema: StructType,
+bufferSchema: StructType) {
+  val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, 
ctx.freshName("key")))
+  val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), 
key.dataType.typeName))
+  val groupingKeySignature = 
groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ")
+
+  def generate(): String = {
+s"""
+   |public class $generatedClassName {
+   |${initializeAggregateHashMap()}
+   |
+   |${generateFindOrInsert()}
+   |
+   |${generateEquals()}
+   |
+   |${generateHashFunction()}
+   |}
+ """.stripMargin
+  }
+
+  def initializeAggregateHashMap(): String = {
+val generatedSchema: String =
+  s"""
+ |new org.apache.spark.sql.types.StructType()
+ |${(groupingKeySchema ++ bufferSchema).map(key =>
+s""".add("${key.name}", 
org.apache.spark.sql.types.DataTypes.${key.dataType})""")
+.mkString("\n")};
+   """.stripMargin
+
+s"""
+   |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch 
batch;
+   |  private int[] buckets;
+   |  private int numBuckets;
+   |  private int maxSteps;
+   |  private int numRows = 0;
+   |  private org.apache.spark.sql.types.StructType schema = 
$generatedSchema
+   |
+   |  public $generatedClassName(int capacity, double loadFactor, int 
maxSteps) {
+   |assert (capacity > 0 && ((capacity & (capacity - 1)) == 0));
+   |this.maxSteps = maxSteps;
+   |numBuckets = (int) (capacity / loadFactor);
+   |batch = 
org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
+   |  org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
+   |buckets = new int[numBuckets];
+   |java.util.Arrays.fill(buckets, -1);
+   |  }
+   |
+   |  public $generatedClassName() {
+   |new $generatedClassName(1 << 16, 0.25, 5);
+   |  }
+ """.stripMargin
+  }
+
+  def generateHashFunction(): String = {
+s"""
+   |// TODO: Improve this Hash Function
+   |private long hash($groupingKeySignature) {
+   |  return ${groupingKeys.map(_._2).mkString(" & ")};
+   |}
+ """.stripMargin
+  }
+
+  def generateEquals(): String = {
+s"""
+   |private boolean equals(int idx, $groupingKeySignature) {
+   |  return ${groupingKeys.zipWithIndex.map(key =>
+s"batch.column(${key._2}).getLong(buckets[idx]) == 
${key._1._2}").mkString(" && ")};
+   |}
+ """.stripMargin
+  }
+
+  def generateFindOrInsert(): String = {
+s"""
+   |public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
findOrInsert(${
+  groupingKeySignature}) {
+   |  int idx = find(${groupingKeys.map(_._2).mkString(", ")});
+   |  if (idx !=

[GitHub] spark pull request: [SPARK-12785][SQL] Add ColumnarBatch, an in me...

2016-04-05 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/10628#issuecomment-205993243
  
I don't think it should be needed for OnHeap (it uses Platform.getInt which 
I think should work for both). For offheap, as far as I know, only a few of the 
puts that take byte[] as input would care. Subclassing or if(...) both sound 
fine to me.





[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...

2016-04-05 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12103#issuecomment-205991469
  
lgtm





[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...

2016-04-05 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12103#discussion_r58617460
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -232,6 +234,96 @@ public MapData getMap(int ordinal) {
 public Object get(int ordinal, DataType dataType) {
   throw new NotImplementedException();
 }
+
+@Override
+public void update(int ordinal, Object value) {
--- End diff --

Why did you have to implement this? We should try not to call this in the 
hot path.





[GitHub] spark pull request: [SPARK-14310] Fix scan whole stage codegen to ...

2016-04-01 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12098#discussion_r58250401
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -318,25 +292,32 @@ private[sql] case class DataSourceScan(
 |   }
 | }""".stripMargin)
 
-  val value = ctx.freshName("value")
   s"""
  | if ($batch != null) {
  |   $scanBatches();
- | } else if ($input.hasNext()) {
- |   Object $value = $input.next();
- |   if ($value instanceof $columnarBatchClz) {
- | $batch = ($columnarBatchClz)$value;
+ | } else {
+ |   long $getBatchStart = System.nanoTime();
+ |   if ($input.hasNext()) {
+ | $batch = ($columnarBatchClz)$input.next();
+ | $scanTimeTotalNs += System.nanoTime() - $getBatchStart;
  | $scanBatches();
- |   } else {
- | $scanRows((InternalRow) $value);
  |   }
  | }
""".stripMargin
 } else {
+  val row = ctx.freshName("row")
+  val exprRows =
+output.zipWithIndex.map(x => new BoundReference(x._2, 
x._1.dataType, x._1.nullable))
+  ctx.INPUT_ROW = row
+  ctx.currentVars = null
+  val columnsRowInput = exprRows.map(_.gen(ctx))
+  val inputRow = if (outputUnsafeRows) row else null
   s"""
- |if ($input.hasNext()) {
- |  $scanRows((InternalRow) $input.next());
- |}
+ | while (!shouldStop() && $input.hasNext()) {
--- End diff --

I tried a few variants and didn't see a difference. I think only the 
batched scan is fast enough to be sensitive to this.





[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...

2016-04-01 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12103#discussion_r58247692
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -232,6 +233,56 @@ public MapData getMap(int ordinal) {
 public Object get(int ordinal, DataType dataType) {
   throw new NotImplementedException();
 }
+
+@Override
+public void setNullAt(int ordinal) {
+  columns[ordinal].putNull(rowId);
--- End diff --

how do you set something to NULL?





[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...

2016-04-01 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12103#discussion_r58247570
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
 ---
@@ -19,6 +19,8 @@
 
 import java.util.Arrays;
 
+import com.google.common.annotations.VisibleForTesting;
--- End diff --

is this necessary?





[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...

2016-04-01 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12103#discussion_r58247531
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
 ---
@@ -472,9 +472,8 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 .add("value", LongType)
   val map = new AggregateHashMap(schema)
   while (i < numKeys) {
-val idx = map.findOrInsert(i.toLong)
-map.batch.column(1).putLong(map.buckets(idx),
-  map.batch.column(1).getLong(map.buckets(idx)) + 1)
+val row = map.findOrInsert(i.toLong)
+row.setLong(1, row.getLong(1))
--- End diff --

row.getLong(1) + 1?





[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...

2016-04-01 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12103#discussion_r58247456
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -232,6 +233,56 @@ public MapData getMap(int ordinal) {
 public Object get(int ordinal, DataType dataType) {
   throw new NotImplementedException();
 }
+
+@Override
+public void setNullAt(int ordinal) {
+  columns[ordinal].putNull(rowId);
+}
+
+@Override
+public void update(int ordinal, Object value) {
+  throw new NotImplementedException();
+}
+
+@Override
+public void setBoolean(int ordinal, boolean value) {
+  columns[ordinal].putBoolean(rowId, value);
+}
+
+@Override
+public void setByte(int ordinal, byte value) {
+  columns[ordinal].putByte(rowId, value);
+}
+
+@Override
+public void setShort(int ordinal, short value) {
+  columns[ordinal].putShort(rowId, value);
+}
+
+@Override
+public void setInt(int ordinal, int value) {
+  columns[ordinal].putInt(rowId, value);
+}
+
+@Override
+public void setLong(int ordinal, long value) {
+  columns[ordinal].putLong(rowId, value);
+}
+
+@Override
+public void setFloat(int ordinal, float value) {
+  columns[ordinal].putFloat(rowId, value);
+}
+
+@Override
+public void setDouble(int ordinal, double value) {
+  columns[ordinal].putDouble(rowId, value);
+}
+
+@Override
+public void setDecimal(int ordinal, Decimal value, int precision) {
+  throw new NotImplementedException();
--- End diff --

why not?
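
For context, one plausible body for this method -- a sketch only, assuming the column
holds a decimal narrow enough for the 64-bit path (DecimalType.is64BitDecimalType) and
using Decimal.toUnscaledLong; this is not necessarily what the PR ends up doing:

    @Override
    public void setDecimal(int ordinal, Decimal value, int precision) {
      // Sketch: store the unscaled long the same way the 64-bit decimal read path does.
      if (DecimalType.is64BitDecimalType(columns[ordinal].dataType())) {
        columns[ordinal].putLong(rowId, value.toUnscaledLong());
      } else {
        // Wider precisions would need a byte[] representation; left unimplemented here.
        throw new NotImplementedException();
      }
    }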





[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...

2016-04-01 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12103#discussion_r58247399
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -232,6 +233,56 @@ public MapData getMap(int ordinal) {
 public Object get(int ordinal, DataType dataType) {
   throw new NotImplementedException();
 }
+
+@Override
+public void setNullAt(int ordinal) {
+  columns[ordinal].putNull(rowId);
--- End diff --

Let's assert it.





[GitHub] spark pull request: [SPARK-12785][SQL] Add ColumnarBatch, an in me...

2016-04-01 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/10628#issuecomment-204503727
  
This is not an architectural limitation. We have just not implemented 
support for it and there are only a few methods that would need to be 
implemented to support big endian. It would be great if someone in the 
community working on big endian hardware could do this. 

We don't rely on the byte ordering of the platform. The function you are 
referring to, putIntLittleEndian, is converting the input, which is little 
endian, to whatever the machine's endianness is. It doesn't specify what the 
host endian has to be. This is used in cases where the data is encoded on disk 
in a canonical binary format. On a big endian host, this would have to byte 
swap but I think that's inevitable as the on disk data had to pick an 
endianness (this is the code that's not implemented right now).

If you find places where Spark requires a particular endianness, I'd 
consider that a bug.
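
As a concrete illustration of the point above, a minimal, self-contained sketch
(illustrative names, not Spark's ColumnVector API) of a put that accepts little-endian
input: assembling each value byte by byte yields the host representation on any machine,
while a raw bulk memory copy is only a valid fast path on a little-endian host.

    import java.nio.ByteOrder;

    // Illustrative only: decoding little-endian on-disk ints portably.
    final class LittleEndianPuts {
      static final boolean HOST_IS_LITTLE_ENDIAN =
          ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);

      // Decodes `count` little-endian ints starting at src[srcOffset] into dst.
      static void putIntsLittleEndian(int[] dst, int dstIndex, byte[] src, int srcOffset, int count) {
        // A bulk copy would only be correct when HOST_IS_LITTLE_ENDIAN; the
        // byte-by-byte assembly below works on either host order.
        for (int i = 0; i < count; i++) {
          int base = srcOffset + 4 * i;
          dst[dstIndex + i] = (src[base] & 0xFF)
              | ((src[base + 1] & 0xFF) << 8)
              | ((src[base + 2] & 0xFF) << 16)
              | ((src[base + 3] & 0xFF) << 24);
        }
      }
    }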





[GitHub] spark pull request: [SPARK-14310] Fix scan whole stage codegen to ...

2016-03-31 Thread nongli
GitHub user nongli opened a pull request:

https://github.com/apache/spark/pull/12098

[SPARK-14310] Fix scan whole stage codegen to support batches without 
runtime check.

## What changes were proposed in this pull request?

Currently, we determine if the RDD will produce batches or rows by looking at
the first value. After other cleanups, this is not necessary anymore. This
simplifies the code and lets us measure time spent in the first batch.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nongli/spark spark-14310

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/12098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #12098


commit 303312c2a37ee47803a67812a96f271533fa5325
Author: Nong Li <n...@databricks.com>
Date:   2016-03-31T21:07:36Z

[SPARK-14310] Fix scan whole stage codegen to support batches without 
runtime check.

Currently, we determine if the RDD will produce batches or rows by looking at
the first value. After other cleanups, this is not necessary anymore. This
simplifies the code and lets us measure time spent in the first batch.







[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...

2016-03-30 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12055#issuecomment-203673714
  
LGTM





[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...

2016-03-30 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12055#discussion_r57977414
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+/**
+ * This is an illustrative implementation of an append-only 
single-key/single value aggregate hash
+ * map that can act as a 'cache' for extremely fast key-value lookups 
while evaluating aggregates
+ * (and fall back to the `BytesToBytesMap` if a given key isn't found). 
This can be potentially
+ * 'codegened' in TungstenAggregate to speed up aggregates w/ key.
+ *
+ * It is backed by a power-of-2-sized array for index lookups and a 
columnar batch that stores the
+ * key-value pairs. The index lookups in the array rely on linear probing 
(with a small number of
+ * maximum tries) and use an inexpensive hash function which makes it 
really efficient for a
+ * majority of lookups. However, using linear probing and an inexpensive 
hash function also makes it
+ * less robust as compared to the `BytesToBytesMap` (especially for a 
large number of keys or even
+ * for certain distribution of keys) and requires us to fall back on the 
latter for correctness.
+ */
+public class AggregateHashMap {
+  public ColumnarBatch batch;
+  public int[] buckets;
+
+  private int numBuckets;
+  private int numRows = 0;
+  private int maxSteps = 3;
+
+  private static int DEFAULT_NUM_BUCKETS = 65536 * 4;
--- End diff --

this is weird. configure the max capacity (in the batch) and the load 
factor and size numbuckets to capacity / load_factor. You have dependent 
constants here.
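
A sketch of the sizing being suggested, with hypothetical constant names rather than the
PR's: pick the batch capacity and the load factor, and derive the bucket count from them
so the two values cannot drift apart. With a capacity of 65536 and a load factor of 0.25
this reproduces the 65536 * 4 buckets above.

    // Hypothetical helper, for illustration only.
    final class AggregateHashMapSizing {
      static final int DEFAULT_CAPACITY = 1 << 16;      // max rows held by the ColumnarBatch
      static final double DEFAULT_LOAD_FACTOR = 0.25;   // capacity / numBuckets

      // numBuckets = capacity / loadFactor, rounded up to a power of two so that
      // `hash & (numBuckets - 1)` keeps working as the modulo.
      static int numBucketsFor(int capacity, double loadFactor) {
        int needed = (int) Math.ceil(capacity / loadFactor);
        int buckets = 1;
        while (buckets < needed) buckets <<= 1;
        return buckets;
      }
    }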





[GitHub] spark pull request: [SPARK-14259][SQL] Add a FileSourceStrategy op...

2016-03-30 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12068#issuecomment-203587910
  
LGTM. I think there's more we can do here to bin pack a bit better (i.e. 
checking if small files can fit in existing partitions) but it would be good to 
get this in and have more experience with how to configure this.





[GitHub] spark pull request: [SPARK-14278][SQL] Initialize columnar batch w...

2016-03-30 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12070#issuecomment-203569229
  
LGTM





[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...

2016-03-30 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12055#discussion_r57844666
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorizedHashMap.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+/**
+ * This is an illustrative implementation of a single-key/single value 
vectorized hash map that can
+ * be potentially 'codegened' in TungstenAggregate to speed up aggregate 
w/ key
+ */
+public class VectorizedHashMap {
+  public ColumnarBatch batch;
+  public int[] buckets;
+  private int numBuckets;
+  private int numRows = 0;
+  private int maxSteps = 3;
+
+  public VectorizedHashMap(int capacity, double loadFactor, int maxSteps) {
--- End diff --

I think this should take the schema as the parameter.

capacity needs to be a power of 2 for the mod to work. I'm not sure these 
should be exposed for the typical caller. At the very least, expose a ctor with 
reasonable defaults.
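
A sketch of the constructor shape being asked for, mirroring the fields the class
already has (defaults here are illustrative, not the PR's final values): take the schema
as a parameter, validate that capacity is a power of two, and expose an overload with
reasonable defaults.

    public VectorizedHashMap(StructType schema) {
      this(schema, /* capacity */ 1 << 16, /* loadFactor */ 0.25, /* maxSteps */ 3);
    }

    public VectorizedHashMap(StructType schema, int capacity, double loadFactor, int maxSteps) {
      assert capacity > 0 && (capacity & (capacity - 1)) == 0
          : "capacity must be a power of 2 so `hash & (numBuckets - 1)` is a valid modulo";
      this.maxSteps = maxSteps;
      this.numBuckets = capacity;
      this.batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, (int) (numBuckets * loadFactor));
      this.buckets = new int[numBuckets];
      Arrays.fill(buckets, -1);
    }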





[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...

2016-03-30 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12055#discussion_r57844531
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorizedHashMap.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+/**
+ * This is an illustrative implementation of a single-key/single value 
vectorized hash map that can
+ * be potentially 'codegened' in TungstenAggregate to speed up aggregate 
w/ key
+ */
+public class VectorizedHashMap {
+  public ColumnarBatch batch;
--- End diff --

do these need to be public?





[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...

2016-03-30 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12055#discussion_r57844469
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorizedHashMap.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+/**
+ * This is an illustrative implementation of a single-key/single value 
vectorized hash map that can
+ * be potentially 'codegened' in TungstenAggregate to speed up aggregate 
w/ key
+ */
+public class VectorizedHashMap {
--- End diff --

I think you should comment the overall design of this data structure, where 
it is good and where it is bad.





[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...

2016-03-30 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12055#discussion_r57844416
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorizedHashMap.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+/**
+ * This is an illustrative implementation of a single-key/single value 
vectorized hash map that can
+ * be potentially 'codegened' in TungstenAggregate to speed up aggregate 
w/ key
+ */
+public class VectorizedHashMap {
+  public ColumnarBatch batch;
+  public int[] buckets;
+  private int numBuckets;
+  private int numRows = 0;
+  private int maxSteps = 3;
+
+  public VectorizedHashMap(int capacity, double loadFactor, int maxSteps) {
+StructType schema = new StructType()
+.add("key", LongType)
+.add("value", LongType);
+this.maxSteps = maxSteps;
+numBuckets = capacity;
+batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, (int) 
(numBuckets * loadFactor));
+buckets = new int[numBuckets];
+Arrays.fill(buckets, -1);
+  }
+
+  public int findOrInsert(long key) {
+int idx = find(key);
+if (idx != -1 && buckets[idx] == -1) {
+  batch.column(0).putLong(numRows, key);
+  batch.column(1).putLong(numRows, 0);
+  buckets[idx] = numRows++;
+}
+return idx;
+  }
+
+  public int find(long key) {
+long h = hash(key);
+int step = 0;
+int idx = (int) h & (numBuckets - 1);
+while (step < maxSteps) {
+  if ((buckets[idx] == -1) || (buckets[idx] != -1 && equals(idx, 
key))) return idx;
--- End diff --

I don't think you need the check for -1 twice.
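
Since the second disjunct is only evaluated when `buckets[idx] == -1` is false, boolean
absorption (A || (!A && B) is A || B) lets the flagged line shrink to:

    if (buckets[idx] == -1 || equals(idx, key)) return idx;

The behavior is unchanged; the redundant test simply disappears.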


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14254][Core]Add logs to help investigat...

2016-03-29 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12046#issuecomment-203177749
  
LGTM





[GitHub] spark pull request: [SPARK-14217][SQL] Fix bug if parquet data has...

2016-03-29 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12017#issuecomment-202991498
  
@davies Yea. The format allows at most 1 dictionary per column per row 
group. Each page can have a different encoding though.





[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

2016-03-29 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/12007#discussion_r57756896
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -241,73 +256,89 @@ private[sql] case class DataSourceScan(
 // TODO: The abstractions between this class and SqlNewHadoopRDD makes 
it difficult to know
 // here which path to use. Fix this.
 
-ctx.currentVars = null
-val columns1 = (output zip colVars).map { case (attr, colVar) =>
-  genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, 
attr.nullable) }
-val scanBatches = ctx.freshName("processBatches")
-ctx.addNewFunction(scanBatches,
-  s"""
-  | private void $scanBatches() throws java.io.IOException {
-  |  while (true) {
-  | int numRows = $batch.numRows();
-  | if ($idx == 0) {
-  |   ${columnAssigns.mkString("", "\n", "\n")}
-  |   $numOutputRows.add(numRows);
-  | }
-  |
-  | // this loop is very perf sensitive and changes to it should 
be measured carefully
-  | while ($idx < numRows) {
-  |   int $rowidx = $idx++;
-  |   ${consume(ctx, columns1).trim}
-  |   if (shouldStop()) return;
-  | }
-  |
-  | if (!$input.hasNext()) {
-  |   $batch = null;
-  |   break;
-  | }
-  | $batch = ($columnarBatchClz)$input.next();
-  | $idx = 0;
-  |   }
-  | }""".stripMargin)
-
 val exprRows =
-  output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, 
x._1.nullable))
+output.zipWithIndex.map(x => new BoundReference(x._2, 
x._1.dataType, x._1.nullable))
 ctx.INPUT_ROW = row
 ctx.currentVars = null
-val columns2 = exprRows.map(_.gen(ctx))
+val columnsRowInput = exprRows.map(_.gen(ctx))
 val inputRow = if (outputUnsafeRows) row else null
 val scanRows = ctx.freshName("processRows")
 ctx.addNewFunction(scanRows,
   s"""
-   | private void $scanRows(InternalRow $row) throws 
java.io.IOException {
-   |   boolean firstRow = true;
-   |   while (firstRow || $input.hasNext()) {
-   | if (firstRow) {
-   |   firstRow = false;
-   | } else {
-   |   $row = (InternalRow) $input.next();
-   | }
-   | $numOutputRows.add(1);
-   | ${consume(ctx, columns2, inputRow).trim}
-   | if (shouldStop()) return;
-   |   }
-   | }""".stripMargin)
-
-val value = ctx.freshName("value")
-s"""
-   | if ($batch != null) {
-   |   $scanBatches();
-   | } else if ($input.hasNext()) {
-   |   Object $value = $input.next();
-   |   if ($value instanceof $columnarBatchClz) {
-   | $batch = ($columnarBatchClz)$value;
-   | $scanBatches();
-   |   } else {
-   | $scanRows((InternalRow) $value);
-   |   }
-   | }
- """.stripMargin
+ | private void $scanRows(InternalRow $row) throws 
java.io.IOException {
+ |   boolean firstRow = true;
+ |   while (!shouldStop() && (firstRow || $input.hasNext())) {
+ | if (firstRow) {
+ |   firstRow = false;
+ | } else {
+ |   $row = (InternalRow) $input.next();
+ | }
+ | $numOutputRows.add(1);
+ | ${consume(ctx, columnsRowInput, inputRow).trim}
+ |   }
+ | }""".stripMargin)
+
+// Timers for how long we spent inside the scan. We can only maintain 
this when using batches,
+// otherwise the overhead is too high.
+if (canProcessBatches()) {
+  val scanTimeMetric = metricTerm(ctx, "scanTime")
+  val getBatchStart = ctx.freshName("scanStart")
+  val scanTimeTotalNs = ctx.freshName("scanTime")
+  ctx.currentVars = null
+  val columnsBatchInput = (output zip colVars).map { case (attr, 
colVar) =>
+genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, 
attr.nullable) }
+  val scanBatches = ctx.freshName("processBatches")
+  ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 
0;")
+
+  ctx.addNewFunction(scanBatches,
+s"""
+| private void $scanBatches() throws java.io.IOException {
+|  while (true) {
+| i

[GitHub] spark pull request: [SPARK-13607][SQL] Improve compression perform...

2016-03-28 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11461#issuecomment-202633270
  
can you include the benchmark code as well?

What do the numbers mean? For example: IntDeltaBinaryPacking(0.182). Does 
this mean it is 18% of the original size?





[GitHub] spark pull request: [SPARK-13607][SQL] Improve compression perform...

2016-03-28 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11461#discussion_r57654201
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
 ---
@@ -530,3 +532,283 @@ private[columnar] case object LongDelta extends 
CompressionScheme {
 }
   }
 }
+
+/**
+ * Writes integral-type values with delta encoding and binary packing.
+ * The format is as follows:
--- End diff --

How does this relate to the parquet spec/implementation?
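
For readers following along, a deliberately simplified, generic sketch of delta encoding
with binary packing -- not the elided format above and not Parquet's DELTA_BINARY_PACKED
layout, and it assumes non-decreasing input so deltas stay non-negative: store a base
value, then pack the deltas with just enough bits for the largest one.

    // Generic illustration only.
    final class DeltaPackSketch {
      // Deltas between consecutive values; an encoder would write values[0] first,
      // then these packed at bitWidth(deltas) bits each.
      static long[] deltas(long[] values) {
        long[] d = new long[values.length - 1];
        for (int i = 1; i < values.length; i++) d[i - 1] = values[i] - values[i - 1];
        return d;
      }

      // Bits needed to represent the largest delta (at least 1).
      static int bitWidth(long[] deltas) {
        long max = 0;
        for (long delta : deltas) max = Math.max(max, delta);
        return 64 - Long.numberOfLeadingZeros(max | 1);
      }
    }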





[GitHub] spark pull request: [SPARK-14217][SQL] Fix bug if parquet data has...

2016-03-28 Thread nongli
GitHub user nongli opened a pull request:

https://github.com/apache/spark/pull/12017

[SPARK-14217][SQL] Fix bug if parquet data has columns that use dictionary 
encoding for some of the data

## What changes were proposed in this pull request?

Currently, this causes batches where some values are dictionary encoded and
some are not. The non-dictionary-encoded values cause us to remove the
dictionary from the batch, causing the first values to return garbage.

This patch fixes the issue by first decoding the dictionary for the values
that are already dictionary encoded before switching. A similar thing is done
for the reverse case, where the initial values are not dictionary encoded.

## How was this patch tested?
This is difficult to test but replicated on a test cluster using a large 
tpcds data set.
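
A minimal sketch of the idea in the description (toy types and names, not the
VectorizedColumnReader code): before the column switches to plain encoding, the ids read
so far are materialized through the dictionary, so dropping the dictionary afterwards
cannot turn them into garbage.

    // Illustrative only.
    final class DictionarySwitchSketch {
      static long[] decodeBeforeSwitch(int[] dictionaryIds, int numValuesRead, long[] dictionary) {
        long[] decoded = new long[dictionaryIds.length];
        for (int i = 0; i < numValuesRead; i++) {
          decoded[i] = dictionary[dictionaryIds[i]];  // materialize what was already read as ids
        }
        return decoded;  // plain-encoded values from later pages append after numValuesRead
      }
    }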

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nongli/spark spark-14217

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/12017.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #12017


commit 1ba0a4c1b69f1fe33ccc45c16836ea21a72e8bc3
Author: Nong Li <n...@databricks.com>
Date:   2016-03-28T23:18:37Z

[SPARK-14217][SQL] Fix bug if parquet data has columns that use dictionary 
encoding for some of the data.

Currently, this causes batches where some values are dictionary encoded and
some are not. The non-dictionary-encoded values cause us to remove the
dictionary from the batch, causing the first values to return garbage.

This patch fixes the issue by first decoding the dictionary for the values
that are already dictionary encoded before switching. A similar thing is done
for the reverse case, where the initial values are not dictionary encoded.







[GitHub] spark pull request: [SPARK-13923][SPARK-14014][SQL] Session catalo...

2016-03-28 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12006#issuecomment-202544366
  
LGTM





[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

2016-03-28 Thread nongli
GitHub user nongli opened a pull request:

https://github.com/apache/spark/pull/12007

[SPARK-14210][SQL] Add a metric for time spent in scans.

## What changes were proposed in this pull request?

This adds a metric to parquet scans that measures the time spent in just the
scan phase. This is only possible when the scan returns ColumnarBatches;
otherwise the overhead is too high.

This, combined with the pipeline metric, lets us easily see what percent of
the time was spent in the scan.
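
The measurement pattern, reduced to a self-contained sketch with a toy iterator and
illustrative names: only the call that fetches the next batch is timed, so per-row work
downstream is never charged to the scan.

    import java.util.Arrays;
    import java.util.Iterator;

    // Illustrative only: accumulate nanoseconds spent fetching batches.
    final class ScanTimeSketch {
      public static void main(String[] args) {
        Iterator<int[]> batches = Arrays.asList(new int[]{1, 2, 3}, new int[]{4, 5}).iterator();
        long scanTimeTotalNs = 0;
        long numRows = 0;
        while (batches.hasNext()) {
          long start = System.nanoTime();
          int[] batch = batches.next();              // the only region attributed to the scan
          scanTimeTotalNs += System.nanoTime() - start;
          numRows += batch.length;                   // downstream work stays outside the timer
        }
        System.out.println(numRows + " rows, scan time (ns): " + scanTimeTotalNs);
      }
    }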

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nongli/spark spark-14210

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/12007.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #12007


commit b17bac4dd880bc93455d85f9591468941a88ce97
Author: Nong Li <n...@databricks.com>
Date:   2016-03-25T22:47:19Z

[SPARK-14210][SQL] Add a metric for time spent in scans.

This adds a metric to parquet scans that measures the time spent in just the
scan phase. This is only possible when the scan returns ColumnarBatches;
otherwise the overhead is too high.

This, combined with the pipeline metric, lets us easily see what percent of
the time was spent in the scan.







[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...

2016-03-28 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/12007#issuecomment-202529228
  
Here's a screenshot of what this looks like.

![screen shot 2016-03-28 at 11 56 33 
am](https://cloud.githubusercontent.com/assets/242468/14086908/51369016-f4dc-11e5-9cb7-fd06abc2e32d.png)






[GitHub] spark pull request: [SPARK-14144][SQL] Explicitly identify/catch U...

2016-03-25 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11950#issuecomment-201381524
  
LGTM





[GitHub] spark pull request: [SPARK-13981][SQL] Defer evaluating variables ...

2016-03-24 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11792#issuecomment-201059224
  
@davies I did this a different way. Let me know your thoughts.





[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

2016-03-24 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11791#issuecomment-201035800
  
LGTM





[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

2016-03-24 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11791#discussion_r57390963
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -279,13 +275,117 @@ private[spark] class MemoryStore(
 }
   }
 
+  /**
+   * Attempt to put the given block in memory store as bytes.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the 
stored data. In case of
+   * failure, return a handle which allows the caller to either 
finish the serialization
+   * by spilling to disk or to deserialize the 
partially-serialized block and reconstruct
+   * the original input iterator. The caller must either fully 
consume this result
+   * iterator or call `discard()` on it in order to free the 
storage memory consumed by the
+   * partially-unrolled block.
+   */
+  private[storage] def putIteratorAsBytes[T](
+  blockId: BlockId,
+  values: Iterator[T],
+  classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+
+require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
+
+// Whether there is still enough memory for us to continue unrolling 
this block
+var keepUnrolling = true
+// Initial per-task memory to request for unrolling blocks (bytes).
+val initialMemoryThreshold = unrollMemoryThreshold
+// Keep track of unroll memory used by this particular block / 
putIterator() operation
+var unrollMemoryUsedByThisBlock = 0L
+// Underlying buffer for unrolling the block
+val redirectableStream = new RedirectableOutputStream
+val byteArrayChunkOutputStream = new 
ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
+redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+val serializationStream: SerializationStream = {
+  val ser = serializerManager.getSerializer(classTag).newInstance()
+  ser.serializeStream(serializerManager.wrapForCompression(blockId, 
redirectableStream))
+}
+
+// Request enough memory to begin unrolling
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold)
+
+if (!keepUnrolling) {
+  logWarning(s"Failed to reserve initial memory threshold of " +
+s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
+} else {
+  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+}
+
+def reserveAdditionalMemoryIfNecessary(): Unit = {
+  if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
+val amountToRequest = byteArrayChunkOutputStream.size - 
unrollMemoryUsedByThisBlock
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest)
+if (keepUnrolling) {
+  unrollMemoryUsedByThisBlock += amountToRequest
+}
+  }
+}
+
+// Unroll this block safely, checking whether we have exceeded our 
threshold
+while (values.hasNext && keepUnrolling) {
+  serializationStream.writeObject(values.next())(classTag)
+  reserveAdditionalMemoryIfNecessary()
+}
+
+// Make sure that we have enough memory to store the block. By this 
point, it is possible that
+// the block's actual memory usage has exceeded the unroll memory by a 
small amount, so we
+// perform one final call to attempt to allocate additional memory if 
necessary.
--- End diff --

This is because of the call to close? That can use more memory?
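
A toy version of the reserve-as-you-grow logic quoted above, with a stand-in for the
memory manager (hypothetical names, not the MemoryStore API): after each write the
reservation is topped up to cover the bytes actually buffered, and the same check runs
once more at the end to cover anything a closing flush appends.

    // Illustrative only.
    final class UnrollReservationSketch {
      private long reservedBytes = 0;
      private boolean keepUnrolling = true;

      // Called after each write, and once more after the serialization stream is closed.
      void reserveAdditionalMemoryIfNecessary(long bufferedBytes) {
        if (keepUnrolling && bufferedBytes > reservedBytes) {
          long amountToRequest = bufferedBytes - reservedBytes;
          if (tryReserve(amountToRequest)) {
            reservedBytes += amountToRequest;
          } else {
            keepUnrolling = false;   // caller must spill to disk or give up
          }
        }
      }

      private boolean tryReserve(long bytes) {
        return true;                 // stand-in for the task memory manager
      }
    }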





[GitHub] spark pull request: [SPARK-14092] [SQL] move shouldStop() to end o...

2016-03-23 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11912#issuecomment-200466916
  
LGTM





[GitHub] spark pull request: [SPARK-14092] [SQL] move shouldStop() to end o...

2016-03-23 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11912#discussion_r57206785
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -255,11 +255,11 @@ private[sql] case class DataSourceScan(
   |   $numOutputRows.add(numRows);
   | }
   |
-  | while (!shouldStop() && $idx < numRows) {
+  | while ($idx < numRows) {
   |   int $rowidx = $idx++;
   |   ${consume(ctx, columns1).trim}
+  |   if (shouldStop()) return;
--- End diff --

Can we add a comment around line 248 saying this loop is very perf 
sensitive and changes to it should be measured carefully?





[GitHub] spark pull request: [SPARK-14015][SQL] Support TimestampType in ve...

2016-03-23 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11882#discussion_r57175750
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 ---
@@ -259,8 +259,6 @@ private void initializeInternal() throws IOException {
   if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
 throw new IOException("Complex types not supported.");
   }
-  PrimitiveType primitiveType = t.asPrimitiveType();
-
   originalTypes[i] = t.getOriginalType();
 
   // TODO: Be extremely cautious in what is supported. Expand this.
--- End diff --

I think we can remove this check now too.





[GitHub] spark pull request: [SPARK-14015][SQL] Support TimestampType in ve...

2016-03-23 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11882#issuecomment-200387315
  
LGTM





[GitHub] spark pull request: [SPARK-13981][SQL] Defer evaluating variables ...

2016-03-22 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11792#discussion_r57064001
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -110,23 +114,28 @@ case class Filter(condition: Expression, child: 
SparkPlan)
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
String): String = {
 val numOutput = metricTerm(ctx, "numOutputRows")
 
-// filter out the nulls
-val filterOutNull = notNullAttributes.map { a =>
-  val idx = child.output.indexOf(a)
-  s"if (${input(idx).isNull}) continue;"
-}.mkString("\n")
+val conjuncts = splitConjunctivePredicates(condition)
 
-ctx.currentVars = input
-val predicates = otherPreds.map { e =>
-  val bound = ExpressionCanonicalizer.execute(
-BindReferences.bindReference(e, output))
-  val ev = bound.gen(ctx)
+// Generate the code to evaluate each filter.
+val generated = conjuncts.map { c =>
--- End diff --

That defeats the point of this. Then we're referencing all the attributes 
first before the predicates.
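
A self-contained sketch of what the deferral buys, using toy columns and hypothetical
predicates rather than generated code: each column is read only when the first conjunct
that needs it runs, so rows rejected early never touch the later columns.

    // Illustrative only.
    final class DeferredFilterSketch {
      static int countSurvivors(long[] col0, double[] col2) {
        int survivors = 0;
        for (int i = 0; i < col0.length; i++) {
          long c0 = col0[i];            // only column 0 has been referenced so far
          if (c0 <= 10) continue;       // most rows are rejected without ever reading column 2
          double c2 = col2[i];          // materialized lazily, only for rows that got this far
          if (c2 >= 0.5) continue;
          survivors++;
        }
        return survivors;
      }
    }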





[GitHub] spark pull request: [SPARK-13325][SQL] Create a 64-bit hashcode ex...

2016-03-22 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11209#issuecomment-25187
  
LGTM





[GitHub] spark pull request: [SPARK-14015][SQL] Support TimestampType in ve...

2016-03-22 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11882#discussion_r57026910
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 ---
@@ -308,7 +322,7 @@ private void readIntBatch(int rowId, int num, 
ColumnVector column) throws IOExce
 
   private void readLongBatch(int rowId, int num, ColumnVector column) 
throws IOException {
 // This is where we implement support for the valid type conversions.
-if (column.dataType() == DataTypes.LongType ||
+if (column.dataType() == DataTypes.LongType || column.dataType() == 
DataTypes.TimestampType ||
 DecimalType.is64BitDecimalType(column.dataType())) {
   defColumn.readLongs(
--- End diff --

Do we have tests where this type is *not* dictionary encoded?





[GitHub] spark pull request: [SPARK-14015][SQL] Support TimestampType in ve...

2016-03-22 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11882#discussion_r57026815
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 ---
@@ -308,7 +322,7 @@ private void readIntBatch(int rowId, int num, 
ColumnVector column) throws IOExce
 
   private void readLongBatch(int rowId, int num, ColumnVector column) 
throws IOException {
 // This is where we implement support for the valid type conversions.
-if (column.dataType() == DataTypes.LongType ||
+if (column.dataType() == DataTypes.LongType || column.dataType() == 
DataTypes.TimestampType ||
 DecimalType.is64BitDecimalType(column.dataType())) {
   defColumn.readLongs(
--- End diff --

hmm. How does this work? readLongs is expecting to read parquet int64 
physical types. How is it able to read this other physical type?





[GitHub] spark pull request: [SPARK-13883][SQL] Parquet Implementation of F...

2016-03-21 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11709#issuecomment-199577463
  
LGTM





[GitHub] spark pull request: [SPARK-14016][SQL] Support high-precision deci...

2016-03-21 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11869#issuecomment-199539418
  
LGTM





[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11791#discussion_r56916760
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -500,3 +601,79 @@ private[storage] class PartiallyUnrolledIterator(
 iter = null
   }
 }
+
+/**
+ * A wrapper which allows an open [[OutputStream]] to be redirected to a 
different sink.
+ */
+private class RedirectableOutputStream extends OutputStream {
+  private[this] var os: OutputStream = _
+  def setOutputStream(s: OutputStream): Unit = { os = s }
+  override def write(b: Int): Unit = os.write(b)
+  override def write(b: Array[Byte]): Unit = os.write(b)
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = 
os.write(b, off, len)
+  override def flush(): Unit = os.flush()
+  override def close(): Unit = os.close()
+}
+
+/**
+ * The result of a failed [[MemoryStore.putIteratorAsBytes()]] call.
+ *
+ * @param memoryStore the MemoryStore, used for freeing memory.
+ * @param blockManager the BlockManager, used for deserializing values.
+ * @param blockId the block id.
+ * @param serializationStream a serialization stream which writes to 
[[redirectableOutputStream]].
+ * @param redirectableOutputStream an OutputStream which can be redirected 
to a different sink.
+ * @param unrollMemory the amount of unroll memory used by the values in 
`unrolled`.
+ * @param unrolled a byte buffer containing the partially-serialized 
values.
+ * @param rest the rest of the original iterator passed to
+ * [[MemoryStore.putIteratorAsValues()]].
+ */
+private[storage] class PartiallySerializedBlock(
+memoryStore: MemoryStore,
+blockManager: BlockManager,
+blockId: BlockId,
+serializationStream: SerializationStream,
+redirectableOutputStream: RedirectableOutputStream,
+unrollMemory: Long,
+unrolled: ChunkedByteBuffer,
+rest: Iterator[Any]) {
+
+  /**
+   * Called to dispose of this block and free its memory.
+   */
+  def discard(): Unit = {
--- End diff --

Who calls this now?
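
A hedged sketch of the expected caller contract, assuming the 
PartiallySerializedBlock type quoted in this diff (not actual BlockManager 
code): on failure the handle must either be fully consumed or explicitly 
discarded.

    def handlePutResult(result: Either[PartiallySerializedBlock, Long]): Unit =
      result match {
        case Right(size) =>
          // The block fit in memory; nothing further to release.
          println(s"stored $size bytes in the MemoryStore")
        case Left(partial) =>
          // Either finish the serialization elsewhere (e.g. spill to disk)
          // or give up and free the unroll memory that was reserved.
          partial.discard()
      }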


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11791#discussion_r56916037
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1074,7 +1074,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
 // Unroll with all the space in the world. This should succeed.
-var putResult = memoryStore.putIterator("unroll", smallList.iterator, 
StorageLevel.MEMORY_ONLY)
+var putResult = memoryStore.putIteratorAsValues("unroll", 
smallList.iterator)
--- End diff --

Do we need more test cases for putIteratorAsBytes?
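
If more coverage is wanted, a hypothetical extra case could mirror the 
values-based tests; the shape below assumes the putIteratorAsBytes signature 
quoted earlier in this thread.

    test("putIteratorAsBytes returns a PartiallySerializedBlock when memory runs out") {
      // Roughly 1 GB of payload, far more than the test MemoryStore is given.
      val bigList = List.fill(1024)(new Array[Byte](1024 * 1024))
      val result = memoryStore.putIteratorAsBytes("unroll-bytes", bigList.iterator)
      assert(result.isLeft)
      // Free the unroll memory that was reserved before giving up.
      result.left.foreach(_.discard())
    }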


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13990] Automatically pick serializer wh...

2016-03-21 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11801#issuecomment-199535749
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11791#discussion_r56914928
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -244,13 +244,113 @@ private[spark] class MemoryStore(
 }
   }
 
+  /**
+   * Attempt to put the given block in memory store as bytes.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the 
stored data. In case of
+   * failure, return a handle which allows the caller to either 
finish the serialization
+   * by spilling to disk or to deserialize the 
partially-serialized block and reconstruct
+   * the original input iterator. The caller must either fully 
consume this result
+   * iterator or call `discard()` on it in order to free the 
storage memory consumed by the
+   * partially-unrolled block.
+   */
+  private[storage] def putIteratorAsBytes(
+  blockId: BlockId,
+  values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
+
+require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
+
+// Whether there is still enough memory for us to continue unrolling 
this block
+var keepUnrolling = true
+// Initial per-task memory to request for unrolling blocks (bytes).
+val initialMemoryThreshold = unrollMemoryThreshold
+// Keep track of unroll memory used by this particular block / 
putIterator() operation
+var unrollMemoryUsedByThisBlock = 0L
+// Underlying buffer for unrolling the block
+val redirectableStream = new RedirectableOutputStream
+val byteArrayChunkOutputStream = new 
ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
+redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+val serializationStream: SerializationStream = {
+  val ser = blockManager.defaultSerializer.newInstance()
+  ser.serializeStream(blockManager.wrapForCompression(blockId, 
redirectableStream))
+}
+
+// Request enough memory to begin unrolling
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold)
+
+if (!keepUnrolling) {
+  logWarning(s"Failed to reserve initial memory threshold of " +
+s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
+} else {
+  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+}
+
+def reserveAdditionalMemoryIfNecessary(): Unit = {
+  if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
+val amountToRequest = byteArrayChunkOutputStream.size - 
unrollMemoryUsedByThisBlock
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest)
+if (keepUnrolling) {
+  unrollMemoryUsedByThisBlock += amountToRequest
+}
+unrollMemoryUsedByThisBlock += amountToRequest
+  }
+}
+
+// Unroll this block safely, checking whether we have exceeded our 
threshold
+while (values.hasNext && keepUnrolling) {
+  serializationStream.writeObject(values.next())
+  reserveAdditionalMemoryIfNecessary()
+}
+
+if (keepUnrolling) {
+  serializationStream.close()
+  reserveAdditionalMemoryIfNecessary()
+}
+
+if (keepUnrolling) {
+  val entry = SerializedMemoryEntry(
+new 
ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)))
+  // Synchronize so that transfer is atomic
+  memoryManager.synchronized {
--- End diff --

I see this replicated in a few places (transferUnrollToStorage()). It might 
be easier to have releaseUnrollMemoryForThisTask take another argument to 
optionally transfer it to storage.
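
One possible shape for that suggestion; the names and the storage-side call 
are illustrative only, not the actual MemoryStore/MemoryManager API.

    def releaseUnrollMemoryForThisTask(
        blockId: BlockId,
        memory: Long,
        transferToStorage: Boolean = false): Unit = memoryManager.synchronized {
      releaseUnrollMemory(memory)
      if (transferToStorage) {
        // Hypothetical storage-side acquisition; the exact signature may differ.
        val acquired = memoryManager.acquireStorageMemory(blockId, memory)
        assert(acquired, "transferring unroll memory to storage memory failed")
      }
    }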


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11791#discussion_r56914517
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -244,13 +244,113 @@ private[spark] class MemoryStore(
 }
   }
 
+  /**
+   * Attempt to put the given block in memory store as bytes.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the 
stored data. In case of
+   * failure, return a handle which allows the caller to either 
finish the serialization
+   * by spilling to disk or to deserialize the 
partially-serialized block and reconstruct
+   * the original input iterator. The caller must either fully 
consume this result
+   * iterator or call `discard()` on it in order to free the 
storage memory consumed by the
+   * partially-unrolled block.
+   */
+  private[storage] def putIteratorAsBytes(
+  blockId: BlockId,
+  values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
+
+require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
+
+// Whether there is still enough memory for us to continue unrolling 
this block
+var keepUnrolling = true
+// Initial per-task memory to request for unrolling blocks (bytes).
+val initialMemoryThreshold = unrollMemoryThreshold
+// Keep track of unroll memory used by this particular block / 
putIterator() operation
+var unrollMemoryUsedByThisBlock = 0L
+// Underlying buffer for unrolling the block
+val redirectableStream = new RedirectableOutputStream
+val byteArrayChunkOutputStream = new 
ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
+redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+val serializationStream: SerializationStream = {
+  val ser = blockManager.defaultSerializer.newInstance()
+  ser.serializeStream(blockManager.wrapForCompression(blockId, 
redirectableStream))
+}
+
+// Request enough memory to begin unrolling
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold)
+
+if (!keepUnrolling) {
+  logWarning(s"Failed to reserve initial memory threshold of " +
+s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
+} else {
+  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+}
+
+def reserveAdditionalMemoryIfNecessary(): Unit = {
+  if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
+val amountToRequest = byteArrayChunkOutputStream.size - 
unrollMemoryUsedByThisBlock
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest)
+if (keepUnrolling) {
+  unrollMemoryUsedByThisBlock += amountToRequest
+}
+unrollMemoryUsedByThisBlock += amountToRequest
+  }
+}
+
+// Unroll this block safely, checking whether we have exceeded our 
threshold
+while (values.hasNext && keepUnrolling) {
+  serializationStream.writeObject(values.next())
+  reserveAdditionalMemoryIfNecessary()
+}
+
+if (keepUnrolling) {
--- End diff --

Move this into line 317?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11791#discussion_r56914342
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -244,13 +244,113 @@ private[spark] class MemoryStore(
 }
   }
 
+  /**
+   * Attempt to put the given block in memory store as bytes.
+   *
+   * It's possible that the iterator is too large to materialize and store 
in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while 
periodically checking
+   * whether there is enough free memory. If the block is successfully 
materialized, then the
+   * temporary unroll memory used during the materialization is 
"transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the 
block.
+   *
+   * @return in case of success, the estimated size of the 
stored data. In case of
+   * failure, return a handle which allows the caller to either 
finish the serialization
+   * by spilling to disk or to deserialize the 
partially-serialized block and reconstruct
+   * the original input iterator. The caller must either fully 
consume this result
+   * iterator or call `discard()` on it in order to free the 
storage memory consumed by the
+   * partially-unrolled block.
+   */
+  private[storage] def putIteratorAsBytes(
+  blockId: BlockId,
+  values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
+
+require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
+
+// Whether there is still enough memory for us to continue unrolling 
this block
+var keepUnrolling = true
+// Initial per-task memory to request for unrolling blocks (bytes).
+val initialMemoryThreshold = unrollMemoryThreshold
+// Keep track of unroll memory used by this particular block / 
putIterator() operation
+var unrollMemoryUsedByThisBlock = 0L
+// Underlying buffer for unrolling the block
+val redirectableStream = new RedirectableOutputStream
+val byteArrayChunkOutputStream = new 
ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
+redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+val serializationStream: SerializationStream = {
+  val ser = blockManager.defaultSerializer.newInstance()
+  ser.serializeStream(blockManager.wrapForCompression(blockId, 
redirectableStream))
+}
+
+// Request enough memory to begin unrolling
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
initialMemoryThreshold)
+
+if (!keepUnrolling) {
+  logWarning(s"Failed to reserve initial memory threshold of " +
+s"${Utils.bytesToString(initialMemoryThreshold)} for computing 
block $blockId in memory.")
+} else {
+  unrollMemoryUsedByThisBlock += initialMemoryThreshold
+}
+
+def reserveAdditionalMemoryIfNecessary(): Unit = {
+  if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
+val amountToRequest = byteArrayChunkOutputStream.size - 
unrollMemoryUsedByThisBlock
+keepUnrolling = reserveUnrollMemoryForThisTask(blockId, 
amountToRequest)
+if (keepUnrolling) {
+  unrollMemoryUsedByThisBlock += amountToRequest
+}
+unrollMemoryUsedByThisBlock += amountToRequest
--- End diff --

I don't understand why you add this twice in some cases.
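
For reference, the helper with the duplicated increment removed (same names 
as the diff), which appears to be the intended behaviour:

    def reserveAdditionalMemoryIfNecessary(): Unit = {
      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
        // Only count the memory once, and only if the reservation succeeded.
        if (keepUnrolling) {
          unrollMemoryUsedByThisBlock += amountToRequest
        }
      }
    }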


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13990] Automatically pick serializer wh...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11801#discussion_r56913076
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -703,13 +710,13 @@ private[spark] class BlockManager(
*
* @return true if the block was stored or false if an error occurred.
*/
-  def putBytes(
+  def putBytes[T: ClassTag]( // TODO(josh)
--- End diff --

Is this addressed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13325][SQL] Create a 64-bit hashcode ex...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11209#discussion_r56907012
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala 
---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.Random
+
+import org.apache.spark.sql.catalyst.expressions.XXH64
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.hash.Murmur3_x86_32
+import org.apache.spark.util.Benchmark
+
+/**
+ * Synthetic benchmark for MurMurHash 3 and xxHash64.
+ */
+object HashByteArrayBenchmark {
+  def test(length: Int, seed: Long, numArrays: Int, iters: Int): Unit = {
+val random = new Random(seed)
+val arrays = Array.fill[Array[Byte]](numArrays) {
+  val bytes = new Array[Byte](length)
+  random.nextBytes(bytes)
+  bytes
+}
+
+val benchmark = new Benchmark("Hash byte arrays with length " + 
length, iters * numArrays)
+benchmark.addCase("Murmur3_x86_32") { _: Int =>
+  for (_ <- 0L until iters) {
+var sum = 0
+var i = 0
+while (i < numArrays) {
+  sum += Murmur3_x86_32.hashUnsafeBytes(arrays(i), 
Platform.BYTE_ARRAY_OFFSET, length, 42)
+  i += 1
+}
+  }
+}
+
+benchmark.addCase("xxHash 64-bit") { _: Int =>
+  for (_ <- 0L until iters) {
+var sum = 0L
+var i = 0
+while (i < numArrays) {
+  sum += XXH64.hashUnsafeBytes(arrays(i), 
Platform.BYTE_ARRAY_OFFSET, length, 42)
+  i += 1
+}
+  }
+}
+
+benchmark.run()
+  }
+
+  def main(args: Array[String]): Unit = {
+// Add 31 to all arrays to create worse case alignment for xxHash.
+/*
+Running benchmark: Hash byte arrays with length 31
--- End diff --

Can you add a case for 8 bytes? I'm curious if we should use this for joins 
and aggs.
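
A hypothetical way to add that case with the test() helper defined in this 
benchmark (the array and iteration counts are illustrative, not taken from 
the patch):

    // 8-byte keys, e.g. a single long column, as used by joins and aggregations.
    HashByteArrayBenchmark.test(8, 42L, 1 << 12, 1 << 13)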


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13325][SQL] Create a 64-bit hashcode ex...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11209#discussion_r56906895
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
 ---
@@ -423,6 +345,121 @@ case class Murmur3Hash(children: Seq[Expression], 
seed: Int) extends Expression
   case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, 
result, ctx)
 }
   }
+
+  protected def hasherClassName: String
+}
+
+/**
+ * Base class for interpreted hash functions.
+ */
+abstract class InterpretedHashFunction[E] {
--- End diff --

Should we just change this to always return long and downcast where we 
currently use int?
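
A sketch of what that could look like (illustrative only, not the patch): 
the interpreted hash always produces a long, and 32-bit hashes downcast at 
the boundary.

    abstract class InterpretedHashFunction {
      // Always compute 64 bits internally.
      def hashLong(value: Long, seed: Long): Long
      def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
    }

    // A 32-bit user (e.g. Murmur3Hash) would simply truncate the result:
    // val hash32: Int = hasher.hashLong(value, seed).toInt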


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13916][SQL] Add a metric to WholeStageC...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11741#discussion_r56905886
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala 
---
@@ -300,6 +304,10 @@ case class WholeStageCodegen(child: SparkPlan) extends 
UnaryNode with CodegenSup
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override private[sql] lazy val metrics = Map(
--- End diff --

Doesn't matter. That would be similar to having two projects in a single 
stage. The metrics are per instance.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13805][SQL] Generate code that get a va...

2016-03-21 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11636#issuecomment-199318644
  
Have you rerun the benchmark with these changes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13674][SQL] Add wholestage codegen supp...

2016-03-21 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11517#discussion_r56830819
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala ---
@@ -107,6 +113,28 @@ class BernoulliCellSampler[T](lb: Double, ub: Double, 
complement: Boolean = fals
 
   override def setSeed(seed: Long): Unit = rng.setSeed(seed)
 
+  override def sample(): Int = {
--- End diff --

Can we avoid the code duplication and write the iterator version in terms of 
this function?
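
One way to express that, as a sketch against the diff's per-element 
sample(): Int (not the actual patch):

    // Keep sample(): Int as the single source of truth; it returns how many
    // copies of the current element to emit (0 or 1 for a Bernoulli sampler).
    override def sample(items: Iterator[T]): Iterator[T] = {
      items.flatMap { item => Iterator.fill(sample())(item) }
    }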


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13873] [SQL] Avoid copy of UnsafeRow wh...

2016-03-20 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11740#issuecomment-197610722
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13871][SQL] Support for inferring filte...

2016-03-20 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11665#issuecomment-197548312
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13921] Store serialized blocks as multi...

2016-03-20 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11748#discussion_r56576152
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.io
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import com.google.common.primitives.UnsignedBytes
+import io.netty.buffer.{ByteBuf, Unpooled}
+
+import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.storage.StorageUtils
+
+/**
+ * Read-only byte buffer which is physically stored as multiple chunks 
rather than a single
+ * contiguous array.
+ *
+ * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array 
must be non-empty and have
+ *   position == 0. Ownership of these buffers is transferred 
to the ChunkedByteBuffer,
+ *   so if these buffers may also be used elsewhere then the 
caller is responsible for
+ *   copying them as needed.
+ */
+private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
+  require(chunks != null, "chunks must not be null")
+  require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
+  require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
+
+  /**
+   * The size of this buffer, in bytes.
+   */
+  val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
+
+  def this(byteBuffer: ByteBuffer) = {
+this(Array(byteBuffer))
+  }
+
+  /**
+   * Write this buffer to a channel.
+   */
+  def writeFully(channel: WritableByteChannel): Unit = {
+for (bytes <- getChunks()) {
+  while (bytes.remaining > 0) {
+channel.write(bytes)
+  }
+}
+  }
+
+  /**
+   * Wrap this buffer to view it as a Netty ByteBuf.
+   */
+  def toNetty: ByteBuf = {
+Unpooled.wrappedBuffer(getChunks(): _*)
+  }
+
+  /**
+   * Copy this buffer into a new byte array.
+   *
+   * @throws UnsupportedOperationException if this buffer's size exceeds 
the maximum array size.
+   */
+  def toArray: Array[Byte] = {
+if (size >= Integer.MAX_VALUE) {
+  throw new UnsupportedOperationException(
+s"cannot call toArray because buffer size ($size bytes) exceeds 
maximum array size")
+}
+val byteChannel = new ByteArrayWritableChannel(size.toInt)
+writeFully(byteChannel)
+byteChannel.close()
+byteChannel.getData
+  }
+
+  /**
+   * Copy this buffer into a new ByteBuffer.
+   *
+   * @throws UnsupportedOperationException if this buffer's size exceeds 
the max ByteBuffer size.
+   */
+  def toByteBuffer: ByteBuffer = {
+if (chunks.length == 1) {
+  chunks.head.duplicate()
+} else {
+  ByteBuffer.wrap(toArray)
+}
+  }
+
+  /**
+   * Creates an input stream to read data from this ChunkedByteBuffer.
+   *
+   * @param dispose if true, [[dispose()]] will be called at the end of 
the stream
+   *in order to close any memory-mapped files which back 
this buffer.
+   */
+  def toInputStream(dispose: Boolean = false): InputStream = {
+new ChunkedByteBufferInputStream(this, dispose)
+  }
+
+  /**
+   * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer.
+   */
+  def getChunks(): Array[ByteBuffer] = {
+chunks.map(_.duplicate())
+  }
+
+  /**
+   * Make a copy of this ChunkedByteBuffer, copying all of the backing 
data into new buffers.
+   * The new buffer will share no resources with the original buffer.
+   */
+  def copy(): ChunkedByteBuffer = {
+val copiedChunks = getChunks().map { chunk =>
+  

[GitHub] spark pull request: [SPARK-13922][SQL] Filter rows with null attri...

2016-03-19 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11749#discussion_r56416653
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -345,15 +358,25 @@ public void setColumn(int ordinal, ColumnVector 
column) {
* in this batch will not include this row.
*/
   public final void markFiltered(int rowId) {
-assert(filteredRows[rowId] == false);
+assert(!filteredRows[rowId]);
 filteredRows[rowId] = true;
 ++numRowsFiltered;
   }
 
+  /**
+   * Marks a given column as non-nullable. Any row that has a NULL value 
for the corresponding
+   * attribute is filtered out.
+   */
+  public final void filterNullsInColumn(int ordinal) {
+assert(!nullFilteredColumns.contains(ordinal));
--- End diff --

I don't think this assert is necessary. Repeated calls are perfectly valid 
and make this API easier to use.
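
In other words, dropping the assert makes repeated registration a harmless 
no-op, since the ordinal is tracked in a set; a hypothetical caller could 
then write:

    batch.filterNullsInColumn(0)
    batch.filterNullsInColumn(0)  // no-op instead of an assertion failure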


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13926] Automatically use Kryo serialize...

2016-03-19 Thread nongli
Github user nongli commented on the pull request:

https://github.com/apache/spark/pull/11755#issuecomment-197552094
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13921] Store serialized blocks as multi...

2016-03-19 Thread nongli
Github user nongli commented on a diff in the pull request:

https://github.com/apache/spark/pull/11748#discussion_r56576372
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.io
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import com.google.common.primitives.UnsignedBytes
+import io.netty.buffer.{ByteBuf, Unpooled}
+
+import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.storage.StorageUtils
+
+/**
+ * Read-only byte buffer which is physically stored as multiple chunks 
rather than a single
+ * contiguous array.
+ *
+ * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array 
must be non-empty and have
+ *   position == 0. Ownership of these buffers is transferred 
to the ChunkedByteBuffer,
+ *   so if these buffers may also be used elsewhere then the 
caller is responsible for
+ *   copying them as needed.
+ */
+private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
+  require(chunks != null, "chunks must not be null")
+  require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
+  require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
+
+  /**
+   * The size of this buffer, in bytes.
+   */
+  val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
+
+  def this(byteBuffer: ByteBuffer) = {
+this(Array(byteBuffer))
+  }
+
+  /**
+   * Write this buffer to a channel.
+   */
+  def writeFully(channel: WritableByteChannel): Unit = {
+for (bytes <- getChunks()) {
+  while (bytes.remaining > 0) {
+channel.write(bytes)
+  }
+}
+  }
+
+  /**
+   * Wrap this buffer to view it as a Netty ByteBuf.
+   */
+  def toNetty: ByteBuf = {
+Unpooled.wrappedBuffer(getChunks(): _*)
+  }
+
+  /**
+   * Copy this buffer into a new byte array.
+   *
+   * @throws UnsupportedOperationException if this buffer's size exceeds 
the maximum array size.
+   */
+  def toArray: Array[Byte] = {
+if (size >= Integer.MAX_VALUE) {
+  throw new UnsupportedOperationException(
+s"cannot call toArray because buffer size ($size bytes) exceeds 
maximum array size")
+}
+val byteChannel = new ByteArrayWritableChannel(size.toInt)
+writeFully(byteChannel)
+byteChannel.close()
+byteChannel.getData
+  }
+
+  /**
+   * Copy this buffer into a new ByteBuffer.
+   *
+   * @throws UnsupportedOperationException if this buffer's size exceeds 
the max ByteBuffer size.
+   */
+  def toByteBuffer: ByteBuffer = {
+if (chunks.length == 1) {
+  chunks.head.duplicate()
+} else {
+  ByteBuffer.wrap(toArray)
+}
+  }
+
+  /**
+   * Creates an input stream to read data from this ChunkedByteBuffer.
+   *
+   * @param dispose if true, [[dispose()]] will be called at the end of 
the stream
+   *in order to close any memory-mapped files which back 
this buffer.
+   */
+  def toInputStream(dispose: Boolean = false): InputStream = {
+new ChunkedByteBufferInputStream(this, dispose)
+  }
+
+  /**
+   * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer.
+   */
+  def getChunks(): Array[ByteBuffer] = {
+chunks.map(_.duplicate())
+  }
+
+  /**
+   * Make a copy of this ChunkedByteBuffer, copying all of the backing 
data into new buffers.
+   * The new buffer will share no resources with the original buffer.
+   */
+  def copy(): ChunkedByteBuffer = {
+val copiedChunks = getChunks().map { chunk =>
+  
