spark git commit: [SPARK-20909][SQL] Add built-in SQL function - DAYOFWEEK

2017-05-29 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master 96a4d1d08 -> d797ed0ef


[SPARK-20909][SQL] Add built-in SQL function - DAYOFWEEK

## What changes were proposed in this pull request?

Add built-in SQL function - DAYOFWEEK
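
A minimal usage sketch (illustration only, not part of the patch; assumes a `spark-shell` built with this change, so `spark` is the active `SparkSession`):

```scala
// dayofweek follows the 1 = Sunday, ..., 7 = Saturday convention documented below.
// 2009-07-30 is a Thursday, so this returns 5, matching the docstring example.
spark.sql("SELECT dayofweek('2009-07-30')").show()
```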

## How was this patch tested?

unit tests

Author: Yuming Wang 

Closes #18134 from wangyum/SPARK-20909.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d797ed0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d797ed0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d797ed0e

Branch: refs/heads/master
Commit: d797ed0ef10f3e2e4cade3fc47071839ae8c5fd4
Parents: 96a4d1d
Author: Yuming Wang 
Authored: Tue May 30 15:40:50 2017 +0900
Committer: Takuya UESHIN 
Committed: Tue May 30 15:40:50 2017 +0900

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../expressions/datetimeExpressions.scala   | 38 
 .../expressions/DateExpressionsSuite.scala  | 14 
 .../resources/sql-tests/inputs/datetime.sql |  2 ++
 .../sql-tests/results/datetime.sql.out  | 10 +-
 5 files changed, 64 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d797ed0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 549fa0d..8081036 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -360,6 +360,7 @@ object FunctionRegistry {
     expression[ToUTCTimestamp]("to_utc_timestamp"),
     expression[TruncDate]("trunc"),
     expression[UnixTimestamp]("unix_timestamp"),
+    expression[DayOfWeek]("dayofweek"),
     expression[WeekOfYear]("weekofyear"),
     expression[Year]("year"),
     expression[TimeWindow]("window"),

http://git-wip-us.apache.org/repos/asf/spark/blob/d797ed0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 4098300..505ed94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -404,6 +404,44 @@ case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCa
 
 // scalastyle:off line.size.limit
 @ExpressionDescription(
+  usage = "_FUNC_(date) - Returns the day of the week for date/timestamp (1 = Sunday, 2 = Monday, ..., 7 = Saturday).",
+  extended = """
+    Examples:
+      > SELECT _FUNC_('2009-07-30');
+       5
+  """)
+// scalastyle:on line.size.limit
+case class DayOfWeek(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)
+
+  override def dataType: DataType = IntegerType
+
+  @transient private lazy val c = {
+    Calendar.getInstance(DateTimeUtils.getTimeZone("UTC"))
+  }
+
+  override protected def nullSafeEval(date: Any): Any = {
+    c.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L)
+    c.get(Calendar.DAY_OF_WEEK)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, time => {
+      val cal = classOf[Calendar].getName
+      val c = ctx.freshName("cal")
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+      ctx.addMutableState(cal, c, s"""$c = $cal.getInstance($dtu.getTimeZone("UTC"));""")
+      s"""
+        $c.setTimeInMillis($time * 1000L * 3600L * 24L);
+        ${ev.value} = $c.get($cal.DAY_OF_WEEK);
+      """
+    })
+  }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
   usage = "_FUNC_(date) - Returns the week of the year of the given date. A week is considered to start on a Monday and week 1 is the first week with >3 days.",
   extended = """
     Examples:

http://git-wip-us.apache.org/repos/asf/spark/blob/d797ed0e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/cat

spark git commit: [SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating one every batch.

2017-05-29 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3b79e4cda -> f6730a70c


[SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating 
one every batch.

## What changes were proposed in this pull request?

In summary, the cost of recreating a KafkaProducer for every batch written is high: it starts a lot of threads, opens connections, and then closes them again. The Kafka docs promise that a KafkaProducer instance is thread safe, and reusing a single instance across multiple writing threads is encouraged.

With this patch, addBatch latency improves by roughly 10x.
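
As a rough sketch of the caching approach (assumptions: Guava and kafka-clients on the classpath; the object and method names here are illustrative, not this patch's API), the idea is a loading cache keyed by the producer configuration so every batch with the same config reuses one producer:

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.JavaConverters._

object ProducerCacheSketch {
  type Producer = KafkaProducer[Array[Byte], Array[Byte]]

  private val loader = new CacheLoader[Seq[(String, Object)], Producer] {
    // Only called on a cache miss; later batches with the same config reuse the instance.
    override def load(config: Seq[(String, Object)]): Producer =
      new KafkaProducer[Array[Byte], Array[Byte]](config.toMap.asJava)
  }

  private val cache: LoadingCache[Seq[(String, Object)], Producer] =
    CacheBuilder.newBuilder()
      .expireAfterAccess(10, TimeUnit.MINUTES) // mirrors the 10m default timeout
      .build[Seq[(String, Object)], Producer](loader)

  // A write task asks the cache for a producer instead of constructing one per batch.
  // Sorting the params gives a canonical key for the same logical configuration.
  def getOrCreate(kafkaParams: Map[String, Object]): Producer =
    cache.get(kafkaParams.toSeq.sortBy(_._1))
}
```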

### These are times that addBatch took in ms. Without applying this patch
![with-out_patch](https://cloud.githubusercontent.com/assets/992952/23994612/a9de4a42-0a6b-11e7-9d5b-7ae18775bee4.png)
### These are times that addBatch took in ms. After applying this patch
![with_patch](https://cloud.githubusercontent.com/assets/992952/23994616/ad8c11ec-0a6b-11e7-8634-2266ebb5033f.png)

## How was this patch tested?
Running distributed benchmarks comparing runs with this patch and without it.
Added relevant unit tests.

Author: Prashant Sharma 

Closes #17308 from ScrapCodes/cached-kafka-producer.

(cherry picked from commit 96a4d1d0827fc3fba83f174510b061684f0d00f7)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6730a70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6730a70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6730a70

Branch: refs/heads/branch-2.2
Commit: f6730a70cb47ebb3df7f42209df7b076aece1093
Parents: 3b79e4c
Author: Prashant Sharma 
Authored: Mon May 29 18:12:01 2017 -0700
Committer: Shixiong Zhu 
Committed: Mon May 29 18:12:10 2017 -0700

--
 .../sql/kafka010/CachedKafkaProducer.scala  | 112 +++
 .../apache/spark/sql/kafka010/KafkaSource.scala |  14 +--
 .../spark/sql/kafka010/KafkaWriteTask.scala |  17 ++-
 .../apache/spark/sql/kafka010/KafkaWriter.scala |   3 +-
 .../sql/kafka010/CachedKafkaProducerSuite.scala |  78 +
 5 files changed, 206 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6730a70/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
new file mode 100644
index 000..571140b
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit}
+
+import com.google.common.cache._
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.kafka.clients.producer.KafkaProducer
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+
+private[kafka010] object CachedKafkaProducer extends Logging {
+
+  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
+
+  private lazy val cacheExpireTimeout: Long =
+    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
+
+  private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
+    override def load(config: Seq[(String, Object)]): Producer = {
+      val configMap = config.map(x => x._1 -> x._2).toMap.asJava
+      createKafkaProducer(configMap)
+    }
+  }
+
+  private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
+    override def onRemoval(
+        notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
+      val paramsSeq: Seq

spark git commit: [SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating one every batch.

2017-05-29 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 1c7db00c7 -> 96a4d1d08


[SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating 
one every batch.

## What changes were proposed in this pull request?

In summary, the cost of recreating a KafkaProducer for every batch written is high: it starts a lot of threads, opens connections, and then closes them again. The Kafka docs promise that a KafkaProducer instance is thread safe, and reusing a single instance across multiple writing threads is encouraged.

With this patch, addBatch latency improves by roughly 10x.

### These are times that addBatch took in ms. Without applying this patch
![with-out_patch](https://cloud.githubusercontent.com/assets/992952/23994612/a9de4a42-0a6b-11e7-9d5b-7ae18775bee4.png)
### These are times that addBatch took in ms. After applying this patch
![with_patch](https://cloud.githubusercontent.com/assets/992952/23994616/ad8c11ec-0a6b-11e7-8634-2266ebb5033f.png)

## How was this patch tested?
Running distributed benchmarks comparing runs with this patch and without it.
Added relevant unit tests.

Author: Prashant Sharma 

Closes #17308 from ScrapCodes/cached-kafka-producer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96a4d1d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96a4d1d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96a4d1d0

Branch: refs/heads/master
Commit: 96a4d1d0827fc3fba83f174510b061684f0d00f7
Parents: 1c7db00
Author: Prashant Sharma 
Authored: Mon May 29 18:12:01 2017 -0700
Committer: Shixiong Zhu 
Committed: Mon May 29 18:12:01 2017 -0700

--
 .../sql/kafka010/CachedKafkaProducer.scala  | 112 +++
 .../apache/spark/sql/kafka010/KafkaSource.scala |  14 +--
 .../spark/sql/kafka010/KafkaWriteTask.scala |  17 ++-
 .../apache/spark/sql/kafka010/KafkaWriter.scala |   3 +-
 .../sql/kafka010/CachedKafkaProducerSuite.scala |  78 +
 5 files changed, 206 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/96a4d1d0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
new file mode 100644
index 000..571140b
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit}
+
+import com.google.common.cache._
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.kafka.clients.producer.KafkaProducer
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+
+private[kafka010] object CachedKafkaProducer extends Logging {
+
+  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
+
+  private lazy val cacheExpireTimeout: Long =
+    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
+
+  private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
+    override def load(config: Seq[(String, Object)]): Producer = {
+      val configMap = config.map(x => x._1 -> x._2).toMap.asJava
+      createKafkaProducer(configMap)
+    }
+  }
+
+  private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
+    override def onRemoval(
+        notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
+      val paramsSeq: Seq[(String, Object)] = notification.getKey
+      val producer: Producer = notification.getValue
+      logDe

spark git commit: [SPARK-8184][SQL] Add additional function description for weekofyear

2017-05-29 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 26640a269 -> 3b79e4cda


[SPARK-8184][SQL] Add additional function description for weekofyear

## What changes were proposed in this pull request?

Add additional function description for weekofyear.
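
For reference, a small illustration of the rule being documented (assumes `spark-shell`; the values follow the Monday-start / ">3 days" convention described in the new docstring and are stated here as expectations, not tested output):

```scala
// 2008-02-20 matches the docstring example below; 2017-01-01 is a Sunday and,
// under the Monday-start / ">3 days" rule, still counts toward the last week of 2016.
spark.sql("SELECT weekofyear('2008-02-20'), weekofyear('2017-01-01')").show()
// expected: 8, 52
```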

## How was this patch tested?

 manual tests

![weekofyear](https://cloud.githubusercontent.com/assets/5399861/26525752/08a1c278-4394-11e7-8988-7cbf82c3a999.gif)

Author: Yuming Wang 

Closes #18132 from wangyum/SPARK-8184.

(cherry picked from commit 1c7db00c74ec6a91c7eefbdba85cbf41fbe8634a)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b79e4cd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b79e4cd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b79e4cd

Branch: refs/heads/branch-2.2
Commit: 3b79e4cda74e0bf82ec55e673beb8f84e7cfaca4
Parents: 26640a2
Author: Yuming Wang 
Authored: Mon May 29 16:10:22 2017 -0700
Committer: Reynold Xin 
Committed: Mon May 29 16:10:29 2017 -0700

--
 .../spark/sql/catalyst/expressions/datetimeExpressions.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3b79e4cd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 6a76058..0ab7207 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -402,13 +402,15 @@ case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCa
   }
 }
 
+// scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(date) - Returns the week of the year of the given date.",
+  usage = "_FUNC_(date) - Returns the week of the year of the given date. A week is considered to start on a Monday and week 1 is the first week with >3 days.",
   extended = """
     Examples:
       > SELECT _FUNC_('2008-02-20');
        8
   """)
+// scalastyle:on line.size.limit
 case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
 
   override def inputTypes: Seq[AbstractDataType] = Seq(DateType)





spark git commit: [SPARK-8184][SQL] Add additional function description for weekofyear

2017-05-29 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master c9749068e -> 1c7db00c7


[SPARK-8184][SQL] Add additional function description for weekofyear

## What changes were proposed in this pull request?

Add additional function description for weekofyear.

## How was this patch tested?

 manual tests

![weekofyear](https://cloud.githubusercontent.com/assets/5399861/26525752/08a1c278-4394-11e7-8988-7cbf82c3a999.gif)

Author: Yuming Wang 

Closes #18132 from wangyum/SPARK-8184.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c7db00c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c7db00c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c7db00c

Branch: refs/heads/master
Commit: 1c7db00c74ec6a91c7eefbdba85cbf41fbe8634a
Parents: c974906
Author: Yuming Wang 
Authored: Mon May 29 16:10:22 2017 -0700
Committer: Reynold Xin 
Committed: Mon May 29 16:10:22 2017 -0700

--
 .../spark/sql/catalyst/expressions/datetimeExpressions.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1c7db00c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 43ca2cf..4098300 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -402,13 +402,15 @@ case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCa
   }
 }
 
+// scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(date) - Returns the week of the year of the given date.",
+  usage = "_FUNC_(date) - Returns the week of the year of the given date. A week is considered to start on a Monday and week 1 is the first week with >3 days.",
   extended = """
     Examples:
       > SELECT _FUNC_('2008-02-20');
        8
   """)
+// scalastyle:on line.size.limit
 case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
 
   override def inputTypes: Seq[AbstractDataType] = Seq(DateType)





spark git commit: [SPARK-20907][TEST] Use testQuietly for test suites that generate long log output

2017-05-29 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 dc51be1e7 -> 26640a269


[SPARK-20907][TEST] Use testQuietly for test suites that generate long log 
output

## What changes were proposed in this pull request?

Suppress console output by using `testQuietly` in test suites
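
A rough sketch of the idea (not the actual Spark helper; assumes ScalaTest and log4j 1.x on the classpath): run the test body with the root log level temporarily raised so noisy output does not flood the console.

```scala
import org.apache.log4j.{Level, LogManager}
import org.scalatest.FunSuite

class NoisySuite extends FunSuite {
  // Registers a regular test, restoring the previous log level afterwards.
  def testQuietly(name: String)(body: => Unit): Unit = test(name) {
    val rootLogger = LogManager.getRootLogger
    val previousLevel = rootLogger.getLevel
    rootLogger.setLevel(Level.ERROR)
    try body finally rootLogger.setLevel(previousLevel)
  }

  testQuietly("a test whose codegen fallback would otherwise log heavily") {
    assert(Seq.fill(400)("string").length == 400)
  }
}
```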

## How was this patch tested?

Tested by `"SPARK-19372: Filter can be executed w/o generated code due to JVM 
code size limit"` in `DataFrameSuite`

Author: Kazuaki Ishizaki 

Closes #18135 from kiszk/SPARK-20907.

(cherry picked from commit c9749068ecf8e0acabdfeeceeedff0f1f73293b7)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26640a26
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26640a26
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26640a26

Branch: refs/heads/branch-2.2
Commit: 26640a26984bac4fc1037714e60bd3607929b377
Parents: dc51be1
Author: Kazuaki Ishizaki 
Authored: Mon May 29 12:17:14 2017 -0700
Committer: Shixiong Zhu 
Committed: Mon May 29 12:17:22 2017 -0700

--
 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/26640a26/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index be49b97..27558a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1733,7 +1733,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       .count
   }
 
-  test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
+  testQuietly("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
     val N = 400
     val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
     val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))





spark git commit: [SPARK-20907][TEST] Use testQuietly for test suites that generate long log output

2017-05-29 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master ef9fd920c -> c9749068e


[SPARK-20907][TEST] Use testQuietly for test suites that generate long log 
output

## What changes were proposed in this pull request?

Suppress console output by using `testQuietly` in test suites

## How was this patch tested?

Tested by `"SPARK-19372: Filter can be executed w/o generated code due to JVM 
code size limit"` in `DataFrameSuite`

Author: Kazuaki Ishizaki 

Closes #18135 from kiszk/SPARK-20907.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9749068
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9749068
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9749068

Branch: refs/heads/master
Commit: c9749068ecf8e0acabdfeeceeedff0f1f73293b7
Parents: ef9fd92
Author: Kazuaki Ishizaki 
Authored: Mon May 29 12:17:14 2017 -0700
Committer: Shixiong Zhu 
Committed: Mon May 29 12:17:14 2017 -0700

--
 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c9749068/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 9f691cb..9ea9951 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1845,7 +1845,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       .count
   }
 
-  test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
+  testQuietly("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
     val N = 400
     val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
     val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))





spark git commit: [SPARK-20750][SQL] Built-in SQL Function Support - REPLACE

2017-05-29 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master f9b59abea -> ef9fd920c


[SPARK-20750][SQL] Built-in SQL Function Support - REPLACE

## What changes were proposed in this pull request?

This PR adds the built-in SQL function `REPLACE(str, search[, replace])`.

`REPLACE()` returns `str` with all occurrences of `search` replaced by the given replacement string.
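
An illustrative usage sketch (assumes `spark-shell` with this patch; the inputs below are made up, not taken from the test suite):

```scala
spark.sql("SELECT replace('hello', 'l', 'L')").show()   // heLLo: every occurrence is replaced
spark.sql("SELECT replace('hello', 'l')").show()        // heo: omitting the third argument removes the matches
```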

## How was this patch tested?

added new test suites

Author: Kazuaki Ishizaki 

Closes #18047 from kiszk/SPARK-20750.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef9fd920
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef9fd920
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef9fd920

Branch: refs/heads/master
Commit: ef9fd920c3241e05915c231bef50e3e51a655ce6
Parents: f9b59ab
Author: Kazuaki Ishizaki 
Authored: Mon May 29 11:47:31 2017 -0700
Committer: Xiao Li 
Committed: Mon May 29 11:47:31 2017 -0700

--
 .../apache/spark/unsafe/types/UTF8String.java   |  9 +
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../expressions/stringExpressions.scala | 42 
 .../expressions/StringExpressionsSuite.scala| 20 ++
 .../sql-tests/inputs/string-functions.sql   |  4 ++
 .../sql-tests/results/string-functions.sql.out  | 18 -
 6 files changed, 93 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ef9fd920/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
--
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 5437e99..40b9fc9 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -835,6 +835,15 @@ public final class UTF8String implements Comparable, Externalizable,
     return res;
   }
 
+  public UTF8String replace(UTF8String search, UTF8String replace) {
+    if (EMPTY_UTF8.equals(search)) {
+      return this;
+    }
+    String replaced = toString().replace(
+      search.toString(), replace.toString());
+    return fromString(replaced);
+  }
+
   // TODO: Need to use `Code Point` here instead of Char in case the character longer than 2 bytes
   public UTF8String translate(Map dict) {
     String srcStr = this.toString();

http://git-wip-us.apache.org/repos/asf/spark/blob/ef9fd920/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index a4c7f7a..549fa0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -304,6 +304,7 @@ object FunctionRegistry {
     expression[RegExpExtract]("regexp_extract"),
     expression[RegExpReplace]("regexp_replace"),
     expression[StringRepeat]("repeat"),
+    expression[StringReplace]("replace"),
     expression[StringReverse]("reverse"),
     expression[RLike]("rlike"),
     expression[StringRPad]("rpad"),

http://git-wip-us.apache.org/repos/asf/spark/blob/ef9fd920/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index aba2f5f..1dbe098 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -340,6 +340,48 @@ case class EndsWith(left: Expression, right: Expression) extends StringPredicate
   }
 }
 
+/**
+ * Replace all occurrences with string.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(str, search[, replace]) - Replaces all occurrences of `search` with `replace`.",
+  extended = """
+    Arguments:
+      str - a string expression
+      search - a string expression. If `search` is not found in `str`, `str` is returned unchanged.
+      replace - a string expression. If `replace` is not specified or is an empty string, nothing replaces
+        the string that is removed from `str`.
+
+    Examples:
+      > SELECT _FUNC_('ABCabc', 'abc', 'DE

spark git commit: [SPARK-20758][SQL] Add Constant propagation optimization

2017-05-29 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 9d0db5a7f -> f9b59abea


[SPARK-20758][SQL] Add Constant propagation optimization

## What changes were proposed in this pull request?

See class doc of `ConstantPropagation` for the approach used.
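
A small, hypothetical illustration (assumes `spark-shell` with this rule in the optimizer; the column names are made up):

```scala
// Because `i = 5` is a conjunctive equality predicate, the rule can substitute 5
// for `i` in `j = i + 3`, and constant folding then reduces it to `j = 8`.
spark.range(10).selectExpr("id AS i", "id AS j")
  .where("i = 5 AND j = i + 3")
  .explain(true)
```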

## How was this patch tested?

- Added unit tests

Author: Tejas Patil 

Closes #17993 from tejasapatil/SPARK-20758_const_propagation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9b59abe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9b59abe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9b59abe

Branch: refs/heads/master
Commit: f9b59abeae16088c7c4d3a475762ef6c4ad42b4b
Parents: 9d0db5a
Author: Tejas Patil 
Authored: Mon May 29 12:21:34 2017 +0200
Committer: Herman van Hovell 
Committed: Mon May 29 12:21:34 2017 +0200

--
 .../sql/catalyst/optimizer/Optimizer.scala  |   1 +
 .../sql/catalyst/optimizer/expressions.scala|  56 +++
 .../optimizer/ConstantPropagationSuite.scala| 167 +++
 .../datasources/FileSourceStrategySuite.scala   |  18 +-
 4 files changed, 235 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f9b59abe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index ae2f6bf..d16689a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -92,6 +92,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       CombineUnions,
       // Constant folding and strength reduction
       NullPropagation(conf),
+      ConstantPropagation,
       FoldablePropagation,
       OptimizeIn(conf),
       ConstantFolding,

http://git-wip-us.apache.org/repos/asf/spark/blob/f9b59abe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 8931eb2..51f749a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -54,6 +54,62 @@ object ConstantFolding extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Substitutes [[Attribute Attributes]] which can be statically evaluated with their corresponding
+ * value in conjunctive [[Expression Expressions]]
+ * eg.
+ * {{{
+ *   SELECT * FROM table WHERE i = 5 AND j = i + 3
+ *   ==>  SELECT * FROM table WHERE i = 5 AND j = 8
+ * }}}
+ *
+ * Approach used:
+ * - Start from AND operator as the root
+ * - Get all the children conjunctive predicates which are EqualTo / EqualNullSafe such that they
+ *   don't have a `NOT` or `OR` operator in them
+ * - Populate a mapping of attribute => constant value by looking at all the equals predicates
+ * - Using this mapping, replace occurrence of the attributes with the corresponding constant values
+ *   in the AND node.
+ */
+object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper {
+  private def containsNonConjunctionPredicates(expression: Expression): Boolean = expression.find {
+    case _: Not | _: Or => true
+    case _ => false
+  }.isDefined
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f: Filter => f transformExpressionsUp {
+      case and: And =>
+        val conjunctivePredicates =
+          splitConjunctivePredicates(and)
+            .filter(expr => expr.isInstanceOf[EqualTo] || expr.isInstanceOf[EqualNullSafe])
+            .filterNot(expr => containsNonConjunctionPredicates(expr))
+
+        val equalityPredicates = conjunctivePredicates.collect {
+          case e @ EqualTo(left: AttributeReference, right: Literal) => ((left, right), e)
+          case e @ EqualTo(left: Literal, right: AttributeReference) => ((right, left), e)
+          case e @ EqualNullSafe(left: AttributeReference, right: Literal) => ((left, right), e)
+          case e @ EqualNullSafe(left: Literal, right: AttributeReference) => ((right, left), e)
+        }
+
+        val constantsMap = AttributeMap(equalityPredicates.map(_._1))
+        val predicates = equalityPredicates.map(_._2).toSet
+
+        def replaceConstants(expression: Expression) = expression transform {
+          case a