spark git commit: [SPARK-20909][SQL] Add built-in SQL function - DAYOFWEEK
Repository: spark Updated Branches: refs/heads/master 96a4d1d08 -> d797ed0ef [SPARK-20909][SQL] Add built-in SQL function - DAYOFWEEK ## What changes were proposed in this pull request? Add built-in SQL function - DAYOFWEEK ## How was this patch tested? unit tests Author: Yuming Wang Closes #18134 from wangyum/SPARK-20909. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d797ed0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d797ed0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d797ed0e Branch: refs/heads/master Commit: d797ed0ef10f3e2e4cade3fc47071839ae8c5fd4 Parents: 96a4d1d Author: Yuming Wang Authored: Tue May 30 15:40:50 2017 +0900 Committer: Takuya UESHIN Committed: Tue May 30 15:40:50 2017 +0900 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/datetimeExpressions.scala | 38 .../expressions/DateExpressionsSuite.scala | 14 .../resources/sql-tests/inputs/datetime.sql | 2 ++ .../sql-tests/results/datetime.sql.out | 10 +- 5 files changed, 64 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d797ed0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 549fa0d..8081036 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -360,6 +360,7 @@ object FunctionRegistry { expression[ToUTCTimestamp]("to_utc_timestamp"), expression[TruncDate]("trunc"), expression[UnixTimestamp]("unix_timestamp"), +expression[DayOfWeek]("dayofweek"), expression[WeekOfYear]("weekofyear"), expression[Year]("year"), expression[TimeWindow]("window"),
http://git-wip-us.apache.org/repos/asf/spark/blob/d797ed0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 4098300..505ed94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -404,6 +404,44 @@ case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCa // scalastyle:off line.size.limit @ExpressionDescription( + usage = "_FUNC_(date) - Returns the day of the week for date/timestamp (1 = Sunday, 2 = Monday, ..., 7 = Saturday).", + extended = """ +Examples: + > SELECT _FUNC_('2009-07-30'); + 5 + """) +// scalastyle:on line.size.limit +case class DayOfWeek(child: Expression) extends UnaryExpression with ImplicitCastInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(DateType) + + override def dataType: DataType = IntegerType + + @transient private lazy val c = { +Calendar.getInstance(DateTimeUtils.getTimeZone("UTC")) + } + + override protected def nullSafeEval(date: Any): Any = { +c.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L) +c.get(Calendar.DAY_OF_WEEK) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, time => { + val cal = classOf[Calendar].getName + val c = ctx.freshName("cal") + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") + ctx.addMutableState(cal, c, s"""$c = $cal.getInstance($dtu.getTimeZone("UTC"));""") + s""" +$c.setTimeInMillis($time * 1000L * 3600L * 24L); +${ev.value} = $c.get($cal.DAY_OF_WEEK); + """ +}) + } +} + +// scalastyle:off line.size.limit +@ExpressionDescription( usage = "_FUNC_(date) - Returns 
the week of the year of the given date. A week is considered to start on a Monday and week 1 is the first week with >3 days.", extended = """ Examples: http://git-wip-us.apache.org/repos/asf/spark/blob/d797ed0e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/cat
spark git commit: [SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating one every batch.
Repository: spark Updated Branches: refs/heads/branch-2.2 3b79e4cda -> f6730a70c [SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating one every batch. ## What changes were proposed in this pull request? In summary, the cost of recreating a KafkaProducer to write every batch is high, because it starts a lot of threads, makes connections, and then closes them. The Kafka docs promise that a KafkaProducer instance is thread safe, and they encourage reusing one KafkaProducer instance when writing from multiple threads. Furthermore, with this patch I measured a 10x improvement in latency. ### These are the times (in ms) that addBatch took without applying this patch ![with-out_patch](https://cloud.githubusercontent.com/assets/992952/23994612/a9de4a42-0a6b-11e7-9d5b-7ae18775bee4.png) ### These are the times (in ms) that addBatch took after applying this patch ![with_patch](https://cloud.githubusercontent.com/assets/992952/23994616/ad8c11ec-0a6b-11e7-8634-2266ebb5033f.png) ## How was this patch tested? Running distributed benchmarks comparing runs with this patch and without it. Added relevant unit tests. Author: Prashant Sharma Closes #17308 from ScrapCodes/cached-kafka-producer.
(cherry picked from commit 96a4d1d0827fc3fba83f174510b061684f0d00f7) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6730a70 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6730a70 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6730a70 Branch: refs/heads/branch-2.2 Commit: f6730a70cb47ebb3df7f42209df7b076aece1093 Parents: 3b79e4c Author: Prashant Sharma Authored: Mon May 29 18:12:01 2017 -0700 Committer: Shixiong Zhu Committed: Mon May 29 18:12:10 2017 -0700 -- .../sql/kafka010/CachedKafkaProducer.scala | 112 +++ .../apache/spark/sql/kafka010/KafkaSource.scala | 14 +-- .../spark/sql/kafka010/KafkaWriteTask.scala | 17 ++- .../apache/spark/sql/kafka010/KafkaWriter.scala | 3 +- .../sql/kafka010/CachedKafkaProducerSuite.scala | 78 + 5 files changed, 206 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6730a70/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala new file mode 100644 index 000..571140b --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit} + +import com.google.common.cache._ +import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} +import org.apache.kafka.clients.producer.KafkaProducer +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging + +private[kafka010] object CachedKafkaProducer extends Logging { + + private type Producer = KafkaProducer[Array[Byte], Array[Byte]] + + private lazy val cacheExpireTimeout: Long = +SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m") + + private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] { +override def load(config: Seq[(String, Object)]): Producer = { + val configMap = config.map(x => x._1 -> x._2).toMap.asJava + createKafkaProducer(configMap) +} + } + + private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() { +override def onRemoval( +notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = { + val paramsSeq: Seq
spark git commit: [SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating one every batch.
Repository: spark Updated Branches: refs/heads/master 1c7db00c7 -> 96a4d1d08 [SPARK-19968][SS] Use a cached instance of `KafkaProducer` instead of creating one every batch. ## What changes were proposed in this pull request? In summary, the cost of recreating a KafkaProducer to write every batch is high, because it starts a lot of threads, makes connections, and then closes them. The Kafka docs promise that a KafkaProducer instance is thread safe, and they encourage reusing one KafkaProducer instance when writing from multiple threads. Furthermore, with this patch I measured a 10x improvement in latency. ### These are the times (in ms) that addBatch took without applying this patch ![with-out_patch](https://cloud.githubusercontent.com/assets/992952/23994612/a9de4a42-0a6b-11e7-9d5b-7ae18775bee4.png) ### These are the times (in ms) that addBatch took after applying this patch ![with_patch](https://cloud.githubusercontent.com/assets/992952/23994616/ad8c11ec-0a6b-11e7-8634-2266ebb5033f.png) ## How was this patch tested? Running distributed benchmarks comparing runs with this patch and without it. Added relevant unit tests. Author: Prashant Sharma Closes #17308 from ScrapCodes/cached-kafka-producer.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96a4d1d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96a4d1d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96a4d1d0 Branch: refs/heads/master Commit: 96a4d1d0827fc3fba83f174510b061684f0d00f7 Parents: 1c7db00 Author: Prashant Sharma Authored: Mon May 29 18:12:01 2017 -0700 Committer: Shixiong Zhu Committed: Mon May 29 18:12:01 2017 -0700 -- .../sql/kafka010/CachedKafkaProducer.scala | 112 +++ .../apache/spark/sql/kafka010/KafkaSource.scala | 14 +-- .../spark/sql/kafka010/KafkaWriteTask.scala | 17 ++- .../apache/spark/sql/kafka010/KafkaWriter.scala | 3 +- .../sql/kafka010/CachedKafkaProducerSuite.scala | 78 + 5 files changed, 206 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/96a4d1d0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala new file mode 100644 index 000..571140b --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit} + +import com.google.common.cache._ +import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} +import org.apache.kafka.clients.producer.KafkaProducer +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging + +private[kafka010] object CachedKafkaProducer extends Logging { + + private type Producer = KafkaProducer[Array[Byte], Array[Byte]] + + private lazy val cacheExpireTimeout: Long = +SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m") + + private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] { +override def load(config: Seq[(String, Object)]): Producer = { + val configMap = config.map(x => x._1 -> x._2).toMap.asJava + createKafkaProducer(configMap) +} + } + + private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() { +override def onRemoval( +notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = { + val paramsSeq: Seq[(String, Object)] = notification.getKey + val producer: Producer = notification.getValue + logDe
spark git commit: [SPARK-8184][SQL] Add additional function description for weekofyear
Repository: spark Updated Branches: refs/heads/branch-2.2 26640a269 -> 3b79e4cda [SPARK-8184][SQL] Add additional function description for weekofyear ## What changes were proposed in this pull request? Add additional function description for weekofyear. ## How was this patch tested? manual tests ![weekofyear](https://cloud.githubusercontent.com/assets/5399861/26525752/08a1c278-4394-11e7-8988-7cbf82c3a999.gif) Author: Yuming Wang Closes #18132 from wangyum/SPARK-8184. (cherry picked from commit 1c7db00c74ec6a91c7eefbdba85cbf41fbe8634a) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b79e4cd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b79e4cd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b79e4cd Branch: refs/heads/branch-2.2 Commit: 3b79e4cda74e0bf82ec55e673beb8f84e7cfaca4 Parents: 26640a2 Author: Yuming Wang Authored: Mon May 29 16:10:22 2017 -0700 Committer: Reynold Xin Committed: Mon May 29 16:10:29 2017 -0700 -- .../spark/sql/catalyst/expressions/datetimeExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3b79e4cd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 6a76058..0ab7207 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -402,13 +402,15 @@ case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCa } } +// scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(date) - Returns the week of the year of the 
given date.", + usage = "_FUNC_(date) - Returns the week of the year of the given date. A week is considered to start on a Monday and week 1 is the first week with >3 days.", extended = """ Examples: > SELECT _FUNC_('2008-02-20'); 8 """) +// scalastyle:on line.size.limit case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes { override def inputTypes: Seq[AbstractDataType] = Seq(DateType) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8184][SQL] Add additional function description for weekofyear
Repository: spark Updated Branches: refs/heads/master c9749068e -> 1c7db00c7 [SPARK-8184][SQL] Add additional function description for weekofyear ## What changes were proposed in this pull request? Add additional function description for weekofyear. ## How was this patch tested? manual tests ![weekofyear](https://cloud.githubusercontent.com/assets/5399861/26525752/08a1c278-4394-11e7-8988-7cbf82c3a999.gif) Author: Yuming Wang Closes #18132 from wangyum/SPARK-8184. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c7db00c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c7db00c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c7db00c Branch: refs/heads/master Commit: 1c7db00c74ec6a91c7eefbdba85cbf41fbe8634a Parents: c974906 Author: Yuming Wang Authored: Mon May 29 16:10:22 2017 -0700 Committer: Reynold Xin Committed: Mon May 29 16:10:22 2017 -0700 -- .../spark/sql/catalyst/expressions/datetimeExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c7db00c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 43ca2cf..4098300 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -402,13 +402,15 @@ case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCa } } +// scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(date) - Returns the week of the year of the given date.", + usage = "_FUNC_(date) - Returns the week of the year of the given date. 
A week is considered to start on a Monday and week 1 is the first week with >3 days.", extended = """ Examples: > SELECT _FUNC_('2008-02-20'); 8 """) +// scalastyle:on line.size.limit case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes { override def inputTypes: Seq[AbstractDataType] = Seq(DateType) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20907][TEST] Use testQuietly for test suites that generate long log output
Repository: spark Updated Branches: refs/heads/branch-2.2 dc51be1e7 -> 26640a269 [SPARK-20907][TEST] Use testQuietly for test suites that generate long log output ## What changes were proposed in this pull request? Suppress console output by using `testQuietly` in test suites ## How was this patch tested? Tested by `"SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit"` in `DataFrameSuite` Author: Kazuaki Ishizaki Closes #18135 from kiszk/SPARK-20907. (cherry picked from commit c9749068ecf8e0acabdfeeceeedff0f1f73293b7) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26640a26 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26640a26 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26640a26 Branch: refs/heads/branch-2.2 Commit: 26640a26984bac4fc1037714e60bd3607929b377 Parents: dc51be1 Author: Kazuaki Ishizaki Authored: Mon May 29 12:17:14 2017 -0700 Committer: Shixiong Zhu Committed: Mon May 29 12:17:22 2017 -0700 -- sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26640a26/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index be49b97..27558a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1733,7 +1733,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { .count } - test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { + testQuietly("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { val N = 400 val rows = Seq(Row.fromSeq(Seq.fill(N)("string"))) val
schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType))) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20907][TEST] Use testQuietly for test suites that generate long log output
Repository: spark Updated Branches: refs/heads/master ef9fd920c -> c9749068e [SPARK-20907][TEST] Use testQuietly for test suites that generate long log output ## What changes were proposed in this pull request? Suppress console output by using `testQuietly` in test suites ## How was this patch tested? Tested by `"SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit"` in `DataFrameSuite` Author: Kazuaki Ishizaki Closes #18135 from kiszk/SPARK-20907. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9749068 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9749068 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9749068 Branch: refs/heads/master Commit: c9749068ecf8e0acabdfeeceeedff0f1f73293b7 Parents: ef9fd92 Author: Kazuaki Ishizaki Authored: Mon May 29 12:17:14 2017 -0700 Committer: Shixiong Zhu Committed: Mon May 29 12:17:14 2017 -0700 -- sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c9749068/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 9f691cb..9ea9951 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1845,7 +1845,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { .count } - test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { + testQuietly("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { val N = 400 val rows = Seq(Row.fromSeq(Seq.fill(N)("string"))) val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType))) - To unsubscribe, e-mail:
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20750][SQL] Built-in SQL Function Support - REPLACE
Repository: spark Updated Branches: refs/heads/master f9b59abea -> ef9fd920c [SPARK-20750][SQL] Built-in SQL Function Support - REPLACE ## What changes were proposed in this pull request? This PR adds the built-in SQL function `REPLACE(str, search[, replace])`. `REPLACE()` returns `str` with all occurrences of `search` replaced by `replace`; if `replace` is omitted, the occurrences are removed. ## How was this patch tested? added new test suites Author: Kazuaki Ishizaki Closes #18047 from kiszk/SPARK-20750. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef9fd920 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef9fd920 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef9fd920 Branch: refs/heads/master Commit: ef9fd920c3241e05915c231bef50e3e51a655ce6 Parents: f9b59ab Author: Kazuaki Ishizaki Authored: Mon May 29 11:47:31 2017 -0700 Committer: Xiao Li Committed: Mon May 29 11:47:31 2017 -0700 -- .../apache/spark/unsafe/types/UTF8String.java | 9 + .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/stringExpressions.scala | 42 .../expressions/StringExpressionsSuite.scala| 20 ++ .../sql-tests/inputs/string-functions.sql | 4 ++ .../sql-tests/results/string-functions.sql.out | 18 - 6 files changed, 93 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef9fd920/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java -- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 5437e99..40b9fc9 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -835,6 +835,15 @@ public final class UTF8String implements Comparable, Externalizable, return res; } + public UTF8String replace(UTF8String search, UTF8String replace) { +if (EMPTY_UTF8.equals(search)) { + return this; +} +String
replaced = toString().replace( + search.toString(), replace.toString()); +return fromString(replaced); + } + // TODO: Need to use `Code Point` here instead of Char in case the character longer than 2 bytes public UTF8String translate(Map dict) { String srcStr = this.toString(); http://git-wip-us.apache.org/repos/asf/spark/blob/ef9fd920/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index a4c7f7a..549fa0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -304,6 +304,7 @@ object FunctionRegistry { expression[RegExpExtract]("regexp_extract"), expression[RegExpReplace]("regexp_replace"), expression[StringRepeat]("repeat"), +expression[StringReplace]("replace"), expression[StringReverse]("reverse"), expression[RLike]("rlike"), expression[StringRPad]("rpad"), http://git-wip-us.apache.org/repos/asf/spark/blob/ef9fd920/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index aba2f5f..1dbe098 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -340,6 +340,48 @@ case class EndsWith(left: Expression, right: Expression) extends StringPredicate } } +/** + * Replace all occurrences with string. 
+ */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(str, search[, replace]) - Replaces all occurrences of `search` with `replace`.", + extended = """ +Arguments: + str - a string expression + search - a string expression. If `search` is not found in `str`, `str` is returned unchanged. + replace - a string expression. If `replace` is not specified or is an empty string, nothing replaces +the string that is removed from `str`. + +Examples: + > SELECT _FUNC_('ABCabc', 'abc', 'DE
spark git commit: [SPARK-20758][SQL] Add Constant propagation optimization
Repository: spark Updated Branches: refs/heads/master 9d0db5a7f -> f9b59abea [SPARK-20758][SQL] Add Constant propagation optimization ## What changes were proposed in this pull request? See class doc of `ConstantPropagation` for the approach used. ## How was this patch tested? - Added unit tests Author: Tejas Patil Closes #17993 from tejasapatil/SPARK-20758_const_propagation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9b59abe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9b59abe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9b59abe Branch: refs/heads/master Commit: f9b59abeae16088c7c4d3a475762ef6c4ad42b4b Parents: 9d0db5a Author: Tejas Patil Authored: Mon May 29 12:21:34 2017 +0200 Committer: Herman van Hovell Committed: Mon May 29 12:21:34 2017 +0200 -- .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../sql/catalyst/optimizer/expressions.scala| 56 +++ .../optimizer/ConstantPropagationSuite.scala| 167 +++ .../datasources/FileSourceStrategySuite.scala | 18 +- 4 files changed, 235 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f9b59abe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index ae2f6bf..d16689a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -92,6 +92,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) CombineUnions, // Constant folding and strength reduction NullPropagation(conf), + ConstantPropagation, FoldablePropagation, OptimizeIn(conf), ConstantFolding, 
http://git-wip-us.apache.org/repos/asf/spark/blob/f9b59abe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 8931eb2..51f749a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -54,6 +54,62 @@ object ConstantFolding extends Rule[LogicalPlan] { } } +/** + * Substitutes [[Attribute Attributes]] which can be statically evaluated with their corresponding + * value in conjunctive [[Expression Expressions]] + * eg. + * {{{ + * SELECT * FROM table WHERE i = 5 AND j = i + 3 + * ==> SELECT * FROM table WHERE i = 5 AND j = 8 + * }}} + * + * Approach used: + * - Start from AND operator as the root + * - Get all the children conjunctive predicates which are EqualTo / EqualNullSafe such that they + * don't have a `NOT` or `OR` operator in them + * - Populate a mapping of attribute => constant value by looking at all the equals predicates + * - Using this mapping, replace occurrence of the attributes with the corresponding constant values + * in the AND node. 
+ */ +object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper { + private def containsNonConjunctionPredicates(expression: Expression): Boolean = expression.find { +case _: Not | _: Or => true +case _ => false + }.isDefined + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +case f: Filter => f transformExpressionsUp { + case and: And => +val conjunctivePredicates = + splitConjunctivePredicates(and) +.filter(expr => expr.isInstanceOf[EqualTo] || expr.isInstanceOf[EqualNullSafe]) +.filterNot(expr => containsNonConjunctionPredicates(expr)) + +val equalityPredicates = conjunctivePredicates.collect { + case e @ EqualTo(left: AttributeReference, right: Literal) => ((left, right), e) + case e @ EqualTo(left: Literal, right: AttributeReference) => ((right, left), e) + case e @ EqualNullSafe(left: AttributeReference, right: Literal) => ((left, right), e) + case e @ EqualNullSafe(left: Literal, right: AttributeReference) => ((right, left), e) +} + +val constantsMap = AttributeMap(equalityPredicates.map(_._1)) +val predicates = equalityPredicates.map(_._2).toSet + +def replaceConstants(expression: Expression) = expression transform { + case a