[GitHub] spark pull request: [SPARK-14620][SQL] Use/benchmark a better hash...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12379#issuecomment-210667378 LGTM
[GitHub] spark pull request: [SPARK-13745][SQL]Support columnar in memory r...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12397#issuecomment-210600346 Agreed. LGTM.
[GitHub] spark pull request: [SPARK-14620][SQL] Use/benchmark a better hash...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12379#discussion_r59921018 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala --- @@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz Aggregate w keys: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative --- -codegen = F 2219 / 2392 9.4 105.8 1.0X -codegen = T hashmap = F 1330 / 1466 15.8 63.4 1.7X -codegen = T hashmap = T 384 / 518 54.7 18.3 5.8X +codegen = F 2323 / 2567 9.0 110.8 1.0X +codegen = T hashmap = F 1182 / 1246 17.7 56.4 2.0X +codegen = T hashmap = T 381 / 489 55.0 18.2 6.1X +*/ + } + + ignore("aggregate with randomized keys") { +val N = 20 << 20 + +val benchmark = new Benchmark("Aggregate w keys", N) +sqlContext.range(N).selectExpr("id", "floor(rand() * 1) as k").registerTempTable("test") + +def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect() --- End diff -- I feel you're spending a lot of time in rand
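A minimal sketch of one way to keep rand() out of the measured path here, assuming the same sqlContext as in the diff above; the key-range multiplier is illustrative:

    // Evaluate rand() once while building the input, then cache it, so the
    // timed query only measures the group-by/aggregate path.
    val N = 20 << 20
    val input = sqlContext.range(N).selectExpr("id", "floor(rand() * 1000) as k").cache()
    input.count()                      // forces materialization of the random keys
    input.registerTempTable("test")

    def f(): Unit = sqlContext.sql("select k, sum(id) from test group by k").collect()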
[GitHub] spark pull request: [SPARK-14620][SQL] Use/benchmark a better hash...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12379#discussion_r59904543 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala --- @@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz Aggregate w keys: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative --- -codegen = F 2219 / 2392 9.4 105.8 1.0X -codegen = T hashmap = F 1330 / 1466 15.8 63.4 1.7X -codegen = T hashmap = T 384 / 518 54.7 18.3 5.8X +codegen = F 2323 / 2567 9.0 110.8 1.0X +codegen = T hashmap = F 1182 / 1246 17.7 56.4 2.0X +codegen = T hashmap = T 381 / 489 55.0 18.2 6.1X +*/ + } + + ignore("aggregate with randomized keys") { +val N = 20 << 20 + +val benchmark = new Benchmark("Aggregate w keys", N) +sqlContext.range(N).selectExpr("id", "floor(rand() * 1) as k").registerTempTable("test") + +def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect() --- End diff -- what does this plan look like?
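One quick way to see the plan being asked about, assuming the temp table registered in the benchmark above:

    // Prints the parsed, analyzed, optimized, and physical plans for the query.
    sqlContext.sql("select k, k, sum(id) from test group by k, k").explain(true)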
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12345#issuecomment-210249105 LGTM
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59814608 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala --- @@ -261,7 +263,12 @@ case class TungstenAggregate( .map(_.asInstanceOf[DeclarativeAggregate]) private val bufferSchema = StructType.fromAttributes(aggregateBufferAttributes) - // The name for HashMap + // The name for Vectorized HashMap + private var vectorizedHashMapTerm: String = _ + private var isVectorizedHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled && +(modes.contains(Partial) || modes.contains(PartialMerge)) --- End diff -- I don't think this is the correct logic. I think modes.forall(_ == Partial) is what we want.
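A small, self-contained sketch of the check being suggested (the surrounding conf flag and schema restrictions from the PR are omitted here):

    import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, Final, Partial, PartialMerge}

    // True only when *every* aggregate mode is Partial, which is what the comment
    // asks for; the original `contains` check also passes for mixed modes.
    def allPartial(modes: Seq[AggregateMode]): Boolean = modes.forall(_ == Partial)

    allPartial(Seq(Partial, Partial))       // true
    allPartial(Seq(Partial, PartialMerge))  // false, but modes.contains(Partial) would be true
    allPartial(Seq(Final))                  // false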
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59814567 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala --- @@ -437,17 +444,21 @@ case class TungstenAggregate( val initAgg = ctx.freshName("initAgg") ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") -// create AggregateHashMap -val isAggregateHashMapEnabled: Boolean = false -val isAggregateHashMapSupported: Boolean = +// We currently only enable vectorized hashmap for long key/value types +isVectorizedHashMapEnabled = isVectorizedHashMapEnabled && --- End diff -- I think it's weird that you have the check split between here and line 268. Maybe combine all these checks at line 268 and make isVectorizedHashMapEnabled a val.
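A hedged sketch of what folding all of the enablement checks into a single val might look like; the names (columnarAggregateMapEnabled, groupingKeySchema, bufferSchema) follow the PR's diff and are illustrative, not stock Spark API:

    // One place, computed once: conf flag, partial-only modes, and the
    // long-only key/value restriction mentioned in the diff's comment.
    private val isVectorizedHashMapEnabled: Boolean =
      sqlContext.conf.columnarAggregateMapEnabled &&
        modes.forall(_ == Partial) &&
        (groupingKeySchema ++ bufferSchema).forall(_.dataType == LongType)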
[GitHub] spark pull request: [SPARK-13745][SQL]Support columnar in memory r...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12397#discussion_r59789608 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -31,6 +33,8 @@ private byte[] buffer; private int offset; private int bitOffset; // Only used for booleans. + + private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); --- End diff -- I'm wondering whether wrapping the buffer with a ByteBuffer once (in initFromPage) behaves any differently than reversing each integer.
[GitHub] spark pull request: [SPARK-13745][SQL]Support columnar in memory r...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12397#discussion_r59767603 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -31,6 +33,8 @@ private byte[] buffer; private int offset; private int bitOffset; // Only used for booleans. + + private final static boolean bigEndianPlatform = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); --- End diff -- How does this implementation compare to ByteBuffer.wrap(buffer).order(...)?
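A self-contained sketch of the alternative being asked about: wrap the page buffer once with an explicit byte order instead of reversing each value on big-endian platforms. Variable names are illustrative, not the reader's actual fields:

    import java.nio.{ByteBuffer, ByteOrder}

    val page: Array[Byte] = Array(1, 0, 0, 0, 2, 0, 0, 0)        // two little-endian ints: 1, 2

    // Done once, e.g. when the page is initialized; the buffer remembers the order.
    val buf = ByteBuffer.wrap(page).order(ByteOrder.LITTLE_ENDIAN)

    // Every subsequent read is endian-correct on any platform, no per-value reversing.
    val first = buf.getInt()     // 1
    val second = buf.getInt()    // 2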
[GitHub] spark pull request: [SPARK-14217][SQL] Fix bug if parquet data has...
Github user nongli closed the pull request at: https://github.com/apache/spark/pull/12017
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59650089 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala --- @@ -533,56 +578,104 @@ case class TungstenAggregate( val inputAttr = aggregateBufferAttributes ++ child.output ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input + +ctx.INPUT_ROW = aggregateRow +// TODO: support subexpression elimination +val aggregateRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx)) --- End diff -- This code needs some more comments to understand the high-level parts. It's not obvious why you do this twice, once with ctx.INPUT_ROW = aggregateRow and once again with buffer.
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59649964 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala --- @@ -21,10 +21,10 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.types.StructType /** - * This is a helper object to generate an append-only single-key/single value aggregate hash - * map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates - * (and fall back to the `BytesToBytesMap` if a given key isn't found). This is 'codegened' in - * TungstenAggregate to speed up aggregates w/ key. + * This is a helper class to generate an append-only aggregate hash map that can act as a 'cache' + * for extremely fast key-value lookups while evaluating aggregates (and fall back to the + * `BytesToBytesMap` if a given key isn't found). This is 'codegened' in TungstenAggregate to speed + * up aggregates w/ key. --- End diff -- You chose to have this not handle null keys, right? Add a comment noting that.
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59649860 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala --- @@ -533,56 +578,104 @@ case class TungstenAggregate( val inputAttr = aggregateBufferAttributes ++ child.output ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input + +ctx.INPUT_ROW = aggregateRow +// TODO: support subexpression elimination --- End diff -- Let's remove this TODO in both places; I'm not sure what it means anymore.
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59649827 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala --- @@ -513,8 +555,11 @@ case class TungstenAggregate( ctx.currentVars = input val keyCode = GenerateUnsafeProjection.createCode( --- End diff -- I think we can do better with the variable names to improve readability. keyCode, groupByKeys, and key all sound too similar. Add "row" or "unsafeRow" to the names of the keys that have been collected into an UnsafeRow, or differentiate them some other way.
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59649610 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala --- @@ -484,13 +497,42 @@ case class TungstenAggregate( // so `copyResult` should be reset to `false`. ctx.copyResult = false +def outputFromGeneratedMap: Option[String] = { + if (isAggregateHashMapEnabled) { +val row = ctx.freshName("aggregateHashMapRow") --- End diff -- Comment at a high level what this is doing, something like: "Iterate over the aggregate rows and convert them from ColumnarBatch.Row to UnsafeRow".
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59649417 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala --- @@ -65,27 +69,43 @@ class ColumnarAggMapCodeGenerator( .mkString("\n")}; """.stripMargin +val generatedAggBufferSchema: String = + s""" + |new org.apache.spark.sql.types.StructType() + |${bufferSchema.map(key => +s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") +.mkString("\n")}; + """.stripMargin + s""" | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; + | private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch; | private int[] buckets; | private int numBuckets; | private int maxSteps; | private int numRows = 0; | private org.apache.spark.sql.types.StructType schema = $generatedSchema + | private org.apache.spark.sql.types.StructType aggregateBufferSchema = + |$generatedAggBufferSchema | - | public $generatedClassName(int capacity, double loadFactor, int maxSteps) { - |assert (capacity > 0 && ((capacity & (capacity - 1)) == 0)); - |this.maxSteps = maxSteps; - |numBuckets = (int) (capacity / loadFactor); + | public $generatedClassName() { + |// TODO: These should be generated based on the schema + |int DEFAULT_CAPACITY = 1 << 16; + |double DEFAULT_LOAD_FACTOR = 0.25; + |int DEFAULT_MAX_STEPS = 2; + |assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0)); + |this.maxSteps = DEFAULT_MAX_STEPS; + |numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR); |batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, - | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); + | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); +aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate( + | aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY); --- End diff -- Let's leave a TODO to fix this. There should be a nicer way to get a projection of a batch instead of this.
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12345#issuecomment-209695875 @sameeragarwal have you updated the generated code?
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12345#issuecomment-209290930 I don't think this works if we have NULL keys. This is kind of annoying, let's think about that tomorrow.
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12345#issuecomment-209290367 /* 120 */ batch.column(1).putLong(numRows, 0); I don't think this is right. You should initialize the agg exprs to NULL
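A hedged sketch of the fix being suggested for the quoted generated line (batch and numRows are the names from the generated code):

    // Initialize the aggregation buffer column to NULL rather than 0, so that an
    // aggregate such as sum() starts out NULL instead of a spurious 0.
    batch.column(1).putNull(numRows)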
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59506077 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala --- @@ -65,27 +65,43 @@ class ColumnarAggMapCodeGenerator( .mkString("\n")}; """.stripMargin +val generatedAggBufferSchema: String = + s""" + |new org.apache.spark.sql.types.StructType() + |${bufferSchema.map(key => +s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") +.mkString("\n")}; + """.stripMargin + s""" - | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; + | public org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; --- End diff -- Can we avoid making either of these public? Also, add a comment that explains why you have two batches and their relationship.
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59505993 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala --- @@ -153,16 +153,36 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { ignore("aggregate with keys") { val N = 20 << 20 -runBenchmark("Aggregate w keys", N) { - sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() +val benchmark = new Benchmark("Aggregate w keys", N) --- End diff -- Do we have tests that exercise the correctness here? It should group by a varying number of keys to test the paths you added.
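A hedged sketch of the kind of correctness coverage being asked for: run the same aggregate over a varying number of distinct keys so both the generated hash map and the fallback path get exercised (the key counts are illustrative):

    Seq(1, 16, 65535, 1 << 20).foreach { numKeys =>
      val df = sqlContext.range(1 << 20).selectExpr(s"(id % $numKeys) as k", "id")
      val result = df.groupBy("k").sum("id").collect()
      assert(result.length == numKeys, s"expected $numKeys groups, got ${result.length}")
    }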
[GitHub] spark pull request: [SPARK-14447][SQL] Speed up TungstenAggregate ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12345#discussion_r59505821 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala --- @@ -103,7 +119,7 @@ class ColumnarAggMapCodeGenerator( s""" |// TODO: Improve this hash function |private long hash($groupingKeySignature) { - | return ${groupingKeys.map(_._2).mkString(" ^ ")}; + | return ${groupingKeys.map(_._2).mkString(" | ")}; --- End diff -- Any reason not to implement the h = h * 37 + v hash function?
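A minimal sketch of the h = h * 37 + v style hash the comment refers to, written as a plain function over long key columns (the generated code would inline the equivalent per column):

    // Polynomial hash: mixes every key column and is order-sensitive, unlike
    // '|' or '^', which collapse many distinct keys onto the same bucket.
    def hash(keys: Long*): Long = keys.foldLeft(0L)((h, v) => h * 37L + v)

    hash(1L, 2L)   // differs from hash(2L, 1L)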
[GitHub] spark pull request: [SPARK-14547] Avoid DNS resolution for reusing...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12315#discussion_r59409060 --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java --- @@ -149,25 +148,31 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO } if (cachedClient.isActive()) { -logger.trace("Returning cached connection to {}: {}", address, cachedClient); +logger.trace("Returning cached connection to {}: {}", + cachedClient.getSocketAddress(), cachedClient); return cachedClient; } } // If we reach here, we don't have an existing connection open. Let's create a new one. // Multiple threads might race here to create new connections. Keep only one of them active. +final long preResolveHost = System.nanoTime(); +final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort); +final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 100; +logger.trace("Spent {} ms to resolve {}", hostResolveTimeMs, resolvedAddress); --- End diff -- I think it's more useful to log a warning if this time is greater than say 2 seconds.
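A sketch of the suggested logging change; the enclosing class is Java, so this just illustrates the logic (the 2-second threshold is the reviewer's example, and the nanosecond-to-millisecond conversion is made explicit):

    val hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000   // ns -> ms
    if (hostResolveTimeMs > 2000) {
      logger.warn(s"DNS resolution for $resolvedAddress took $hostResolveTimeMs ms")
    } else {
      logger.trace(s"Spent $hostResolveTimeMs ms to resolve $resolvedAddress")
    }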
[GitHub] spark pull request: [SPARK-14547] Avoid DNS resolution for reusing...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12315#issuecomment-208998610 lgtm
[GitHub] spark pull request: [SPARK-14520][SQL] Use correct return type in ...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12292#issuecomment-208524105 LGTM
[GitHub] spark pull request: [SPARK-14482][SQL] Change default Parquet code...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12256#issuecomment-207523676 LGTM
[GitHub] spark pull request: [SPARK-14467][SQL] Interleave CPU and IO bette...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12243#discussion_r59044639 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala --- @@ -46,37 +50,80 @@ case class PartitionedFile( */ case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition +object FileScanRDD { + private val ioExecutionContext = ExecutionContext.fromExecutorService( +ThreadUtils.newDaemonCachedThreadPool("FileScanRDD", 16)) --- End diff -- It's difficult to model this as the total number of cores because this is intended to do background IO and use very little CPU. The async IO will still use some CPU resources, but that is expected to be very low, a small fraction of a core.
[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12161#discussion_r59043603 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.aggregate + +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.types.StructType + +class TungstenAggregateHashMap( +ctx: CodegenContext, +generatedClassName: String, +groupingKeySchema: StructType, +bufferSchema: StructType) { + val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, ctx.freshName("key"))) + val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), key.dataType.typeName)) + val groupingKeySignature = groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ") + + def generate(): String = { +s""" + |public class $generatedClassName { + |${initializeAggregateHashMap()} + | + |${generateFindOrInsert()} + | + |${generateEquals()} + | + |${generateHashFunction()} + |} + """.stripMargin + } + + def initializeAggregateHashMap(): String = { +val generatedSchema: String = --- End diff -- That was my initial thought too, but this generated class only works for one schema due to the specialized equals/hash/find signatures. It's not particularly useful to pass in a schema if only one works.
[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12161#issuecomment-207146943 LGTM
[GitHub] spark pull request: [SPARK-14467][SQL] Interleave CPU and IO bette...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12243#issuecomment-207136072 @holdenk I tried to simplify the logic. Let me know your thoughts.
[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12161#issuecomment-207134229 I think the old version makes more sense. The generated code only works for a particular schema so no reason to pass it in.
[GitHub] spark pull request: [SPARK-14467][SQL] Interleave CPU and IO bette...
GitHub user nongli opened a pull request: https://github.com/apache/spark/pull/12243 [SPARK-14467][SQL] Interleave CPU and IO better in FileScanRDD. ## What changes were proposed in this pull request? This patch updates FileScanRDD to start reading from the next file while the current file is being processed. The goal is to have better interleaving of CPU and IO. It does this by launching a future which will asynchronously start preparing the next file to be read. The expectation is that the async task is IO intensive and the current file (which includes all the computation for the query plan) is CPU intensive. For some file formats, this would just mean opening the file and the initial setup. For file formats like parquet, this would mean doing all the IO for all the columns. ## How was this patch tested? Good coverage from existing tests. Added a new one to test the flag. Cluster testing on tpcds queries. You can merge this pull request into a Git repository by running: $ git pull https://github.com/nongli/spark interleave Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/12243.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #12243 commit cc6d98a17f6fa4249951802f981c2224d354e651 Author: Nong Li <n...@databricks.com> Date: 2016-04-05T20:36:34Z [SPARK-14467][SQL] Interleave CPU and IO better in FileScanRDD. This patch updates FileScanRDD to start reading from the next file while the current file is being processed. The goal is to have better interleaving of CPU and IO. It does this by launching a future which will asynchronously start preparing the next file to be read. The expectation is that the async task is IO intensive and the current file (which includes all the computation for the query plan) is CPU intensive. For some file formats, this would just mean opening the file and the initial setup. For file formats like parquet, this would mean doing all the IO for all the columns.
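The interleaving idea described above, reduced to a self-contained sketch. This illustrates the pattern (prefetch the next file on a small IO pool while the caller consumes the current one); it is not the patch's actual FileScanRDD code, and all names here are made up:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    class PrefetchingIterator[T](files: Iterator[String], open: String => Iterator[T])
      extends Iterator[T] {

      // Small dedicated pool: the background work is IO-heavy and uses little CPU.
      private val ioPool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
      private var current: Iterator[T] = Iterator.empty
      private var prefetched: Option[Future[Iterator[T]]] = prefetchNext()

      // Kick off opening/buffering the next file in the background, if any remain.
      private def prefetchNext(): Option[Future[Iterator[T]]] =
        if (files.hasNext) Some(Future(open(files.next()))(ioPool)) else None

      override def hasNext: Boolean = {
        while (!current.hasNext && prefetched.isDefined) {
          // By the time the current file is exhausted this is usually already complete.
          current = Await.result(prefetched.get, Duration.Inf)
          prefetched = prefetchNext()
        }
        current.hasNext
      }

      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException("no more rows")
        current.next()
      }
    }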
[GitHub] spark pull request: [SPARK-14369][SQL] Locality support for FileSc...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12153#discussion_r58917033 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala --- @@ -137,10 +140,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => + val blockLocations = getBlockLocations(file) (0L to file.getLen by maxSplitBytes).map { offset => val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining -PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size) +// Finds out a list of location hosts where lives the first block that contains the +// starting point the file block starts at `offset` with length `size`. +val hosts = getBlockHosts(blockLocations, offset) +PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts) --- End diff -- This is not likely to generate good locality right? Is it reasonable to repurpose this? https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala#L124
[GitHub] spark pull request: [SPARK-14369][SQL] Locality support for FileSc...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12153#discussion_r58906143 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala --- @@ -137,10 +140,14 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => + val blockLocations = getBlockLocations(file) (0L to file.getLen by maxSplitBytes).map { offset => val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining -PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size) +// Finds out a list of location hosts where lives the first block that contains the +// starting point the file block starts at `offset` with length `size`. +val hosts = getBlockHosts(blockLocations, offset) +PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size, hosts) --- End diff -- How is this locality information maintained when we coalesce in the logic below?
[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12161#issuecomment-206949611 The generated code takes a schema in the ctor and creates one as a member var. Let's just use the member var one like you had originally.
[GitHub] spark pull request: [SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12047#issuecomment-206590169 LGTM
[GitHub] spark pull request: [SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12047#discussion_r58789868 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala --- @@ -434,12 +435,20 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { case _ => true } + private def numOfNestedFields(dataType: DataType): Int = dataType match { +case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum +case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType) +case a: ArrayType => numOfNestedFields(a.elementType) + 1 --- End diff -- why + 1
[GitHub] spark pull request: [SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12047#discussion_r58789578 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java --- @@ -121,6 +109,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont throws IOException, InterruptedException, UnsupportedOperationException { super.initialize(inputSplit, taskAttemptContext); initializeInternal(); +Configuration conf = ContextUtil.getConfiguration(taskAttemptContext); --- End diff -- what does this do?
[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12161#discussion_r58788879 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.aggregate + +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.types.StructType + +class TungstenAggregateHashMap( +ctx: CodegenContext, +generatedClassName: String, +groupingKeySchema: StructType, +bufferSchema: StructType) { + val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, ctx.freshName("key"))) + val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), key.dataType.typeName)) + val groupingKeySignature = groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ") + + def generate(): String = { +s""" + |public class $generatedClassName { + |${initializeAggregateHashMap()} + | + |${generateFindOrInsert()} + | + |${generateEquals()} + | + |${generateHashFunction()} + |} + """.stripMargin + } + + def initializeAggregateHashMap(): String = { +val generatedSchema: String = --- End diff -- I don't think this should be generated. I think the generated ctor should take a schema and we should get that from the non-generated code if possible.
[GitHub] spark pull request: [SPARK-14394][SQL] Generate AggregateHashMap c...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12161#discussion_r58788712 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregateHashMap.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.aggregate + +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.types.StructType + +class TungstenAggregateHashMap( +ctx: CodegenContext, +generatedClassName: String, +groupingKeySchema: StructType, +bufferSchema: StructType) { + val groupingKeys = groupingKeySchema.map(key => (key.dataType.typeName, ctx.freshName("key"))) + val bufferValues = bufferSchema.map(key => (ctx.freshName("value"), key.dataType.typeName)) + val groupingKeySignature = groupingKeys.map(_.productIterator.toList.mkString(" ")).mkString(", ") + + def generate(): String = { +s""" + |public class $generatedClassName { + |${initializeAggregateHashMap()} + | + |${generateFindOrInsert()} + | + |${generateEquals()} + | + |${generateHashFunction()} + |} + """.stripMargin + } + + def initializeAggregateHashMap(): String = { +val generatedSchema: String = + s""" + |new org.apache.spark.sql.types.StructType() + |${(groupingKeySchema ++ bufferSchema).map(key => +s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""") +.mkString("\n")}; + """.stripMargin + +s""" + | private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch; + | private int[] buckets; + | private int numBuckets; + | private int maxSteps; + | private int numRows = 0; + | private org.apache.spark.sql.types.StructType schema = $generatedSchema + | + | public $generatedClassName(int capacity, double loadFactor, int maxSteps) { + |assert (capacity > 0 && ((capacity & (capacity - 1)) == 0)); + |this.maxSteps = maxSteps; + |numBuckets = (int) (capacity / loadFactor); + |batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema, + | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity); + |buckets = new int[numBuckets]; + |java.util.Arrays.fill(buckets, -1); + | } + | + | public $generatedClassName() { + |new $generatedClassName(1 << 16, 0.25, 5); + | } + """.stripMargin + } + + def generateHashFunction(): String = { +s""" + |// TODO: Improve this Hash Function + |private long hash($groupingKeySignature) { + | return ${groupingKeys.map(_._2).mkString(" & ")}; + |} + """.stripMargin + } + + def generateEquals(): String = { +s""" + |private boolean equals(int idx, $groupingKeySignature) { + | return ${groupingKeys.zipWithIndex.map(key => +s"batch.column(${key._2}).getLong(buckets[idx]) == ${key._1._2}").mkString(" && ")}; + |} + """.stripMargin + } + + def generateFindOrInsert(): 
String = { +s""" + |public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(${ + groupingKeySignature}) { + | int idx = find(${groupingKeys.map(_._2).mkString(", ")}); + | if (idx !=
[GitHub] spark pull request: [SPARK-12785][SQL] Add ColumnarBatch, an in me...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/10628#issuecomment-205993243 I don't think it should be needed for OnHeap (it uses Platform.getInt which I think should work for both). For offheap, as far as I know, only a few of the puts that take byte[] as input would care. Subclassing or if(...) both sound fine to me.
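A small sketch of the "if (...)" option for the byte[]-based put path. Parquet's plain encoding is little-endian while Platform.getInt reads in native byte order, so only big-endian JVMs need a swap; the names below are illustrative:

    import java.nio.ByteOrder

    val isBigEndian = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN

    // Takes a value read from the page with a native-order read (e.g. Platform.getInt)
    // and swaps its bytes only when the platform itself is big-endian.
    def readLittleEndianInt(rawNativeRead: Int): Int =
      if (isBigEndian) java.lang.Integer.reverseBytes(rawNativeRead) else rawNativeRead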
[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12103#issuecomment-205991469 lgtm
[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12103#discussion_r58617460 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java --- @@ -232,6 +234,96 @@ public MapData getMap(int ordinal) { public Object get(int ordinal, DataType dataType) { throw new NotImplementedException(); } + +@Override +public void update(int ordinal, Object value) { --- End diff -- Why did you have to implement this? We should try not to call this in the hot path.
[GitHub] spark pull request: [SPARK-14310] Fix scan whole stage codegen to ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12098#discussion_r58250401 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -318,25 +292,32 @@ private[sql] case class DataSourceScan( | } | }""".stripMargin) - val value = ctx.freshName("value") s""" | if ($batch != null) { | $scanBatches(); - | } else if ($input.hasNext()) { - | Object $value = $input.next(); - | if ($value instanceof $columnarBatchClz) { - | $batch = ($columnarBatchClz)$value; + | } else { + | long $getBatchStart = System.nanoTime(); + | if ($input.hasNext()) { + | $batch = ($columnarBatchClz)$input.next(); + | $scanTimeTotalNs += System.nanoTime() - $getBatchStart; | $scanBatches(); - | } else { - | $scanRows((InternalRow) $value); | } | } """.stripMargin } else { + val row = ctx.freshName("row") + val exprRows = +output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable)) + ctx.INPUT_ROW = row + ctx.currentVars = null + val columnsRowInput = exprRows.map(_.gen(ctx)) + val inputRow = if (outputUnsafeRows) row else null s""" - |if ($input.hasNext()) { - | $scanRows((InternalRow) $input.next()); - |} + | while (!shouldStop() && $input.hasNext()) { --- End diff -- I tried a few variants and didn't see a difference. I think only the batched scan is fast enough to be sensitive to this.
[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12103#discussion_r58247692 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java --- @@ -232,6 +233,56 @@ public MapData getMap(int ordinal) { public Object get(int ordinal, DataType dataType) { throw new NotImplementedException(); } + +@Override +public void setNullAt(int ordinal) { + columns[ordinal].putNull(rowId); --- End diff -- how do you set something to NULL?
[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12103#discussion_r58247570 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java --- @@ -19,6 +19,8 @@ import java.util.Arrays; +import com.google.common.annotations.VisibleForTesting; --- End diff -- is this necessary? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12103#discussion_r58247531 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala --- @@ -472,9 +472,8 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { .add("value", LongType) val map = new AggregateHashMap(schema) while (i < numKeys) { -val idx = map.findOrInsert(i.toLong) -map.batch.column(1).putLong(map.buckets(idx), - map.batch.column(1).getLong(map.buckets(idx)) + 1) +val row = map.findOrInsert(i.toLong) +row.setLong(1, row.getLong(1)) --- End diff -- row.getLong(1) + 1? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
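A minimal sketch of the fix this comment is asking for, assuming findOrInsert now returns the mutable ColumnarBatch.Row introduced in this PR (the map and the key i come from the benchmark loop quoted above):

```java
// Accumulate the count instead of writing the old value back unchanged.
ColumnarBatch.Row row = map.findOrInsert(i);
row.setLong(1, row.getLong(1) + 1);
```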
[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12103#discussion_r58247456 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java --- @@ -232,6 +233,56 @@ public MapData getMap(int ordinal) { public Object get(int ordinal, DataType dataType) { throw new NotImplementedException(); } + +@Override +public void setNullAt(int ordinal) { + columns[ordinal].putNull(rowId); +} + +@Override +public void update(int ordinal, Object value) { + throw new NotImplementedException(); +} + +@Override +public void setBoolean(int ordinal, boolean value) { + columns[ordinal].putBoolean(rowId, value); +} + +@Override +public void setByte(int ordinal, byte value) { + columns[ordinal].putByte(rowId, value); +} + +@Override +public void setShort(int ordinal, short value) { + columns[ordinal].putShort(rowId, value); +} + +@Override +public void setInt(int ordinal, int value) { + columns[ordinal].putInt(rowId, value); +} + +@Override +public void setLong(int ordinal, long value) { + columns[ordinal].putLong(rowId, value); +} + +@Override +public void setFloat(int ordinal, float value) { + columns[ordinal].putFloat(rowId, value); +} + +@Override +public void setDouble(int ordinal, double value) { + columns[ordinal].putDouble(rowId, value); +} + +@Override +public void setDecimal(int ordinal, Decimal value, int precision) { + throw new NotImplementedException(); --- End diff -- why not? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
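One possible answer to the question, sketched under the assumption that 64-bit decimals are stored as their unscaled long value (mirroring how DecimalType.is64BitDecimalType is used on the read path elsewhere in these diffs); this is not the merged implementation:

```java
@Override
public void setDecimal(int ordinal, Decimal value, int precision) {
  if (DecimalType.is64BitDecimalType(columns[ordinal].dataType())) {
    // 64-bit decimals fit in a long: store the unscaled value directly.
    columns[ordinal].putLong(rowId, value.toUnscaledLong());
  } else {
    // Wider decimals need a byte[]-backed representation; left unimplemented in this sketch.
    throw new NotImplementedException();
  }
}
```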
[GitHub] spark pull request: [SPARK-14320][SQL] Make ColumnarBatch.Row muta...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12103#discussion_r58247399 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java --- @@ -232,6 +233,56 @@ public MapData getMap(int ordinal) { public Object get(int ordinal, DataType dataType) { throw new NotImplementedException(); } + +@Override +public void setNullAt(int ordinal) { + columns[ordinal].putNull(rowId); --- End diff -- Let's assert it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12785][SQL] Add ColumnarBatch, an in me...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/10628#issuecomment-204503727 This is not an architectural limitation. We have just not implemented support for it, and there are only a few methods that would need to be implemented to support big endian. It would be great if someone in the community working on big-endian hardware could do this. We don't rely on the byte ordering of the platform. The function you are referring to, putIntLittleEndian, converts the input, which is little endian, to whatever the machine's endianness is. It doesn't require the host to have a particular endianness. This is used in cases where the data is encoded on disk in a canonical binary format. On a big-endian host, this would have to byte swap, but I think that's inevitable since the on-disk data had to pick an endianness (this is the code that's not implemented right now). If you find places where Spark requires a particular endianness, I'd consider that a bug. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
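To make this concrete, here is a small self-contained sketch (not Spark's putIntLittleEndian) of decoding an int that was written little-endian on disk; the decoded value is the same on either host, and on a big-endian machine the byte swap effectively happens inside getInt():

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class LittleEndianDecode {
  // Decode a 4-byte little-endian value from an on-disk buffer into a host int.
  static int decodeIntLE(byte[] buf, int offset) {
    return ByteBuffer.wrap(buf, offset, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

  public static void main(String[] args) {
    byte[] onDisk = {0x78, 0x56, 0x34, 0x12};  // little-endian encoding of 0x12345678
    // Prints 12345678 regardless of the host's native byte order.
    System.out.println(Integer.toHexString(decodeIntLE(onDisk, 0)));
  }
}
```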
[GitHub] spark pull request: [SPARK-14310] Fix scan whole stage codegen to ...
GitHub user nongli opened a pull request: https://github.com/apache/spark/pull/12098 [SPARK-14310] Fix scan whole stage codegen to support batches without runtime check. ## What changes were proposed in this pull request? Currently, we determine if the RDD will produce batches or rows by looking at the first value. After other cleanups, this is not necessary anymore. This simplifies the code and lets us measure time spent in the first batch. You can merge this pull request into a Git repository by running: $ git pull https://github.com/nongli/spark spark-14310 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/12098.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #12098 commit 303312c2a37ee47803a67812a96f271533fa5325 Author: Nong Li <n...@databricks.com> Date: 2016-03-31T21:07:36Z [SPARK-14310] Fix scan whole stage codegen to support batches without runtime check. Currently, we determine if the RDD will produce batches or rows by looking at the first value. After other cleanups, this is not necessary anymore. This simplifies the code and lets us measure time spent in the first batch. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12055#issuecomment-203673714 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12055#discussion_r57977414 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +import java.util.Arrays; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.LongType; + +/** + * This is an illustrative implementation of an append-only single-key/single value aggregate hash + * map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates + * (and fall back to the `BytesToBytesMap` if a given key isn't found). This can be potentially + * 'codegened' in TungstenAggregate to speed up aggregates w/ key. + * + * It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the + * key-value pairs. The index lookups in the array rely on linear probing (with a small number of + * maximum tries) and use an inexpensive hash function which makes it really efficient for a + * majority of lookups. However, using linear probing and an inexpensive hash function also makes it + * less robust as compared to the `BytesToBytesMap` (especially for a large number of keys or even + * for certain distribution of keys) and requires us to fall back on the latter for correctness. + */ +public class AggregateHashMap { + public ColumnarBatch batch; + public int[] buckets; + + private int numBuckets; + private int numRows = 0; + private int maxSteps = 3; + + private static int DEFAULT_NUM_BUCKETS = 65536 * 4; --- End diff -- this is weird. configure the max capacity (in the batch) and the load factor and size numbuckets to capacity / load_factor. You have dependent constants here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
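A sketch of the shape this comment suggests, with hypothetical constant names: configure only the batch capacity and the load factor, and derive the bucket count from them (rounded up to a power of 2 so the hash & (numBuckets - 1) masking keeps working), instead of keeping two constants that must stay in sync.

```java
// Hypothetical defaults; numBuckets is derived rather than hard-coded.
private static final int DEFAULT_CAPACITY = 1 << 16;      // max rows stored in the columnar batch
private static final double DEFAULT_LOAD_FACTOR = 0.25;

private static int numBucketsFor(int capacity, double loadFactor) {
  int needed = (int) Math.ceil(capacity / loadFactor);
  int buckets = 1;
  while (buckets < needed) {
    buckets <<= 1;                                          // round up to a power of 2
  }
  return buckets;
}
```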
[GitHub] spark pull request: [SPARK-14259][SQL] Add a FileSourceStrategy op...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12068#issuecomment-203587910 LGTM. I think there's more we can do here to bin pack a bit better (i.e. checking if small files can fit in existing partitions) but it would be good to get this in and have more experience with how to configure this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
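The bin-packing idea mentioned above, reduced to a plain first-fit pass (illustrative only, not the FileSourceStrategy code): before opening a new partition for a small file, check whether it fits into a partition that still has room.

```java
import java.util.ArrayList;
import java.util.List;

final class FirstFitPacker {
  // Each inner list holds one partition's file sizes; assumes no file exceeds maxPartitionBytes.
  static List<List<Long>> pack(List<Long> fileSizes, long maxPartitionBytes) {
    List<List<Long>> partitions = new ArrayList<>();
    List<Long> remaining = new ArrayList<>();                 // spare bytes per partition
    for (long size : fileSizes) {
      int target = -1;
      for (int i = 0; i < remaining.size(); i++) {
        if (remaining.get(i) >= size) { target = i; break; }  // first partition with room
      }
      if (target < 0) {                                       // nothing fits: open a new partition
        partitions.add(new ArrayList<>());
        remaining.add(maxPartitionBytes);
        target = partitions.size() - 1;
      }
      partitions.get(target).add(size);
      remaining.set(target, remaining.get(target) - size);
    }
    return partitions;
  }
}
```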
[GitHub] spark pull request: [SPARK-14278][SQL] Initialize columnar batch w...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12070#issuecomment-203569229 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12055#discussion_r57844666 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorizedHashMap.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +import java.util.Arrays; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.LongType; + +/** + * This is an illustrative implementation of a single-key/single value vectorized hash map that can + * be potentially 'codegened' in TungstenAggregate to speed up aggregate w/ key + */ +public class VectorizedHashMap { + public ColumnarBatch batch; + public int[] buckets; + private int numBuckets; + private int numRows = 0; + private int maxSteps = 3; + + public VectorizedHashMap(int capacity, double loadFactor, int maxSteps) { --- End diff -- I think this should take the schema as the parameter. capacity needs to be a power of 2 for the mod to work. I'm not sure these should be exposed for the typical caller. At the very least, expose a ctor with reasonable defaults. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
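A sketch of the constructor shape being asked for (hypothetical, not the merged code): take the schema as a parameter, assert that capacity is a power of two since the probe relies on masking, and give typical callers a default constructor with reasonable settings.

```java
public VectorizedHashMap(StructType schema) {
  this(schema, 1 << 18, 0.25, 3);                           // assumed defaults for typical callers
}

public VectorizedHashMap(StructType schema, int capacity, double loadFactor, int maxSteps) {
  assert capacity > 0 && (capacity & (capacity - 1)) == 0
      : "capacity must be a power of 2 for 'hash & (numBuckets - 1)' to work as a mod";
  this.maxSteps = maxSteps;
  this.numBuckets = capacity;
  this.batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, (int) (capacity * loadFactor));
  this.buckets = new int[numBuckets];
  Arrays.fill(buckets, -1);
}
```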
[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12055#discussion_r57844531 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorizedHashMap.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +import java.util.Arrays; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.LongType; + +/** + * This is an illustrative implementation of a single-key/single value vectorized hash map that can + * be potentially 'codegened' in TungstenAggregate to speed up aggregate w/ key + */ +public class VectorizedHashMap { + public ColumnarBatch batch; --- End diff -- do these needs to be public? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12055#discussion_r57844469 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorizedHashMap.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +import java.util.Arrays; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.LongType; + +/** + * This is an illustrative implementation of a single-key/single value vectorized hash map that can + * be potentially 'codegened' in TungstenAggregate to speed up aggregate w/ key + */ +public class VectorizedHashMap { --- End diff -- I think you should comment the overall design of this data structure, where it is good and where it is bad. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14263][SQL] Benchmark Vectorized HashMa...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12055#discussion_r57844416 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/VectorizedHashMap.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.vectorized; + +import java.util.Arrays; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.LongType; + +/** + * This is an illustrative implementation of a single-key/single value vectorized hash map that can + * be potentially 'codegened' in TungstenAggregate to speed up aggregate w/ key + */ +public class VectorizedHashMap { + public ColumnarBatch batch; + public int[] buckets; + private int numBuckets; + private int numRows = 0; + private int maxSteps = 3; + + public VectorizedHashMap(int capacity, double loadFactor, int maxSteps) { +StructType schema = new StructType() +.add("key", LongType) +.add("value", LongType); +this.maxSteps = maxSteps; +numBuckets = capacity; +batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, (int) (numBuckets * loadFactor)); +buckets = new int[numBuckets]; +Arrays.fill(buckets, -1); + } + + public int findOrInsert(long key) { +int idx = find(key); +if (idx != -1 && buckets[idx] == -1) { + batch.column(0).putLong(numRows, key); + batch.column(1).putLong(numRows, 0); + buckets[idx] = numRows++; +} +return idx; + } + + public int find(long key) { +long h = hash(key); +int step = 0; +int idx = (int) h & (numBuckets - 1); +while (step < maxSteps) { + if ((buckets[idx] == -1) || (buckets[idx] != -1 && equals(idx, key))) return idx; --- End diff -- I don't think you need the check for -1 twice. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
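The simplification this comment points at, written out as a sketch of the probe loop: with short-circuit ||, the second buckets[idx] != -1 test can never change the outcome, so it can simply be dropped (the probe step and wrap-around are assumed here, since the quoted hunk cuts off before them).

```java
public int find(long key) {
  int idx = (int) hash(key) & (numBuckets - 1);
  for (int step = 0; step < maxSteps; step++) {
    // empty slot, or an occupied slot holding the same key
    if (buckets[idx] == -1 || equals(idx, key)) return idx;
    idx = (idx + 1) & (numBuckets - 1);                     // linear probe with wrap-around
  }
  return -1;                                                // caller falls back to BytesToBytesMap
}
```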
[GitHub] spark pull request: [SPARK-14254][Core]Add logs to help investigat...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12046#issuecomment-203177749 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14217][SQL] Fix bug if parquet data has...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12017#issuecomment-202991498 @davies Yea. The format allows at most 1 dictionary per column per row group. Each page can have a different encoding though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/12007#discussion_r57756896 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -241,73 +256,89 @@ private[sql] case class DataSourceScan( // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know // here which path to use. Fix this. -ctx.currentVars = null -val columns1 = (output zip colVars).map { case (attr, colVar) => - genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } -val scanBatches = ctx.freshName("processBatches") -ctx.addNewFunction(scanBatches, - s""" - | private void $scanBatches() throws java.io.IOException { - | while (true) { - | int numRows = $batch.numRows(); - | if ($idx == 0) { - | ${columnAssigns.mkString("", "\n", "\n")} - | $numOutputRows.add(numRows); - | } - | - | // this loop is very perf sensitive and changes to it should be measured carefully - | while ($idx < numRows) { - | int $rowidx = $idx++; - | ${consume(ctx, columns1).trim} - | if (shouldStop()) return; - | } - | - | if (!$input.hasNext()) { - | $batch = null; - | break; - | } - | $batch = ($columnarBatchClz)$input.next(); - | $idx = 0; - | } - | }""".stripMargin) - val exprRows = - output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable)) +output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable)) ctx.INPUT_ROW = row ctx.currentVars = null -val columns2 = exprRows.map(_.gen(ctx)) +val columnsRowInput = exprRows.map(_.gen(ctx)) val inputRow = if (outputUnsafeRows) row else null val scanRows = ctx.freshName("processRows") ctx.addNewFunction(scanRows, s""" - | private void $scanRows(InternalRow $row) throws java.io.IOException { - | boolean firstRow = true; - | while (firstRow || $input.hasNext()) { - | if (firstRow) { - | firstRow = false; - | } else { - | $row = (InternalRow) $input.next(); - | } - | $numOutputRows.add(1); - | ${consume(ctx, columns2, inputRow).trim} - | if (shouldStop()) return; - | } - | }""".stripMargin) - -val value = ctx.freshName("value") -s""" - | if ($batch != null) { - | $scanBatches(); - | } else if ($input.hasNext()) { - | Object $value = $input.next(); - | if ($value instanceof $columnarBatchClz) { - | $batch = ($columnarBatchClz)$value; - | $scanBatches(); - | } else { - | $scanRows((InternalRow) $value); - | } - | } - """.stripMargin + | private void $scanRows(InternalRow $row) throws java.io.IOException { + | boolean firstRow = true; + | while (!shouldStop() && (firstRow || $input.hasNext())) { + | if (firstRow) { + | firstRow = false; + | } else { + | $row = (InternalRow) $input.next(); + | } + | $numOutputRows.add(1); + | ${consume(ctx, columnsRowInput, inputRow).trim} + | } + | }""".stripMargin) + +// Timers for how long we spent inside the scan. We can only maintain this when using batches, +// otherwise the overhead is too high. +if (canProcessBatches()) { + val scanTimeMetric = metricTerm(ctx, "scanTime") + val getBatchStart = ctx.freshName("scanStart") + val scanTimeTotalNs = ctx.freshName("scanTime") + ctx.currentVars = null + val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => +genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } + val scanBatches = ctx.freshName("processBatches") + ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") + + ctx.addNewFunction(scanBatches, +s""" +| private void $scanBatches() throws java.io.IOException { +| while (true) { +| i
[GitHub] spark pull request: [SPARK-13607][SQL] Improve compression perform...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11461#issuecomment-202633270 can you include the benchmark code as well? What do the numbers mean? For example: IntDeltaBinaryPacking(0.182). Does this mean it is 18% of the original size? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13607][SQL] Improve compression perform...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11461#discussion_r57654201 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala --- @@ -530,3 +532,283 @@ private[columnar] case object LongDelta extends CompressionScheme { } } } + +/** + * Writes integral-type values with delta encoding and binary packing. + * The format is as follows: --- End diff -- How does this relate to the parquet spec/implementation? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14217][SQL] Fix bug if parquet data has...
GitHub user nongli opened a pull request: https://github.com/apache/spark/pull/12017 [SPARK-14217][SQL] Fix bug if parquet data has columns that use dictionary encoding for some of the data ## What changes were proposed in this pull request? Currently, this causes batches where some values are dictionary encoded and some are not. The non-dictionary-encoded values cause us to remove the dictionary from the batch, causing the first values to return garbage. This patch fixes the issue by first decoding the dictionary for the values that are already dictionary encoded before switching. A similar thing is done for the reverse case where the initial values are not dictionary encoded. ## How was this patch tested? This is difficult to test, but it was replicated on a test cluster using a large TPC-DS data set. You can merge this pull request into a Git repository by running: $ git pull https://github.com/nongli/spark spark-14217 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/12017.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #12017 commit 1ba0a4c1b69f1fe33ccc45c16836ea21a72e8bc3 Author: Nong Li <n...@databricks.com> Date: 2016-03-28T23:18:37Z [SPARK-14217][SQL] Fix bug if parquet data has columns that use dictionary encoding for some of the data. Currently, this causes batches where some values are dictionary encoded and some are not. The non-dictionary-encoded values cause us to remove the dictionary from the batch, causing the first values to return garbage. This patch fixes the issue by first decoding the dictionary for the values that are already dictionary encoded before switching. A similar thing is done for the reverse case where the initial values are not dictionary encoded. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
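A heavily simplified sketch of the fix described above (the class and method names are illustrative, not the VectorizedColumnReader API): when the encoding switches from dictionary to plain partway through a column, the rows already read as dictionary ids are materialized into plain values first, so the batch never mixes the two representations.

```java
final class LongColumnSketch {
  long[] values;        // decoded values for rows read from plain-encoded pages
  int[] dictionaryIds;  // ids for rows read from dictionary-encoded pages, or null
  long[] dictionary;    // the row group's dictionary for this column

  // Called before reading the first non-dictionary page.
  void switchToPlainEncoding(int rowsReadSoFar) {
    if (dictionaryIds != null) {
      for (int i = 0; i < rowsReadSoFar; i++) {
        values[i] = dictionary[dictionaryIds[i]];   // materialize the prefix
      }
      dictionaryIds = null;                         // later rows are written to values[] directly
    }
  }
}
```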
[GitHub] spark pull request: [SPARK-13923][SPARK-14014][SQL] Session catalo...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12006#issuecomment-202544366 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...
GitHub user nongli opened a pull request: https://github.com/apache/spark/pull/12007 [SPARK-14210][SQL] Add a metric for time spent in scans. ## What changes were proposed in this pull request? This adds a metric to parquet scans that measures the time in just the scan phase. This is only possible when the scan returns ColumnarBatches, otherwise the overhead is too high. This combined with the pipeline metric lets us easily see what percent of the time was in the scan. You can merge this pull request into a Git repository by running: $ git pull https://github.com/nongli/spark spark-14210 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/12007.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #12007 commit b17bac4dd880bc93455d85f9591468941a88ce97 Author: Nong Li <n...@databricks.com> Date: 2016-03-25T22:47:19Z [SPARK-14210][SQL] Add a metric for time spent in scans. This adds a metric to parquet scans that measures the time in just the scan phase. This is only possible when the scan returns ColumnarBatches, otherwise the overhead is too high. This combined with the pipeline metric lets us easily see what percent of the time was in the scan. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
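The timing pattern the description refers to, reduced to a standalone sketch (names are illustrative, not the generated code): only the call that pulls the next batch is timed, because wrapping per-row work in nanoTime() would make the measurement overhead dominate.

```java
import java.util.Iterator;
import java.util.function.Consumer;

final class ScanTiming {
  // Returns nanoseconds spent fetching batches; per-batch consumption is deliberately untimed.
  static <T> long consumeBatches(Iterator<T> input, Consumer<T> process) {
    long scanTimeTotalNs = 0;
    while (input.hasNext()) {
      long start = System.nanoTime();
      T batch = input.next();                       // the only work attributed to the scan metric
      scanTimeTotalNs += System.nanoTime() - start;
      process.accept(batch);
    }
    return scanTimeTotalNs;                         // the SQL metric reports this in milliseconds
  }
}
```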
[GitHub] spark pull request: [SPARK-14210][SQL] Add a metric for time spent...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/12007#issuecomment-202529228 Here's a screenshot of what this looks like. ![screen shot 2016-03-28 at 11 56 33 am](https://cloud.githubusercontent.com/assets/242468/14086908/51369016-f4dc-11e5-9cb7-fd06abc2e32d.png) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14144][SQL] Explicitly identify/catch U...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11950#issuecomment-201381524 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13981][SQL] Defer evaluating variables ...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11792#issuecomment-201059224 @davies I did this a different way. Let me know your thoughts. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11791#issuecomment-201035800 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11791#discussion_r57390963 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -279,13 +275,117 @@ private[spark] class MemoryStore( } } + /** + * Attempt to put the given block in memory store as bytes. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated the estimated size of the stored data. In case of + * failure, return a handle which allows the caller to either finish the serialization + * by spilling to disk or to deserialize the partially-serialized block and reconstruct + * the original input iterator. The caller must either fully consume this result + * iterator or call `discard()` on it in order to free the storage memory consumed by the + * partially-unrolled block. + */ + private[storage] def putIteratorAsBytes[T]( + blockId: BlockId, + values: Iterator[T], + classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = { + +require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") + +// Whether there is still enough memory for us to continue unrolling this block +var keepUnrolling = true +// Initial per-task memory to request for unrolling blocks (bytes). +val initialMemoryThreshold = unrollMemoryThreshold +// Keep track of unroll memory used by this particular block / putIterator() operation +var unrollMemoryUsedByThisBlock = 0L +// Underlying buffer for unrolling the block +val redirectableStream = new RedirectableOutputStream +val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt) +redirectableStream.setOutputStream(byteArrayChunkOutputStream) +val serializationStream: SerializationStream = { + val ser = serializerManager.getSerializer(classTag).newInstance() + ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) +} + +// Request enough memory to begin unrolling +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + +if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + +s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") +} else { + unrollMemoryUsedByThisBlock += initialMemoryThreshold +} + +def reserveAdditionalMemoryIfNecessary(): Unit = { + if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) { +val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) +if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest +} + } +} + +// Unroll this block safely, checking whether we have exceeded our threshold +while (values.hasNext && keepUnrolling) { + serializationStream.writeObject(values.next())(classTag) + reserveAdditionalMemoryIfNecessary() +} + +// Make sure that we have enough memory to store the block. 
By this point, it is possible that +// the block's actual memory usage has exceeded the unroll memory by a small amount, so we +// perform one final call to attempt to allocate additional memory if necessary. --- End diff -- This is because of the call to close? That can use more memory? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14092] [SQL] move shouldStop() to end o...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11912#issuecomment-200466916 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14092] [SQL] move shouldStop() to end o...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11912#discussion_r57206785 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -255,11 +255,11 @@ private[sql] case class DataSourceScan( | $numOutputRows.add(numRows); | } | - | while (!shouldStop() && $idx < numRows) { + | while ($idx < numRows) { | int $rowidx = $idx++; | ${consume(ctx, columns1).trim} + | if (shouldStop()) return; --- End diff -- Can we add a comment around line 248 saying this loop is very perf sensitive and changes to it should be measured carefully? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14015][SQL] Support TimestampType in ve...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11882#discussion_r57175750 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java --- @@ -259,8 +259,6 @@ private void initializeInternal() throws IOException { if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) { throw new IOException("Complex types not supported."); } - PrimitiveType primitiveType = t.asPrimitiveType(); - originalTypes[i] = t.getOriginalType(); // TODO: Be extremely cautious in what is supported. Expand this. --- End diff -- I think we can remove this check now too. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14015][SQL] Support TimestampType in ve...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11882#issuecomment-200387315 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13981][SQL] Defer evaluating variables ...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11792#discussion_r57064001 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala --- @@ -110,23 +114,28 @@ case class Filter(condition: Expression, child: SparkPlan) override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { val numOutput = metricTerm(ctx, "numOutputRows") -// filter out the nulls -val filterOutNull = notNullAttributes.map { a => - val idx = child.output.indexOf(a) - s"if (${input(idx).isNull}) continue;" -}.mkString("\n") +val conjuncts = splitConjunctivePredicates(condition) -ctx.currentVars = input -val predicates = otherPreds.map { e => - val bound = ExpressionCanonicalizer.execute( -BindReferences.bindReference(e, output)) - val ev = bound.gen(ctx) +// Generate the code to evaluate each filter. +val generated = conjuncts.map { c => --- End diff -- That defeats the point of this. Then we're referencing all the attributes first before the predicates. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
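To spell the point out, a self-contained sketch of the deferred shape (illustrative only, not the generated code): each column is materialized just before the first conjunct that needs it, so a row rejected early never pays for reading the remaining columns.

```java
// Two-conjunct filter over columnar input: column b is loaded only for rows that pass on a.
static int countPassing(long[] colA, double[] colB) {
  int passed = 0;
  for (int i = 0; i < colA.length; i++) {
    long a = colA[i];            // only column a is needed for the first conjunct
    if (a <= 0) continue;        // rejected rows never touch column b
    double b = colB[i];          // deferred load for the second conjunct
    if (b >= 1.0) continue;
    passed++;
  }
  return passed;
}
```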
[GitHub] spark pull request: [SPARK-13325][SQL] Create a 64-bit hashcode ex...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11209#issuecomment-25187 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14015][SQL] Support TimestampType in ve...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11882#discussion_r57026910 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java --- @@ -308,7 +322,7 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOExce private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException { // This is where we implement support for the valid type conversions. -if (column.dataType() == DataTypes.LongType || +if (column.dataType() == DataTypes.LongType || column.dataType() == DataTypes.TimestampType || DecimalType.is64BitDecimalType(column.dataType())) { defColumn.readLongs( --- End diff -- Do we have tests where this type is *not* dictionary encoded? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14015][SQL] Support TimestampType in ve...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11882#discussion_r57026815 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java --- @@ -308,7 +322,7 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOExce private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException { // This is where we implement support for the valid type conversions. -if (column.dataType() == DataTypes.LongType || +if (column.dataType() == DataTypes.LongType || column.dataType() == DataTypes.TimestampType || DecimalType.is64BitDecimalType(column.dataType())) { defColumn.readLongs( --- End diff -- hmm. How does this work? readLongs is expecting to read parquet int64 physical types. How is it able to read this other physical type? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13883][SQL] Parquet Implementation of F...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11709#issuecomment-199577463 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14016][SQL] Support high-precision deci...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11869#issuecomment-199539418 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11791#discussion_r56916760 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -500,3 +601,79 @@ private[storage] class PartiallyUnrolledIterator( iter = null } } + +/** + * A wrapper which allows an open [[OutputStream]] to be redirected to a different sink. + */ +private class RedirectableOutputStream extends OutputStream { + private[this] var os: OutputStream = _ + def setOutputStream(s: OutputStream): Unit = { os = s } + override def write(b: Int): Unit = os.write(b) + override def write(b: Array[Byte]): Unit = os.write(b) + override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len) + override def flush(): Unit = os.flush() + override def close(): Unit = os.close() +} + +/** + * The result of a failed [[MemoryStore.putIteratorAsBytes()]] call. + * + * @param memoryStore the MemoryStore, used for freeing memory. + * @param blockManager the BlockManager, used for deserializing values. + * @param blockId the block id. + * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]]. + * @param redirectableOutputStream an OutputStream which can be redirected to a different sink. + * @param unrollMemory the amount of unroll memory used by the values in `unrolled`. + * @param unrolled a byte buffer containing the partially-serialized values. + * @param rest the rest of the original iterator passed to + * [[MemoryStore.putIteratorAsValues()]]. + */ +private[storage] class PartiallySerializedBlock( +memoryStore: MemoryStore, +blockManager: BlockManager, +blockId: BlockId, +serializationStream: SerializationStream, +redirectableOutputStream: RedirectableOutputStream, +unrollMemory: Long, +unrolled: ChunkedByteBuffer, +rest: Iterator[Any]) { + + /** + * Called to dispose of this block and free its memory. + */ + def discard(): Unit = { --- End diff -- who calls this now? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11791#discussion_r56916037 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1074,7 +1074,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with all the space in the world. This should succeed. -var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) +var putResult = memoryStore.putIteratorAsValues("unroll", smallList.iterator) --- End diff -- Do we need more test cases for putIteratorAsBytes? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13990] Automatically pick serializer wh...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11801#issuecomment-199535749 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11791#discussion_r56914928 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -244,13 +244,113 @@ private[spark] class MemoryStore( } } + /** + * Attempt to put the given block in memory store as bytes. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated the estimated size of the stored data. In case of + * failure, return a handle which allows the caller to either finish the serialization + * by spilling to disk or to deserialize the partially-serialized block and reconstruct + * the original input iterator. The caller must either fully consume this result + * iterator or call `discard()` on it in order to free the storage memory consumed by the + * partially-unrolled block. + */ + private[storage] def putIteratorAsBytes( + blockId: BlockId, + values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = { + +require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") + +// Whether there is still enough memory for us to continue unrolling this block +var keepUnrolling = true +// Initial per-task memory to request for unrolling blocks (bytes). +val initialMemoryThreshold = unrollMemoryThreshold +// Keep track of unroll memory used by this particular block / putIterator() operation +var unrollMemoryUsedByThisBlock = 0L +// Underlying buffer for unrolling the block +val redirectableStream = new RedirectableOutputStream +val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt) +redirectableStream.setOutputStream(byteArrayChunkOutputStream) +val serializationStream: SerializationStream = { + val ser = blockManager.defaultSerializer.newInstance() + ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream)) +} + +// Request enough memory to begin unrolling +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + +if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + +s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") +} else { + unrollMemoryUsedByThisBlock += initialMemoryThreshold +} + +def reserveAdditionalMemoryIfNecessary(): Unit = { + if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) { +val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) +if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest +} +unrollMemoryUsedByThisBlock += amountToRequest + } +} + +// Unroll this block safely, checking whether we have exceeded our threshold +while (values.hasNext && keepUnrolling) { + serializationStream.writeObject(values.next()) + reserveAdditionalMemoryIfNecessary() +} + +if (keepUnrolling) { + serializationStream.close() + reserveAdditionalMemoryIfNecessary() +} + +if (keepUnrolling) { + val entry = SerializedMemoryEntry( +new 
ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))) + // Synchronize so that transfer is atomic + memoryManager.synchronized { --- End diff -- I see this replicated in a few places (transferUnrollToStorage()). It might be easier to have releaseUnrollMemoryForThisTask take another argument to optionally transfer it to storage --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-u
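A minimal sketch of what the suggested releaseUnrollMemoryForThisTask change could look like; the counters and lock object below are simplified stand-ins, not Spark's actual MemoryManager API:

    // Hypothetical sketch: one release method that can optionally hand the freed unroll
    // memory straight to storage, so callers don't repeat the synchronized transfer block.
    class UnrollBookkeepingSketch {
      private val memoryManager = new Object
      private var unrollMemoryForThisTask = 0L
      private var storageMemoryUsed = 0L

      def releaseUnrollMemoryForThisTask(amount: Long, transferToStorage: Boolean = false): Unit =
        memoryManager.synchronized {
          unrollMemoryForThisTask -= amount
          if (transferToStorage) {
            // Holding the same lock for the release and the acquire keeps the transfer atomic.
            storageMemoryUsed += amount
          }
        }
    }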
[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11791#discussion_r56914517 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -244,13 +244,113 @@ private[spark] class MemoryStore( } } + /** + * Attempt to put the given block in memory store as bytes. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated the estimated size of the stored data. In case of + * failure, return a handle which allows the caller to either finish the serialization + * by spilling to disk or to deserialize the partially-serialized block and reconstruct + * the original input iterator. The caller must either fully consume this result + * iterator or call `discard()` on it in order to free the storage memory consumed by the + * partially-unrolled block. + */ + private[storage] def putIteratorAsBytes( + blockId: BlockId, + values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = { + +require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") + +// Whether there is still enough memory for us to continue unrolling this block +var keepUnrolling = true +// Initial per-task memory to request for unrolling blocks (bytes). +val initialMemoryThreshold = unrollMemoryThreshold +// Keep track of unroll memory used by this particular block / putIterator() operation +var unrollMemoryUsedByThisBlock = 0L +// Underlying buffer for unrolling the block +val redirectableStream = new RedirectableOutputStream +val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt) +redirectableStream.setOutputStream(byteArrayChunkOutputStream) +val serializationStream: SerializationStream = { + val ser = blockManager.defaultSerializer.newInstance() + ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream)) +} + +// Request enough memory to begin unrolling +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + +if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + +s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") +} else { + unrollMemoryUsedByThisBlock += initialMemoryThreshold +} + +def reserveAdditionalMemoryIfNecessary(): Unit = { + if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) { +val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) +if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest +} +unrollMemoryUsedByThisBlock += amountToRequest + } +} + +// Unroll this block safely, checking whether we have exceeded our threshold +while (values.hasNext && keepUnrolling) { + serializationStream.writeObject(values.next()) + reserveAdditionalMemoryIfNecessary() +} + +if (keepUnrolling) { --- End diff -- move this into line 317? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11791#discussion_r56914342 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -244,13 +244,113 @@ private[spark] class MemoryStore( } } + /** + * Attempt to put the given block in memory store as bytes. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated the estimated size of the stored data. In case of + * failure, return a handle which allows the caller to either finish the serialization + * by spilling to disk or to deserialize the partially-serialized block and reconstruct + * the original input iterator. The caller must either fully consume this result + * iterator or call `discard()` on it in order to free the storage memory consumed by the + * partially-unrolled block. + */ + private[storage] def putIteratorAsBytes( + blockId: BlockId, + values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = { + +require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") + +// Whether there is still enough memory for us to continue unrolling this block +var keepUnrolling = true +// Initial per-task memory to request for unrolling blocks (bytes). +val initialMemoryThreshold = unrollMemoryThreshold +// Keep track of unroll memory used by this particular block / putIterator() operation +var unrollMemoryUsedByThisBlock = 0L +// Underlying buffer for unrolling the block +val redirectableStream = new RedirectableOutputStream +val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt) +redirectableStream.setOutputStream(byteArrayChunkOutputStream) +val serializationStream: SerializationStream = { + val ser = blockManager.defaultSerializer.newInstance() + ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream)) +} + +// Request enough memory to begin unrolling +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + +if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + +s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") +} else { + unrollMemoryUsedByThisBlock += initialMemoryThreshold +} + +def reserveAdditionalMemoryIfNecessary(): Unit = { + if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) { +val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock +keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) +if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest +} +unrollMemoryUsedByThisBlock += amountToRequest --- End diff -- i dont understand why you add this twice in some cases --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
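If the second increment above is indeed unintended, the helper presumably reduces to the following; this reuses the local names from the quoted diff and is only a sketch of the fix being asked about:

    def reserveAdditionalMemoryIfNecessary(): Unit = {
      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
        if (keepUnrolling) {
          // Count the extra memory once, and only when the reservation succeeded.
          unrollMemoryUsedByThisBlock += amountToRequest
        }
      }
    }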
[GitHub] spark pull request: [SPARK-13990] Automatically pick serializer wh...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11801#discussion_r56913076 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -703,13 +710,13 @@ private[spark] class BlockManager( * * @return true if the block was stored or false if an error occurred. */ - def putBytes( + def putBytes[T: ClassTag]( // TODO(josh) --- End diff -- is this addressed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13325][SQL] Create a 64-bit hashcode ex...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11209#discussion_r56907012 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.Random + +import org.apache.spark.sql.catalyst.expressions.XXH64 +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.hash.Murmur3_x86_32 +import org.apache.spark.util.Benchmark + +/** + * Synthetic benchmark for MurMurHash 3 and xxHash64. + */ +object HashByteArrayBenchmark { + def test(length: Int, seed: Long, numArrays: Int, iters: Int): Unit = { +val random = new Random(seed) +val arrays = Array.fill[Array[Byte]](numArrays) { + val bytes = new Array[Byte](length) + random.nextBytes(bytes) + bytes +} + +val benchmark = new Benchmark("Hash byte arrays with length " + length, iters * numArrays) +benchmark.addCase("Murmur3_x86_32") { _: Int => + for (_ <- 0L until iters) { +var sum = 0 +var i = 0 +while (i < numArrays) { + sum += Murmur3_x86_32.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42) + i += 1 +} + } +} + +benchmark.addCase("xxHash 64-bit") { _: Int => + for (_ <- 0L until iters) { +var sum = 0L +var i = 0 +while (i < numArrays) { + sum += XXH64.hashUnsafeBytes(arrays(i), Platform.BYTE_ARRAY_OFFSET, length, 42) + i += 1 +} + } +} + +benchmark.run() + } + + def main(args: Array[String]): Unit = { +// Add 31 to all arrays to create worse case alignment for xxHash. +/* +Running benchmark: Hash byte arrays with length 31 --- End diff -- Can you add a case for 8 bytes? I'm curious if we should use this for joins and aggs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
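A sketch of the requested case, reusing the test helper defined in the quoted benchmark; the numArrays and iters values below are placeholders rather than the benchmark's real settings:

    // Hypothetical extra case in HashByteArrayBenchmark.main: 8-byte inputs, i.e. the width
    // of a single long key, which is what joins and aggregations would typically hash.
    test(8, 42L, 1 << 12, 1 << 11)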
[GitHub] spark pull request: [SPARK-13325][SQL] Create a 64-bit hashcode ex...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11209#discussion_r56906895 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala --- @@ -423,6 +345,121 @@ case class Murmur3Hash(children: Seq[Expression], seed: Int) extends Expression case udt: UserDefinedType[_] => computeHash(input, udt.sqlType, result, ctx) } } + + protected def hasherClassName: String +} + +/** + * Base class for interpreted hash functions. + */ +abstract class InterpretedHashFunction[E] { --- End diff -- Should we just change this to always return long and downcast where we currently use int? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
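A compact sketch of that suggestion with hypothetical names; the real InterpretedHashFunction in misc.scala is richer than this:

    // Hypothetical sketch: the interpreted hash always produces a 64-bit value, and the
    // 32-bit (Murmur3) path simply downcasts at the use site. Names are illustrative only.
    abstract class Hash64 {
      def hashBytes(bytes: Array[Byte], seed: Long): Long
    }

    object Hash32Adapter {
      def hashBytes(h: Hash64, bytes: Array[Byte], seed: Int): Int =
        h.hashBytes(bytes, seed.toLong).toInt   // downcast where an Int hash is expected
    }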
[GitHub] spark pull request: [SPARK-13916][SQL] Add a metric to WholeStageC...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11741#discussion_r56905886 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala --- @@ -300,6 +304,10 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override private[sql] lazy val metrics = Map( --- End diff -- Doesn't matter. That would be similar to having two projects in a single stage. The metrics are per instance. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13805][SQL] Generate code that get a va...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11636#issuecomment-199318644 Have you rerun the benchmark with these changes? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13674][SQL] Add wholestage codegen supp...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11517#discussion_r56830819 --- Diff: core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala --- @@ -107,6 +113,28 @@ class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = fals override def setSeed(seed: Long): Unit = rng.setSeed(seed) + override def sample(): Int = { --- End diff -- Can we not have the code duplication and write the iterator version based on this function? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
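One way the duplication could be avoided, sketched with a stand-in trait rather than Spark's actual RandomSampler API, and assuming sample() returns how many copies of the current element to emit (0 or 1 for a Bernoulli sampler):

    trait PerElementSampler[T] {
      // Per-element decision, as in the diff above: how many times to emit the next item.
      def sample(): Int

      // The Iterator-based version is derived from the per-element one instead of
      // duplicating the sampling logic.
      def sample(items: Iterator[T]): Iterator[T] =
        items.flatMap { item =>
          val count = sample()
          if (count <= 0) Iterator.empty else Iterator.fill(count)(item)
        }
    }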
[GitHub] spark pull request: [SPARK-13873] [SQL] Avoid copy of UnsafeRow wh...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11740#issuecomment-197610722 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13871][SQL] Support for inferring filte...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11665#issuecomment-197548312 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13921] Store serialized blocks as multi...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11748#discussion_r56576152 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.io + +import java.io.InputStream +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import com.google.common.primitives.UnsignedBytes +import io.netty.buffer.{ByteBuf, Unpooled} + +import org.apache.spark.network.util.ByteArrayWritableChannel +import org.apache.spark.storage.StorageUtils + +/** + * Read-only byte buffer which is physically stored as multiple chunks rather than a single + * contiguous array. + * + * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have + * position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer, + * so if these buffers may also be used elsewhere then the caller is responsible for + * copying them as needed. + */ +private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { + require(chunks != null, "chunks must not be null") + require(chunks.forall(_.limit() > 0), "chunks must be non-empty") + require(chunks.forall(_.position() == 0), "chunks' positions must be 0") + + /** + * This size of this buffer, in bytes. + */ + val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum + + def this(byteBuffer: ByteBuffer) = { +this(Array(byteBuffer)) + } + + /** + * Write this buffer to a channel. + */ + def writeFully(channel: WritableByteChannel): Unit = { +for (bytes <- getChunks()) { + while (bytes.remaining > 0) { +channel.write(bytes) + } +} + } + + /** + * Wrap this buffer to view it as a Netty ByteBuf. + */ + def toNetty: ByteBuf = { +Unpooled.wrappedBuffer(getChunks(): _*) + } + + /** + * Copy this buffer into a new byte array. + * + * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size. + */ + def toArray: Array[Byte] = { +if (size >= Integer.MAX_VALUE) { + throw new UnsupportedOperationException( +s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size") +} +val byteChannel = new ByteArrayWritableChannel(size.toInt) +writeFully(byteChannel) +byteChannel.close() +byteChannel.getData + } + + /** + * Copy this buffer into a new ByteBuffer. + * + * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size. + */ + def toByteBuffer: ByteBuffer = { +if (chunks.length == 1) { + chunks.head.duplicate() +} else { + ByteBuffer.wrap(toArray) +} + } + + /** + * Creates an input stream to read data from this ChunkedByteBuffer. 
+ * + * @param dispose if true, [[dispose()]] will be called at the end of the stream + *in order to close any memory-mapped files which back this buffer. + */ + def toInputStream(dispose: Boolean = false): InputStream = { +new ChunkedByteBufferInputStream(this, dispose) + } + + /** + * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer. + */ + def getChunks(): Array[ByteBuffer] = { +chunks.map(_.duplicate()) + } + + /** + * Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers. + * The new buffer will share no resources with the original buffer. + */ + def copy(): ChunkedByteBuffer = { +val copiedChunks = getChunks().map { chunk => +
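The quoted class documentation is complete enough for a small usage sketch; this is illustrative only, uses just the constructor, size, and toArray shown above, and the byte values are arbitrary:

    package org.apache.spark.util.io

    import java.nio.ByteBuffer

    // Hypothetical usage of the ChunkedByteBuffer under review (placed in the same
    // package because the class is private[spark]): wrap two chunks and read them back
    // as one contiguous array.
    object ChunkedByteBufferExample {
      def main(args: Array[String]): Unit = {
        val chunks = Array(ByteBuffer.wrap(Array[Byte](1, 2, 3)), ByteBuffer.wrap(Array[Byte](4, 5)))
        val buf = new ChunkedByteBuffer(chunks)
        assert(buf.size == 5L)                                        // limits of all chunks summed
        assert(buf.toArray.sameElements(Array[Byte](1, 2, 3, 4, 5)))  // chunks concatenated in order
      }
    }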
[GitHub] spark pull request: [SPARK-13922][SQL] Filter rows with null attri...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11749#discussion_r56416653 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java --- @@ -345,15 +358,25 @@ public void setColumn(int ordinal, ColumnVector column) { * in this batch will not include this row. */ public final void markFiltered(int rowId) { -assert(filteredRows[rowId] == false); +assert(!filteredRows[rowId]); filteredRows[rowId] = true; ++numRowsFiltered; } + /** + * Marks a given column as non-nullable. Any row that has a NULL value for the corresponding + * attribute is filtered out. + */ + public final void filterNullsInColumn(int ordinal) { +assert(!nullFilteredColumns.contains(ordinal)); --- End diff -- I don't think this assert is necessary. I think this is perfectly valid and makes this api easier to use. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13926] Automatically use Kryo serialize...
Github user nongli commented on the pull request: https://github.com/apache/spark/pull/11755#issuecomment-197552094 LGTM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13921] Store serialized blocks as multi...
Github user nongli commented on a diff in the pull request: https://github.com/apache/spark/pull/11748#discussion_r56576372 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.io + +import java.io.InputStream +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import com.google.common.primitives.UnsignedBytes +import io.netty.buffer.{ByteBuf, Unpooled} + +import org.apache.spark.network.util.ByteArrayWritableChannel +import org.apache.spark.storage.StorageUtils + +/** + * Read-only byte buffer which is physically stored as multiple chunks rather than a single + * contiguous array. + * + * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have + * position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer, + * so if these buffers may also be used elsewhere then the caller is responsible for + * copying them as needed. + */ +private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { + require(chunks != null, "chunks must not be null") + require(chunks.forall(_.limit() > 0), "chunks must be non-empty") + require(chunks.forall(_.position() == 0), "chunks' positions must be 0") + + /** + * This size of this buffer, in bytes. + */ + val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum + + def this(byteBuffer: ByteBuffer) = { +this(Array(byteBuffer)) + } + + /** + * Write this buffer to a channel. + */ + def writeFully(channel: WritableByteChannel): Unit = { +for (bytes <- getChunks()) { + while (bytes.remaining > 0) { +channel.write(bytes) + } +} + } + + /** + * Wrap this buffer to view it as a Netty ByteBuf. + */ + def toNetty: ByteBuf = { +Unpooled.wrappedBuffer(getChunks(): _*) + } + + /** + * Copy this buffer into a new byte array. + * + * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size. + */ + def toArray: Array[Byte] = { +if (size >= Integer.MAX_VALUE) { + throw new UnsupportedOperationException( +s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size") +} +val byteChannel = new ByteArrayWritableChannel(size.toInt) +writeFully(byteChannel) +byteChannel.close() +byteChannel.getData + } + + /** + * Copy this buffer into a new ByteBuffer. + * + * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size. + */ + def toByteBuffer: ByteBuffer = { +if (chunks.length == 1) { + chunks.head.duplicate() +} else { + ByteBuffer.wrap(toArray) +} + } + + /** + * Creates an input stream to read data from this ChunkedByteBuffer. 
+ * + * @param dispose if true, [[dispose()]] will be called at the end of the stream + *in order to close any memory-mapped files which back this buffer. + */ + def toInputStream(dispose: Boolean = false): InputStream = { +new ChunkedByteBufferInputStream(this, dispose) + } + + /** + * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer. + */ + def getChunks(): Array[ByteBuffer] = { +chunks.map(_.duplicate()) + } + + /** + * Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers. + * The new buffer will share no resources with the original buffer. + */ + def copy(): ChunkedByteBuffer = { +val copiedChunks = getChunks().map { chunk => +