spark git commit: [SPARK-22203][SQL] Add job description for file listing Spark jobs

2017-10-04 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 969ffd631 -> c8affec21


[SPARK-22203][SQL] Add job description for file listing Spark jobs

## What changes were proposed in this pull request?

Users may be confused by some single-task jobs that Spark runs to list files. We can add a job 
description to these jobs so that users can tell what they are.
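
As a rough illustration of the idea (not the patch itself), the sketch below builds a description from the list of paths and sets it only for the duration of the listing work, restoring the previous value afterwards. The class `FileListingJobDescription`, the `ThreadLocal` standing in for Spark's per-thread job description, and the exact message wording are all assumptions made for this example; the restore step is what the captured `previousJobDescription` in the diff presumably serves.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper, independent of Spark, that mimics the set/restore pattern.
final class FileListingJobDescription {
    // Stand-in for the thread-local job description kept by a SparkContext (assumption).
    private static final ThreadLocal<String> JOB_DESCRIPTION = new ThreadLocal<>();

    // Builds a short description: the count plus the first path only.
    static String describe(List<String> paths) {
        switch (paths.size()) {
            case 0:
                return "Listing leaf files and directories for 0 paths";
            case 1:
                return "Listing leaf files and directories for 1 path: " + paths.get(0);
            default:
                return "Listing leaf files and directories for " + paths.size()
                    + " paths: " + paths.get(0) + ", ...";
        }
    }

    // Runs body with the given description, then restores whatever was set before.
    static <T> T withDescription(String description, Supplier<T> body) {
        String previous = JOB_DESCRIPTION.get();
        JOB_DESCRIPTION.set(description);
        try {
            return body.get();
        } finally {
            JOB_DESCRIPTION.set(previous);
        }
    }

    static String current() {
        return JOB_DESCRIPTION.get();
    }
}
```

Restoring in a `finally` block matters because the listing job runs on the caller's thread; without it, the description would leak into whatever job the user runs next.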

## How was this patch tested?

The new unit test.

Before:
https://user-images.githubusercontent.com/1000778/31202567-f78d15c0-a917-11e7-841e-11b8bf8f0032.png

After:
https://user-images.githubusercontent.com/1000778/31202576-fc01e356-a917-11e7-9c2b-7bf80b153adb.png

Author: Shixiong Zhu 

Closes #19432 from zsxwing/SPARK-22203.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8affec2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8affec2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8affec2

Branch: refs/heads/master
Commit: c8affec21c91d638009524955515fc143ad86f20
Parents: 969ffd6
Author: Shixiong Zhu 
Authored: Wed Oct 4 20:58:48 2017 -0700
Committer: Shixiong Zhu 
Committed: Wed Oct 4 20:58:48 2017 -0700

--
 .../datasources/InMemoryFileIndex.scala | 85 
 .../sql/test/DataFrameReaderWriterSuite.scala   | 31 +++
 2 files changed, 81 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c8affec2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 203d449..318ada0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
@@ -187,42 +188,56 @@ object InMemoryFileIndex extends Logging {
 // in case of large #defaultParallelism.
 val numParallelism = Math.min(paths.size, 
parallelPartitionDiscoveryParallelism)
 
-val statusMap = sparkContext
-  .parallelize(serializedPaths, numParallelism)
-  .mapPartitions { pathStrings =>
-val hadoopConf = serializableConfiguration.value
-pathStrings.map(new Path(_)).toSeq.map { path =>
-  (path, listLeafFiles(path, hadoopConf, filter, None))
-}.iterator
-  }.map { case (path, statuses) =>
-  val serializableStatuses = statuses.map { status =>
-// Turn FileStatus into SerializableFileStatus so we can send it back 
to the driver
-val blockLocations = status match {
-  case f: LocatedFileStatus =>
-f.getBlockLocations.map { loc =>
-  SerializableBlockLocation(
-loc.getNames,
-loc.getHosts,
-loc.getOffset,
-loc.getLength)
-}
-
-  case _ =>
-Array.empty[SerializableBlockLocation]
-}
-
-SerializableFileStatus(
-  status.getPath.toString,
-  status.getLen,
-  status.isDirectory,
-  status.getReplication,
-  status.getBlockSize,
-  status.getModificationTime,
-  status.getAccessTime,
-  blockLocations)
+val previousJobDescription = 
sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+val statusMap = try {
+  val description = paths.size match {
+case 0 =>
+  s"Listing leaf files and directories 0 paths"
+case 1 =>
+  s"Listing leaf files and directories for 1 path:${paths(0)}"
+case s =>
+  s"Listing leaf files and directories for $s paths:${paths(0)}, 
..."
   }
-  (path.toString, serializableStatuses)
-}.collect()
+  sparkContext.setJobDescription(description)
+  sparkContext
+.parallelize(serializedPaths, numParallelism)
+.mapPartitions { pathStrings =>
+  val hadoopConf = serializableConfiguration.value
+  pathStrings.map(new Path(_)).toSeq.map { path =>
+(path, listLeafFiles(path, hadoopConf, filter, None))
+  }.iterator
+}.map { case (path, statuses) =>
+val serializableStatuses = statuses.map { status =>
+  // Turn FileStatus into SerializableFileStatus so we can 

spark git commit: [SPARK-22187][SS] Update unsaferow format for saved state such that we can set timeouts when state is null

2017-10-04 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master bb035f1ee -> 969ffd631


[SPARK-22187][SS] Update unsaferow format for saved state such that we can set 
timeouts when state is null

## What changes were proposed in this pull request?

Currently, the group state of a user-defined type is encoded as top-level columns 
in the UnsafeRows stored in the state store. The timeout timestamp is also 
saved (when needed) as the last top-level column. Since the group state is 
serialized to top-level columns, you cannot save "null" as a value of state 
(setting null in all the top-level columns is not equivalent). So we don't let 
the user set the timeout without initializing the state for a key. Based on 
user experience, this leads to confusion.

This PR changes the row format so that the state is saved as nested 
columns. This allows the state to be set to null and avoids these 
confusing corner cases.
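
To make the difference concrete, here is a small sketch that uses plain object arrays as stand-ins for rows. The class name `StateRowLayouts`, the method names, and the column order in the nested layout are assumptions for illustration only; they are not the actual `UnsafeRow` schema used by the patch.

```java
// Hypothetical model of the two layouts; Object[] stands in for a row.
final class StateRowLayouts {
    // Flat layout: each state field is a top-level column, the timeout is the last column.
    // A null state can only be written as "every state column is null".
    static Object[] flatRow(Object[] stateFields, int numStateFields, long timeout) {
        Object[] row = new Object[numStateFields + 1];
        if (stateFields != null) {
            System.arraycopy(stateFields, 0, row, 0, numStateFields);
        }
        row[numStateFields] = timeout;
        return row;
    }

    // Nested layout: column 0 holds the whole state (possibly null), column 1 the timeout.
    static Object[] nestedRow(Object[] stateFields, long timeout) {
        return new Object[] { stateFields, timeout };
    }
}
```

In the flat layout, a missing state and a state whose fields are all null produce identical rows, so a timeout cannot be stored for a key whose state was never initialized. In the nested layout the two cases stay distinguishable.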

## How was this patch tested?
Refactored tests.

Author: Tathagata Das 

Closes #19416 from tdas/SPARK-22187.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/969ffd63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/969ffd63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/969ffd63

Branch: refs/heads/master
Commit: 969ffd631746125eb2b83722baf6f6e7ddd2092c
Parents: bb035f1
Author: Tathagata Das 
Authored: Wed Oct 4 19:25:22 2017 -0700
Committer: Tathagata Das 
Committed: Wed Oct 4 19:25:22 2017 -0700

--
 .../streaming/FlatMapGroupsWithStateExec.scala  | 133 +++-
 .../FlatMapGroupsWithState_StateManager.scala   | 153 +++
 .../streaming/FlatMapGroupsWithStateSuite.scala | 130 
 3 files changed, 246 insertions(+), 170 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/969ffd63/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index ab690fd..aab06d6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -23,10 +23,8 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, 
Attribute, Attribut
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
-import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.util.CompletionIterator
 
 /**
@@ -62,26 +60,7 @@ case class FlatMapGroupsWithStateExec(
   import GroupStateImpl._
 
   private val isTimeoutEnabled = timeoutConf != NoTimeout
-  private val timestampTimeoutAttribute =
-AttributeReference("timeoutTimestamp", dataType = IntegerType, nullable = 
false)()
-  private val stateAttributes: Seq[Attribute] = {
-val encSchemaAttribs = stateEncoder.schema.toAttributes
-if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else 
encSchemaAttribs
-  }
-  // Get the serializer for the state, taking into account whether we need to 
save timestamps
-  private val stateSerializer = {
-val encoderSerializer = stateEncoder.namedExpressions
-if (isTimeoutEnabled) {
-  encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-} else {
-  encoderSerializer
-}
-  }
-  // Get the deserializer for the state. Note that this must be done in the 
driver, as
-  // resolving and binding of deserializer expressions to the encoded type can 
be safely done
-  // only in the driver.
-  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
-
+  val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, 
isTimeoutEnabled)
 
   /** Distribute by grouping attributes */
   override def requiredChildDistribution: Seq[Distribution] =
@@ -109,11 +88,11 @@ case class FlatMapGroupsWithStateExec(
 child.execute().mapPartitionsWithStateStore[InternalRow](
   getStateInfo,
   groupingAttributes.toStructType,
-  stateAttributes.toStructType,
+  stateManager.stateSchema,
   indexOrdinal = None,
   sqlContext.sessionState,
   Some(sqlContext.streams.stateStoreCoordinator)) { 

spark git commit: [SPARK-22169][SQL] support byte length literal as identifier

2017-10-04 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 4a779bdac -> bb035f1ee


[SPARK-22169][SQL] support byte length literal as identifier

## What changes were proposed in this pull request?

By definition, a table name in Spark can be something like `123x`, `25a`, 
etc., with exceptions for literals like `12L`, `23BD`, etc. However, Spark SQL 
has a special byte length literal, which prevents users from using digits followed by 
`b`, `k`, `m`, `g` as identifiers.

The byte length literal is not a standard SQL literal and is only used in the 
`tableSample` parser rule. This PR moves the parsing of the byte length literal from 
the lexer to the parser, so that users can use such tokens as identifiers.
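
Once the lexer no longer claims tokens such as `10k`, the byte-length check has to happen later, on the text of the sample expression. The sketch below shows one way such a check could look. The class `ByteLengthSample`, the 1024-based multipliers, and the nine-digit cap (chosen only to rule out overflow) are assumptions for this example, not the logic in `AstBuilder`.

```java
import java.util.Locale;

// Hypothetical parser-side check for text like "10k" or "3M".
final class ByteLengthSample {
    // Returns the size in bytes, or -1 if the text is not digits followed by b, k, m or g.
    static long parseByteLength(String text) {
        String s = text.toLowerCase(Locale.ROOT);
        // At most nine digits, so the multiplication below cannot overflow a long.
        if (!s.matches("[0-9]{1,9}[bkmg]")) {
            return -1L;
        }
        long n = Long.parseLong(s.substring(0, s.length() - 1));
        switch (s.charAt(s.length() - 1)) {
            case 'b':
                return n;
            case 'k':
                return n * 1024L;
            case 'm':
                return n * 1024L * 1024L;
            default: // 'g'
                return n * 1024L * 1024L * 1024L;
        }
    }
}
```

Because the check only runs inside the sampling rule, the same text is free to be an ordinary identifier everywhere else in a query.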

## How was this patch tested?

regression test

Author: Wenchen Fan 

Closes #19392 from cloud-fan/parser-bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb035f1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb035f1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb035f1e

Branch: refs/heads/master
Commit: bb035f1ee5cdf88e476b7ed83d59140d669fbe12
Parents: 4a779bd
Author: Wenchen Fan 
Authored: Wed Oct 4 13:13:51 2017 -0700
Committer: gatorsmile 
Committed: Wed Oct 4 13:13:51 2017 -0700

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 25 ---
 .../sql/catalyst/catalog/SessionCatalog.scala   |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 26 ++--
 .../sql/catalyst/parser/PlanParserSuite.scala   |  1 +
 .../sql/execution/command/DDLParserSuite.scala  | 19 ++
 5 files changed, 49 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bb035f1e/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d0a5428..17c8404 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -25,7 +25,7 @@ grammar SqlBase;
* For char stream "2.3", "2." is not a valid decimal token, because it is 
followed by digit '3'.
* For char stream "2.3_", "2.3" is not a valid decimal token, because it is 
followed by '_'.
* For char stream "2.3W", "2.3" is not a valid decimal token, because it is 
followed by 'W'.
-   * For char stream "12.0D 34.E2+0.12 "  12.0D is a valid decimal token 
because it is folllowed
+   * For char stream "12.0D 34.E2+0.12 "  12.0D is a valid decimal token 
because it is followed
* by a space. 34.E2 is a valid decimal token because it is followed by 
symbol '+'
* which is not a digit or letter or underscore.
*/
@@ -40,10 +40,6 @@ grammar SqlBase;
   }
 }
 
-tokens {
-DELIMITER
-}
-
 singleStatement
 : statement EOF
 ;
@@ -447,12 +443,15 @@ joinCriteria
 ;
 
 sample
-: TABLESAMPLE '('
-  ( (negativeSign=MINUS? percentage=(INTEGER_VALUE | DECIMAL_VALUE) 
sampleType=PERCENTLIT)
-  | (expression sampleType=ROWS)
-  | sampleType=BYTELENGTH_LITERAL
-  | (sampleType=BUCKET numerator=INTEGER_VALUE OUT OF 
denominator=INTEGER_VALUE (ON (identifier | qualifiedName '(' ')'))?))
-  ')'
+: TABLESAMPLE '(' sampleMethod? ')'
+;
+
+sampleMethod
+: negativeSign=MINUS? percentage=(INTEGER_VALUE | DECIMAL_VALUE) 
PERCENTLIT   #sampleByPercentile
+| expression ROWS  
   #sampleByRows
+| sampleType=BUCKET numerator=INTEGER_VALUE OUT OF 
denominator=INTEGER_VALUE
+(ON (identifier | qualifiedName '(' ')'))? 
   #sampleByBucket
+| bytes=expression 
   #sampleByBytes
 ;
 
 identifierList
@@ -1004,10 +1003,6 @@ TINYINT_LITERAL
 : DIGIT+ 'Y'
 ;
 
-BYTELENGTH_LITERAL
-: DIGIT+ ('B' | 'K' | 'M' | 'G')
-;
-
 INTEGER_VALUE
 : DIGIT+
 ;

http://git-wip-us.apache.org/repos/asf/spark/blob/bb035f1e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6ba9ee5..95bc3d6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 

spark git commit: [SPARK-21871][SQL] Check actual bytecode size when compiling generated code

2017-10-04 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 64df08b64 -> 4a779bdac


[SPARK-21871][SQL] Check actual bytecode size when compiling generated code

## What changes were proposed in this pull request?
This PR adds code to check the actual bytecode size when compiling generated code. 
In #18810, we added code to give up code compilation and use interpreter 
execution in `SparkPlan` if the number of lines in a generated function goes over 
`maxLinesPerFunction`. But we already have code to collect metrics for 
compiled bytecode size in the `CodeGenerator` object, so we can easily reuse that 
code for this purpose.
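
The decision itself is simple once per-method bytecode sizes are available. The following sketch assumes the sizes have already been collected into a map from method name to size in bytes; the class `BytecodeSizeCheck`, its method names, and the limit passed in by the caller are hypothetical and not taken from the patch.

```java
import java.util.Map;

// Hypothetical fallback decision based on measured bytecode sizes.
final class BytecodeSizeCheck {
    // Largest compiled method size in bytes, or 0 when nothing was compiled.
    static int maxMethodBytecodeSize(Map<String, Integer> sizesByMethod) {
        int max = 0;
        for (int size : sizesByMethod.values()) {
            max = Math.max(max, size);
        }
        return max;
    }

    // True when any method exceeds the limit, i.e. the compiled code should be abandoned.
    static boolean shouldFallBack(Map<String, Integer> sizesByMethod, int limitBytes) {
        return maxMethodBytecodeSize(sizesByMethod) > limitBytes;
    }
}
```

Measuring bytes rather than counting source lines ties the threshold to what the JVM actually sees, since one long line and many short lines can compile to very different amounts of bytecode.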

## How was this patch tested?
Added tests in `WholeStageCodegenSuite`.

Author: Takeshi Yamamuro 

Closes #19083 from maropu/SPARK-21871.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a779bda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a779bda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a779bda

Branch: refs/heads/master
Commit: 4a779bdac3e75c17b7d36c5a009ba6c948fa9fb6
Parents: 64df08b
Author: Takeshi Yamamuro 
Authored: Wed Oct 4 10:08:24 2017 -0700
Committer: gatorsmile 
Committed: Wed Oct 4 10:08:24 2017 -0700

--
 .../expressions/codegen/CodeFormatter.scala |  8 ---
 .../expressions/codegen/CodeGenerator.scala | 59 ++--
 .../codegen/GenerateMutableProjection.scala |  4 +-
 .../expressions/codegen/GenerateOrdering.scala  |  3 +-
 .../expressions/codegen/GeneratePredicate.scala |  3 +-
 .../codegen/GenerateSafeProjection.scala|  4 +-
 .../codegen/GenerateUnsafeProjection.scala  |  4 +-
 .../codegen/GenerateUnsafeRowJoiner.scala   |  4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala | 15 ++---
 .../codegen/CodeFormatterSuite.scala| 32 ---
 .../sql/execution/WholeStageCodegenExec.scala   | 25 +
 .../columnar/GenerateColumnAccessor.scala   |  3 +-
 .../sql/execution/WholeStageCodegenSuite.scala  | 43 --
 .../benchmark/AggregateBenchmark.scala  | 36 ++--
 14 files changed, 94 insertions(+), 149 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
index 7b398f4..60e600d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
@@ -89,14 +89,6 @@ object CodeFormatter {
 }
 new CodeAndComment(code.result().trim(), map)
   }
-
-  def stripExtraNewLinesAndComments(input: String): String = {
-val commentReg =
-  ("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" +// strip /*comment*/
-   """([ |\t]*?\/\/[\s\S]*?\n)""").r   // strip //comment
-val codeWithoutComment = commentReg.replaceAllIn(input, "")
-codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // strip ExtraNewLines
-  }
 }
 
 private class CodeFormatter {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index f3b4579..f9c5ef8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -374,20 +374,6 @@ class CodegenContext {
   private val placeHolderToComments = new mutable.HashMap[String, String]
 
   /**
-   * It will count the lines of every Java function generated by whole-stage 
codegen,
-   * if there is a function of length greater than 
spark.sql.codegen.maxLinesPerFunction,
-   * it will return true.
-   */
-  def isTooLongGeneratedFunction: Boolean = {
-classFunctions.values.exists { _.values.exists {
-  code =>
-val codeWithoutComments = 
CodeFormatter.stripExtraNewLinesAndComments(code)
-codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction
-  }
-}
-  }
-
-  /**
* Returns a term name that is unique within this instance of a 
`CodegenContext`.
  

spark git commit: [SPARK-20783][SQL] Create ColumnVector to abstract existing compressed column (batch method)

2017-10-04 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master d54670192 -> 64df08b64


[SPARK-20783][SQL] Create ColumnVector to abstract existing compressed column 
(batch method)

## What changes were proposed in this pull request?

This PR abstracts data compressed by `CompressibleColumnAccessor` using 
`ColumnVector` in batch method. When `ColumnAccessor.decompress` is called, 
`ColumnVector` will have uncompressed data. This batch decompression does not use 
`InternalRow`, which reduces the number of memory accesses.

As a first step of this implementation, this JIRA supports primitive data types. 
Another PR will support array and other data types.

This implementation decompresses data in batch into an uncompressed column batch, as 
rxin suggested 
[here](https://github.com/apache/spark/pull/18468#issuecomment-316914076). 
Another implementation uses an adapter approach [as cloud-fan 
suggested](https://github.com/apache/spark/pull/18468).
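
As a simplified picture of batch decompression, the sketch below decodes a dictionary-encoded int column into a plain array in one pass, with no per-row object in between. The class `BatchDictionaryDecode` and the use of `int[]` in place of a `ColumnVector` are assumptions for illustration; the real code writes into Spark's column vectors and handles nulls and other types.

```java
// Hypothetical batch decoder: ids index into the dictionary, output is a dense column.
final class BatchDictionaryDecode {
    static int[] decode(int[] dictionary, int[] ids) {
        int[] out = new int[ids.length];
        for (int i = 0; i < ids.length; i++) {
            // One array read and one array write per value, no row wrapper.
            out[i] = dictionary[ids[i]];
        }
        return out;
    }
}
```

For example, `decode(new int[] {10, 20, 30}, new int[] {2, 0, 0, 1})` yields the column `{30, 10, 10, 20}`.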

## How was this patch tested?

Added test suites

Author: Kazuaki Ishizaki 

Closes #18704 from kiszk/SPARK-20783a.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64df08b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64df08b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64df08b6

Branch: refs/heads/master
Commit: 64df08b64779bab629a8a90a3797d8bd70f61703
Parents: d546701
Author: Kazuaki Ishizaki 
Authored: Wed Oct 4 15:06:44 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Oct 4 15:06:44 2017 +0800

--
 .../execution/columnar/ColumnDictionary.java|  58 
 .../vectorized/OffHeapColumnVector.java |  18 +
 .../vectorized/OnHeapColumnVector.java  |  18 +
 .../vectorized/WritableColumnVector.java|  76 +++--
 .../sql/execution/columnar/ColumnAccessor.scala |  16 +-
 .../sql/execution/columnar/ColumnType.scala |  33 ++
 .../CompressibleColumnAccessor.scala|   4 +
 .../compression/CompressionScheme.scala |   3 +
 .../compression/compressionSchemes.scala| 340 ++-
 .../compression/BooleanBitSetSuite.scala|  52 +++
 .../compression/DictionaryEncodingSuite.scala   |  72 +++-
 .../compression/IntegralDeltaSuite.scala|  72 
 .../compression/PassThroughEncodingSuite.scala  | 189 +++
 .../compression/RunLengthEncodingSuite.scala|  89 -
 .../TestCompressibleColumnBuilder.scala |   9 +-
 .../vectorized/ColumnVectorSuite.scala  | 183 +-
 .../vectorized/ColumnarBatchSuite.scala |   4 +-
 17 files changed, 1192 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/64df08b6/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
new file mode 100644
index 000..f178585
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public final class ColumnDictionary implements Dictionary {
+  private int[] intDictionary;
+  private long[] longDictionary;
+
+  public ColumnDictionary(int[] dictionary) {
+this.intDictionary = dictionary;
+  }
+
+  public ColumnDictionary(long[] dictionary) {
+this.longDictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+return intDictionary[id];
+  }
+
+  @Override
+  public long decodeToLong(int id) {
+return longDictionary[id];
+  }
+
+  @Override
+  public float decodeToFloat(int id) {
+throw new UnsupportedOperationException("Dictionary encoding does not 
support float");
+  }
+
+  @Override

spark git commit: [SPARK-22193][SQL] Minor typo fix

2017-10-04 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 3099c574c -> d54670192


[SPARK-22193][SQL] Minor typo fix

## What changes were proposed in this pull request?

[SPARK-22193][SQL] Minor typo fix in SortMergeJoinExec. Nothing major, but it 
bothered me going into it. Hence fixing.

## How was this patch tested?
existing tests

Author: Rekha Joshi 
Author: rjoshi2 

Closes #19422 from rekhajoshm/SPARK-22193.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5467019
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5467019
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5467019

Branch: refs/heads/master
Commit: d54670192a6acd892d13b511dfb62390be6ad39c
Parents: 3099c57
Author: Rekha Joshi 
Authored: Wed Oct 4 07:11:00 2017 +0100
Committer: Sean Owen 
Committed: Wed Oct 4 07:11:00 2017 +0100

--
 .../apache/spark/sql/execution/joins/SortMergeJoinExec.scala   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d5467019/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 14de2dc..4e02803 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -402,7 +402,7 @@ case class SortMergeJoinExec(
 }
   }
 
-  private def genComparision(ctx: CodegenContext, a: Seq[ExprCode], b: 
Seq[ExprCode]): String = {
+  private def genComparison(ctx: CodegenContext, a: Seq[ExprCode], b: 
Seq[ExprCode]): String = {
 val comparisons = a.zip(b).zipWithIndex.map { case ((l, r), i) =>
   s"""
  |if (comp == 0) {
@@ -463,7 +463,7 @@ case class SortMergeJoinExec(
  |  continue;
  |}
  |if (!$matches.isEmpty()) {
- |  ${genComparision(ctx, leftKeyVars, matchedKeyVars)}
+ |  ${genComparison(ctx, leftKeyVars, matchedKeyVars)}
  |  if (comp == 0) {
  |return true;
  |  }
@@ -484,7 +484,7 @@ case class SortMergeJoinExec(
  |}
  |${rightKeyVars.map(_.code).mkString("\n")}
  |  }
- |  ${genComparision(ctx, leftKeyVars, rightKeyVars)}
+ |  ${genComparison(ctx, leftKeyVars, rightKeyVars)}
  |  if (comp > 0) {
  |$rightRow = null;
  |  } else if (comp < 0) {

