[GitHub] spark issue #21942: [SPARK-24283][ML] Make ml.StandardScaler skip conversion...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21942 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21942: [SPARK-24283][ML] Make ml.StandardScaler skip conversion...
Github user sujithjay commented on the issue: https://github.com/apache/spark/pull/21942 @holdenk Could you also please take a look at this PR?
[GitHub] spark issue #21942: [SPARK-24283][ML] Make ml.StandardScaler skip conversion...
Github user holdensmagicalunicorn commented on the issue: https://github.com/apache/spark/pull/21942 @sujithjay, thanks! I am a bot who has found some folks who might be able to help with the review: @mengxr, @jkbradley and @MLnick
[GitHub] spark pull request #21942: [SPARK-24283][ML] Make ml.StandardScaler skip con...
GitHub user sujithjay opened a pull request: https://github.com/apache/spark/pull/21942 [SPARK-24283][ML] Make ml.StandardScaler skip conversion of Spark ml vectors to mllib vectors ## What changes were proposed in this pull request? Currently, ml.StandardScaler converts Spark ml vectors to mllib vectors during the transform operation. This PR skips that conversion step. ## How was this patch tested? Existing tests in StandardScalerSuite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sujithjay/spark SPARK-24283 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21942.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21942 commit c010d1bb904717412591384f31bd085e3d98f502 Author: sujithjay Date: 2018-08-01T08:37:10Z [SPARK-24283][ML][WIP] Make ml.StandardScaler skip conversion of Spark ml vectors to mllib vectors
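The PR description only says the conversion step is skipped. For readers unfamiliar with what the transform computes per vector, here is a minimal, hypothetical sketch of the standardization arithmetic on a plain `Array[Double]` (the representation a dense vector carries either way). `StandardizeSketch.standardize` is an invented name, not Spark's API, and mapping a zero standard deviation to 0.0 is an assumption about the convention rather than something stated in this thread.

```scala
// Hypothetical sketch of per-vector standardization on a dense array.
// Not Spark's implementation; names and the zero-std convention are assumptions.
object StandardizeSketch {
  /** Returns (x - mean) / std per feature. `withMean` controls centering and
    * `withStd` controls scaling; a feature with std == 0.0 is scaled to 0.0. */
  def standardize(
      values: Array[Double],
      mean: Array[Double],
      std: Array[Double],
      withMean: Boolean,
      withStd: Boolean): Array[Double] = {
    require(values.length == mean.length && values.length == std.length,
      "values, mean and std must have the same length")
    Array.tabulate(values.length) { i =>
      val centered = if (withMean) values(i) - mean(i) else values(i)
      if (!withStd) centered
      else if (std(i) != 0.0) centered / std(i)
      else 0.0
    }
  }
}
```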
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21941 Merged build finished. Test FAILed.
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21941 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93871/ Test FAILed.
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21941 **[Test build #93871 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93871/testReport)** for PR 21941 at commit [`c0821b6`](https://github.com/apache/spark/commit/c0821b6dd8e713edf2bd1ddd9a27f170d8f8). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon, please help with the review and merge.
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21895 @jerryshao that can be done, but IIUC I see two main problems: - your blacklisting suggestion has the same "caching" and "memory leakage" problems as the solution proposed here, i.e. if the permissions on a file change, we wouldn't notice until the SHS is restarted, and we need to keep the set of blacklisted files in memory (admittedly it may be much smaller than the total number of files, so this is probably not a big problem); - with your suggestion, IIUC we would also try to delete the log file, so we would also have to lower the log level to debug for access-denied errors when deleting files (I don't think this is a big issue, but it has to be taken into account, as users may miss other issues that are currently visible at the normal logging level, e.g. if the file is read-only for the spark user). What do you think? Thanks.
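To make the two concerns concrete, here is a hypothetical sketch of the blacklisting idea as described in the comment: paths that fail a permission check go into an in-memory set and are skipped afterwards. `AccessBlacklist` and `tryRead` are invented for illustration, and `SecurityException` stands in for whatever access-denied error the real file system raises.

```scala
import scala.collection.mutable

// Hypothetical sketch of the blacklisting approach discussed above.
// Not History Server code; SecurityException stands in for an access-denied error.
class AccessBlacklist {
  // Only ever grows: one entry per path that failed a permission check.
  private val denied = mutable.HashSet.empty[String]

  def isBlacklisted(path: String): Boolean = synchronized { denied.contains(path) }

  def size: Int = synchronized { denied.size }

  /** Runs `read` unless `path` is blacklisted. An access-denied failure adds
    * the path to the blacklist, and it is never retried afterwards, even if
    * its permissions change later. */
  def tryRead[T](path: String)(read: String => T): Option[T] =
    if (isBlacklisted(path)) None
    else {
      try Some(read(path))
      catch {
        case _: SecurityException =>
          synchronized { denied += path }
          None
      }
    }
}
```

Both problems are visible in this shape: the set only grows, and a path stays skipped after its permissions are fixed until the process restarts.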
[GitHub] spark pull request #21699: [SPARK-24722][SQL] pivot() with Column type argum...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21699#discussion_r206802439 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -339,29 +400,30 @@ class RelationalGroupedDataset protected[sql]( /** * Pivots a column of the current `DataFrame` and performs the specified aggregation. - * There are two versions of pivot function: one that requires the caller to specify the list - * of distinct values to pivot on, and one that does not. The latter is more concise but less - * efficient, because Spark needs to first compute the list of distinct values internally. + * This is an overloaded version of the `pivot` method with `pivotColumn` of the `String` type. * * {{{ * // Compute the sum of earnings for each year by course with each course as a separate column - * df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings") - * - * // Or without specifying column values (less efficient) - * df.groupBy("year").pivot("course").sum("earnings") + * df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings") * }}} * - * @param pivotColumn Name of the column to pivot. + * @param pivotColumn the column to pivot. * @param values List of values that will be translated to columns in the output DataFrame. - * @since 1.6.0 + * @since 2.4.0 */ - def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = { + def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = { +import org.apache.spark.sql.functions.struct groupType match { case RelationalGroupedDataset.GroupByType => +val pivotValues = values.map { --- End diff -- @HyukjinKwon Should I revert the last commit and propose it as a separate PR? I think it makes sense to discuss possible API alternatives in the JIRA ticket.
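As a rough illustration of what the pivot example in the doc comment computes, here is a hypothetical sketch on plain Scala collections (not Spark's implementation; `Earning`, `pivotSum`, and `pivotSumAll` are invented names). It also shows why the removed doc text called the variant without explicit values less efficient: the distinct pivot values must be collected in an extra pass first.

```scala
// Hypothetical illustration of pivot semantics on plain collections.
// `Earning`, `pivotSum` and `pivotSumAll` are invented names, not Spark APIs.
final case class Earning(year: Int, course: String, earnings: Double)

object PivotSketch {
  /** Groups rows by year and emits one summed column per pivot value, in the
    * order given. A (year, value) pair with no rows yields None (null in Spark). */
  def pivotSum(rows: Seq[Earning], values: Seq[String]): Map[Int, Seq[Option[Double]]] =
    rows.groupBy(_.year).map { case (year, group) =>
      val sumByCourse: Map[String, Double] =
        group.groupBy(_.course).map { case (course, rs) => course -> rs.map(_.earnings).sum }
      year -> values.map(v => sumByCourse.get(v))
    }

  /** Without explicit values, an extra pass first collects the distinct pivot
    * values (sorted here, a choice made for this sketch). */
  def pivotSumAll(rows: Seq[Earning]): Map[Int, Seq[Option[Double]]] =
    pivotSum(rows, rows.map(_.course).distinct.sorted)
}
```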
[GitHub] spark issue #21909: [SPARK-24959][SQL] Speed up count() for JSON and CSV
Github user MaxGekk commented on the issue: https://github.com/apache/spark/pull/21909 @HyukjinKwon @maropu Any objections to the PR?
[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21546 Yea.. I hope so. Only one comment left.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206790470 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.concurrent.ConcurrentHashMap + +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +class MemoryStateStore extends StateStore() { + import scala.collection.JavaConverters._ + private val map = new ConcurrentHashMap[UnsafeRow, UnsafeRow] + + override def iterator(): Iterator[UnsafeRowPair] = { +map.entrySet.iterator.asScala.map { case e => new UnsafeRowPair(e.getKey, e.getValue) } + } + + override def get(key: UnsafeRow): UnsafeRow = map.get(key) + + override def put(key: UnsafeRow, newValue: UnsafeRow): Unit = { +map.put(key.copy(), newValue.copy()) + } + + override def remove(key: UnsafeRow): Unit = { +map.remove(key) --- End diff -- Yeah missed that. Will fix.
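The `put` in this diff stores `key.copy()` and `newValue.copy()`. As I understand it, rows handed to a store are often reused buffers, so a stored reference could change underneath the map. A minimal sketch of why defensive copies matter follows; `MutableRow` and `MemoryStoreSketch` are hypothetical stand-ins for `UnsafeRow` and `MemoryStateStore`, modeled with an `Array[Int]`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a reusable, mutable row such as UnsafeRow.
final class MutableRow(val values: Array[Int]) {
  def copy(): MutableRow = new MutableRow(values.clone())
  def set(i: Int, v: Int): Unit = values(i) = v
  // Content-based equality and hashing so rows can be used as map keys.
  override def equals(other: Any): Boolean = other match {
    case r: MutableRow => java.util.Arrays.equals(values, r.values)
    case _ => false
  }
  override def hashCode(): Int = java.util.Arrays.hashCode(values)
}

// Simplified map-backed store in the spirit of MemoryStateStore: put() keeps
// defensive copies, so a caller reusing its buffers cannot corrupt entries.
class MemoryStoreSketch {
  private val map = new ConcurrentHashMap[MutableRow, MutableRow]()
  def get(key: MutableRow): MutableRow = map.get(key) // null when absent
  def put(key: MutableRow, value: MutableRow): Unit = map.put(key.copy(), value.copy())
  def remove(key: MutableRow): Unit = map.remove(key)
  def size: Int = map.size
}
```

Without the copies, mutating `key` after `put` would change the hash of an entry already in the map and make it unreachable.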
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21469 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93869/ Test FAILed.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21469 Merged build finished. Test FAILed.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206791736 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] --- End diff -- BTW, this will become `getStateValueSchema` once we change the return type.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206784385 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -201,33 +200,37 @@ object WatermarkSupport { case class StateStoreRestoreExec( keyExpressions: Seq[Attribute], stateInfo: Option[StatefulOperatorStateInfo], +stateFormatVersion: Int, child: SparkPlan) extends UnaryExecNode with StateStoreReader { + private[sql] val stateManager = StreamingAggregationStateManager.createStateManager( +keyExpressions, child.output, stateFormatVersion) + override protected def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsWithStateStore( getStateInfo, keyExpressions.toStructType, - child.output.toStructType, + stateManager.getValueExpressions.toStructType, --- End diff -- Right. It sounds like `StructType` is preferred over `Seq[Attribute]` in this case. Will apply. Maybe a dumb question from a Spark SQL newbie (still getting familiar with it): I guess we prefer `StructType` here because it's less restrictive and also avoids the headache of dealing with field references. Do I understand correctly?
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206791325 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StatefulOperatorsHelperSuite extends StreamTest { + import TestMaterial._ + + test("StateManager v1 - get, put, iter") { +val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 1) + +// in V1, input row is stored as value +testGetPutIterOnStateManager(stateManager, OUTPUT_ATTRIBUTES, TEST_ROW, TEST_KEY_ROW, TEST_ROW) + } + + // StateManagerImplV2 + test("StateManager v2 - get, put, iter") { +val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 2) + +// in V2, row for values itself (excluding keys from input row) is stored as value +// so that stored value doesn't have key part, but state manager V2 will provide same output +// as V1 when getting row for key +testGetPutIterOnStateManager(stateManager, VALUES_ATTRIBUTES, TEST_ROW, TEST_KEY_ROW, + TEST_VALUE_ROW) + } + + private def newStateManager( + keysAttributes: Seq[Attribute], + outputAttributes: Seq[Attribute], --- End diff -- Yes. Actually, for StateManager, `input row attributes` and `output attributes` are the same, given how the StateStore*Exec operators work, so I just picked one of them. I'm happy to rename it if `inputRowAttributes` makes it clearer which schema should be passed.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206786014 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala --- @@ -53,7 +53,35 @@ class StreamingAggregationSuite extends StateStoreMetricsTest import testImplicits._ - test("simple count, update mode") { + def executeFuncWithStateVersionSQLConf( + stateVersion: Int, + confPairs: Seq[(String, String)], + func: => Any): Unit = { +withSQLConf(confPairs ++ + Seq(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key -> stateVersion.toString): _*) { + func +} + } + + def testWithAllStateVersions(name: String, confPairs: (String, String)*) --- End diff -- Actually, this mostly comes from not knowing how `withSQLConf` works. Does `withSQLConf` handle nested `withSQLConf` calls properly? If so, we don't need to add the `confPairs` param at all; if not, I guess we still want to add it.
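On the nesting question: if a helper saves each key's previous value before overriding it and restores those values in a `finally` block, nested calls compose naturally. My understanding is that Spark's `withSQLConf` test helper follows this save-and-restore pattern, but treat that as an assumption to verify. A minimal sketch on a plain mutable map (`ConfSketch.withConf` is a hypothetical name, and `conf` is not Spark's `SQLConf`):

```scala
import scala.collection.mutable

// Hypothetical save-and-restore config helper in the spirit of withSQLConf.
// `conf` is a plain mutable map, not Spark's SQLConf.
object ConfSketch {
  val conf: mutable.Map[String, String] = mutable.Map.empty

  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    // Capture each key's value before overriding it (None means "unset").
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally {
      previous.foreach {
        case (k, Some(old)) => conf(k) = old
        case (k, None)      => conf.remove(k)
      }
    }
  }
}
```

Under this pattern an inner call only shadows the keys it sets and hands them back on exit, which is what would make the extra `confPairs` parameter unnecessary.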
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206780521 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] +def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow +def get(store: StateStore, key: UnsafeRow): UnsafeRow --- End diff -- I might think naively about this: I thought its interface is similar to StateStore so wondered we need to add docs, but I think I was wrong. Will add docs. Thanks for the insightful input! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206779898 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] +def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow --- End diff -- > In fact, if there exists a StateManager to manage all the state in the store, then ALL operations to add/remove state should go through the manager and store should not be accessed directly. Totally agree that this would be a better design for StateManager. I don't remember whether I tried this before, so let me try applying your suggestion and see whether anything blocks it.
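A minimal sketch of the quoted suggestion, where operators talk only to the manager and the manager is the single place that touches the store. `KVStore`, `MapKVStore`, and `StateManagerSketch` are hypothetical simplifications, not the `StateStore` API; rows and keys are generic so the sketch runs without Spark.

```scala
import scala.collection.mutable

// Hypothetical minimal store interface standing in for StateStore.
trait KVStore[K, V] {
  def get(key: K): Option[V]
  def put(key: K, value: V): Unit
  def remove(key: K): Unit
  def iterator: Iterator[(K, V)]
}

final class MapKVStore[K, V] extends KVStore[K, V] {
  private val m = mutable.LinkedHashMap.empty[K, V]
  def get(key: K): Option[V] = m.get(key)
  def put(key: K, value: V): Unit = m(key) = value
  def remove(key: K): Unit = { m.remove(key); () }
  def iterator: Iterator[(K, V)] = m.iterator
}

// Operators call only the manager; it derives keys from rows and is the
// single place that reads or mutates the underlying store.
final class StateManagerSketch[Row, K](extractKey: Row => K) {
  def get(store: KVStore[K, Row], key: K): Option[Row] = store.get(key)
  def put(store: KVStore[K, Row], row: Row): Unit = store.put(extractKey(row), row)
  def remove(store: KVStore[K, Row], key: K): Unit = store.remove(key)
  def values(store: KVStore[K, Row]): Iterator[Row] = store.iterator.map(_._2)
}
```

Routing removal and iteration through the manager as well means a change of storage format only has to be handled in one class.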
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21469 **[Test build #93869 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93869/testReport)** for PR 21469 at commit [`ed072fc`](https://github.com/apache/spark/commit/ed072fcf057f982275d0daf69787ed812f03e87b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206781209 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] +def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow +def get(store: StateStore, key: UnsafeRow): UnsafeRow +def put(store: StateStore, row: UnsafeRow): Unit + } + + object StreamingAggregationStateManager extends Logging { +def createStateManager( +keyExpressions: Seq[Attribute], +childOutput: Seq[Attribute], +stateFormatVersion: Int): StreamingAggregationStateManager = { + stateFormatVersion match { +case 1 => new StreamingAggregationStateManagerImplV1(keyExpressions, childOutput) +case 2 => new StreamingAggregationStateManagerImplV2(keyExpressions, childOutput) +case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid") + } +} + } + + abstract class StreamingAggregationStateManagerBaseImpl( + protected val keyExpressions: Seq[Attribute], + protected val childOutput: Seq[Attribute]) extends StreamingAggregationStateManager { + +@transient protected lazy val keyProjector = + GenerateUnsafeProjection.generate(keyExpressions, childOutput) + +def extractKey(row: InternalRow): UnsafeRow = keyProjector(row) + } + + class StreamingAggregationStateManagerImplV1( + keyExpressions: Seq[Attribute], + childOutput: Seq[Attribute]) +extends StreamingAggregationStateManagerBaseImpl(keyExpressions, childOutput) { + +override def 
getValueExpressions: Seq[Attribute] = { + childOutput +} + +override def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow = { + rowPair.value +} + +override def get(store: StateStore, key: UnsafeRow): UnsafeRow = { + store.get(key) +} + +override def put(store: StateStore, row: UnsafeRow): Unit = { + store.put(extractKey(row), row) +} + } + + class StreamingAggregationStateManagerImplV2( --- End diff -- Great point. I was probably in a rush to show its shape. Will add docs for the state formats of both V1 and V2.
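Since the thread asks for docs on both state formats, here is a hypothetical model of them based on the test comments in `StatefulOperatorsHelperSuite`: V1 stores the whole input row as the value, while V2 stores only the non-key columns and reconstructs the same row as V1 on read. Rows are modeled as `Vector[Any]` and keys as column positions; the real code works on `UnsafeRow` with generated projections (the diff also imports `GenerateUnsafeRowJoiner`), which this sketch does not model.

```scala
// Hypothetical model of the two state formats; rows are Vector[Any] and the
// key is a set of column positions. Not the Spark implementation.
final case class StateFormat(keyIndices: Seq[Int], numColumns: Int) {
  // Positions of the non-key columns, in their original order.
  val valueIndices: Seq[Int] = (0 until numColumns).filterNot(i => keyIndices.contains(i))

  def extractKey(row: Vector[Any]): Vector[Any] = keyIndices.map(i => row(i)).toVector

  // V1: the entire input row is the stored value, so key columns are stored twice.
  def encodeV1(row: Vector[Any]): Vector[Any] = row

  // V2: only the non-key columns are stored as the value.
  def encodeV2(row: Vector[Any]): Vector[Any] = valueIndices.map(i => row(i)).toVector

  // V2 read path: merge key and value back into the original column order,
  // so callers see the same row as with V1.
  def restoreV2(key: Vector[Any], value: Vector[Any]): Vector[Any] = {
    val out = new Array[Any](numColumns)
    keyIndices.zip(key).foreach { case (i, v) => out(i) = v }
    valueIndices.zip(value).foreach { case (i, v) => out(i) = v }
    out.toVector
  }
}
```

In this model, the saving from V2 is that the key columns are no longer duplicated inside every stored value.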
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206790358 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.concurrent.ConcurrentHashMap + +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +class MemoryStateStore extends StateStore() { --- End diff -- It was actually just extracted from another place so it could be reused in several places, but I agree it's better to document it now that it is a kind of public API for testing. Will add.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206778127 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { --- End diff -- Will fix.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206780754 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] +def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow +def get(store: StateStore, key: UnsafeRow): UnsafeRow +def put(store: StateStore, row: UnsafeRow): Unit + } + + object StreamingAggregationStateManager extends Logging { +def createStateManager( +keyExpressions: Seq[Attribute], +childOutput: Seq[Attribute], --- End diff -- Sounds much better, and you're right about the concept of `child`. Will rename. ---
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206778355 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow --- End diff -- Renaming sounds better. Will rename, and will also add docs. ---
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206790505 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StatefulOperatorsHelperSuite extends StreamTest { --- End diff -- Will rename. ---
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206788634 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulOperatorsHelperSuite.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.streaming.StatefulOperatorsHelper.StreamingAggregationStateManager +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StatefulOperatorsHelperSuite extends StreamTest { + import TestMaterial._ + + test("StateManager v1 - get, put, iter") { +val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 1) + +// in V1, input row is stored as value +testGetPutIterOnStateManager(stateManager, OUTPUT_ATTRIBUTES, TEST_ROW, TEST_KEY_ROW, TEST_ROW) + } + + // StateManagerImplV2 + test("StateManager v2 - get, put, iter") { +val stateManager = newStateManager(KEYS_ATTRIBUTES, OUTPUT_ATTRIBUTES, 2) + +// in V2, row for values itself (excluding keys from input row) is stored as value +// so that stored value doesn't have key part, but state manager V2 will provide same output +// as V1 when getting row for key +testGetPutIterOnStateManager(stateManager, VALUES_ATTRIBUTES, TEST_ROW, TEST_KEY_ROW, + TEST_VALUE_ROW) + } + + private def newStateManager( + keysAttributes: Seq[Attribute], + outputAttributes: Seq[Attribute], + version: Int): StreamingAggregationStateManager = { +StreamingAggregationStateManager.createStateManager(keysAttributes, outputAttributes, version) + } + + private def testGetPutIterOnStateManager( + stateManager: StreamingAggregationStateManager, + expectedValueExpressions: Seq[Attribute], + inputRow: UnsafeRow, + expectedStateKey: UnsafeRow, + expectedStateValue: UnsafeRow): Unit = { + +assert(stateManager.getValueExpressions === expectedValueExpressions) + +val memoryStateStore = new MemoryStateStore() +stateManager.put(memoryStateStore, inputRow) + 
+assert(memoryStateStore.iterator().size === 1) + +val keyRow = stateManager.extractKey(inputRow) +assert(keyRow === expectedStateKey) + +// iterate state store and verify whether expected format of key and value are stored +val pair = memoryStateStore.iterator().next() +assert(pair.key === keyRow) +assert(pair.value === expectedStateValue) +assert(stateManager.restoreOriginRow(pair) === inputRow) + +// verify the stored value once again via get +assert(memoryStateStore.get(keyRow) === expectedStateValue) + +// state manager should return row which is same as input row regardless of format version +assert(inputRow === stateManager.get(memoryStateStore, keyRow)) + } + +} + +object TestMaterial { + val KEYS: Seq[String] = Seq("key1", "key2") --- End diff -- I intended to use them like `static final` fields, so I treated them as constants and followed the style guide for constants: `Constants should be all uppercase letters and be put in a companion object.` That's why I extracted them into a separate object (though not a companion object, since I preferred a more descriptive name) and named them in uppercase. But that was not intentional, and it is definitely bad if it requires us to jump back and forth. I'm going to place them at the beginning of the class for now, but please let me know if you see a better place to put them. ---
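For reference, the convention quoted above (uppercase constants in a companion object) looks roughly like this in Scala. The suite and field names below are hypothetical. Importing the companion's members at the top of the class keeps the constants usable unqualified, so a reader does not need to scroll to a separate object to find where they are used.

```scala
// Hypothetical names; a sketch of the "constants in a companion object" convention.
class StateManagerSuiteSketch {
  import StateManagerSuiteSketch._

  // Test bodies refer to the constants unqualified thanks to the import above.
  def keyCount: Int = KEYS.size
  def valueFields: Seq[String] = FIELDS.filterNot(f => KEYS.contains(f))
}

object StateManagerSuiteSketch {
  // Uppercase constants kept in the companion object, per the quoted style guide.
  val KEYS: Seq[String] = Seq("key1", "key2")
  val VALUES: Seq[String] = Seq("sum(value1)", "sum(value2)")
  val FIELDS: Seq[String] = KEYS ++ VALUES
}
```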
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206778971 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { + + val supportedVersions = Seq(1, 2) + val legacyVersion = 1 + + sealed trait StreamingAggregationStateManager extends Serializable { +def extractKey(row: InternalRow): UnsafeRow +def getValueExpressions: Seq[Attribute] --- End diff -- It defines the schema of the value read from and written to the state. For V1 it is the same as the input schema, and for V2 it is `input schema - key schema`. Would `getStateValueExpressions` be OK for us?
---
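The difference between the two state formats described above can be modelled in plain Scala. This is a simplified sketch, assuming rows are `Map[String, Any]` and the store is a mutable map from key row to value row; the real code works on `UnsafeRow` with generated projections and a row joiner, and the names `StateFormatSketch`, `getKey`, and `stateValueFields` are hypothetical.

```scala
import scala.collection.mutable

// Simplified, hypothetical model of the V1/V2 state formats.
object StateFormatSketch {
  type Row = Map[String, Any]
  type Store = mutable.Map[Row, Row]

  val supportedVersions: Seq[Int] = Seq(1, 2)

  sealed trait StateManager {
    def keyFields: Seq[String]
    def inputFields: Seq[String]

    /** Fields stored as the state value (V1: all input fields; V2: input minus key). */
    def stateValueFields: Seq[String]

    /** Projects the grouping-key fields out of an input row. */
    def getKey(row: Row): Row = keyFields.map(f => f -> row(f)).toMap

    def put(store: Store, row: Row): Unit =
      store(getKey(row)) = stateValueFields.map(f => f -> row(f)).toMap

    /** Returns the full input row regardless of the storage format. */
    def get(store: Store, key: Row): Option[Row] =
      store.get(key).map(value => key ++ value)
  }

  final case class V1(keyFields: Seq[String], inputFields: Seq[String]) extends StateManager {
    def stateValueFields: Seq[String] = inputFields
  }

  final case class V2(keyFields: Seq[String], inputFields: Seq[String]) extends StateManager {
    def stateValueFields: Seq[String] = inputFields.filterNot(keyFields.toSet)
  }

  def create(keyFields: Seq[String], inputFields: Seq[String], version: Int): StateManager =
    version match {
      case 1 => V1(keyFields, inputFields)
      case 2 => V2(keyFields, inputFields)
      case _ => throw new IllegalArgumentException(
        s"Version $version is invalid; supported: ${supportedVersions.mkString(", ")}")
    }
}
```

In V2 the key columns are not duplicated in the stored value, and `get` joins key and value back together, so callers see the same row in both formats.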
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206778077 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulOperatorsHelper.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateUnsafeRowJoiner} +import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair} +import org.apache.spark.sql.types.StructType + +object StatefulOperatorsHelper { --- End diff -- Yeah, right. I found your PR useful for getting an idea of how to model the classes, since it dealt with a similar requirement, but I didn't pick up on why you placed it in StatefulOperatorsHelper. I'll move them to the state package. ---
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206775357 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -871,6 +871,16 @@ object SQLConf { .intConf .createWithDefault(2) + val STREAMING_AGGREGATION_STATE_FORMAT_VERSION = + buildConf("spark.sql.streaming.streamingAggregation.stateFormatVersion") --- End diff -- Ah OK. Sounds better. Will fix. ---
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r206776398 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -871,6 +871,16 @@ object SQLConf { .intConf .createWithDefault(2) + val STREAMING_AGGREGATION_STATE_FORMAT_VERSION = + buildConf("spark.sql.streaming.streamingAggregation.stateFormatVersion") + .internal() + .doc("State format version used by streaming aggregation operations triggered " + +"explicitly or implicitly via agg() in a streaming query. State between versions are " + --- End diff -- I meant to explain that the option only applies to operators that go through StateStoreRestoreExec / StateStoreSaveExec (so max("field1") as well as agg("field1" -> "max")), but now I feel it only adds confusion, and I don't think end users need to understand the details behind the config. Will remove the part `explicitly or implicitly via agg()`. ---
[GitHub] spark issue #21883: [SPARK-24937][SQL] Datasource partition table should loa...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21883 Merged build finished. Test FAILed. ---
[GitHub] spark issue #21883: [SPARK-24937][SQL] Datasource partition table should loa...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21883 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93868/ Test FAILed. ---
[GitHub] spark issue #21883: [SPARK-24937][SQL] Datasource partition table should loa...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21883 **[Test build #93868 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93868/testReport)** for PR 21883 at commit [`536346e`](https://github.com/apache/spark/commit/536346e60ed24ee447f991aacf58cafe9415a020). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. ---
[GitHub] spark issue #20958: [SPARK-23844][SS] Fix socket source honors recovered off...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20958 Build finished. Test PASSed. ---
[GitHub] spark issue #20958: [SPARK-23844][SS] Fix socket source honors recovered off...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20958 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1555/ Test PASSed. ---
[GitHub] spark issue #20272: [SPARK-23078] [CORE] [K8s] allow Spark Thrift Server to ...
Github user deveshk0 commented on the issue: https://github.com/apache/spark/pull/20272 I have built Spark with the same changes for the Thrift server. It is running fine for me. ---
[GitHub] spark issue #21933: [SPARK-24917] make chunk size configurable
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21933 Merged build finished. Test FAILed. ---
[GitHub] spark pull request #21639: [SPARK-24653][tests] Avoid cross-job pollution in...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21639 ---
[GitHub] spark issue #21933: [SPARK-24917] make chunk size configurable
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21933 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93867/ Test FAILed. ---
[GitHub] spark issue #21933: [SPARK-24917] make chunk size configurable
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21933 **[Test build #93867 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93867/testReport)** for PR 21933 at commit [`0251bd5`](https://github.com/apache/spark/commit/0251bd517e7fd3e695cb8366ffa03de8c9e2900b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. ---
[GitHub] spark issue #21546: [SPARK-23030][SQL][PYTHON] Use Arrow stream format for c...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21546 So... can we still target this for 2.4? It sounds very good to have, and complementary to Hydrogen. ---
[GitHub] spark issue #21516: [SPARK-24501][MESOS] Add Dispatcher and Driver metrics
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21516 **[Test build #93875 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93875/testReport)** for PR 21516 at commit [`50c1c1e`](https://github.com/apache/spark/commit/50c1c1e810fa27480ae7e72640cc8f67b44a60f1). ---
[GitHub] spark issue #21639: [SPARK-24653][tests] Avoid cross-job pollution in TestUt...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21639 Merged to master ---
[GitHub] spark issue #21516: [SPARK-24501][MESOS] Add Dispatcher and Driver metrics
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21516 Jenkins, retest this please ---
[GitHub] spark issue #21923: [SPARK-24918][Core] Executor Plugin api
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21923 this https://github.com/squito/spark-memory ---
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21895 My current thinking is to revert SPARK-20172 and improve the logging when an exception is encountered during the actual read. ---
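One way to read "improve the logging during the actual read": attempt the read directly and log a targeted message when it fails, rather than filtering files with an up-front permission check. This sketch uses `java.nio.file` instead of Hadoop's `FileSystem`, and `EventLogReadSketch.readLog` and its `warn` callback are hypothetical stand-ins for the history provider and its logger.

```scala
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.nio.file.{AccessDeniedException, Files, Path}

object EventLogReadSketch {
  /**
   * Reads all lines of `path`, or returns None after logging why the read failed.
   * `warn` stands in for a real logger.
   */
  def readLog(path: Path, warn: String => Unit): Option[Seq[String]] =
    try {
      val lines = Files.readAllLines(path, StandardCharsets.UTF_8)
      Some(Seq.tabulate(lines.size)(i => lines.get(i)))
    } catch {
      // Must come before IOException, since AccessDeniedException is a subclass of it.
      case e: AccessDeniedException =>
        warn(s"Permission denied while reading event log $path, skipping it: ${e.getMessage}")
        None
      case e: IOException =>
        warn(s"Failed to read event log $path: $e")
        None
    }
}
```

The trade-off is that no extra permission call is made per file; the cost is paid only when a read actually fails, and the log message then says which file and why.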
[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21895#discussion_r206780805 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -973,6 +978,42 @@ private[history] object FsHistoryProvider { private[history] val CURRENT_LISTING_VERSION = 1L } +private[history] trait CachedFileSystemHelper extends Logging { + protected def fs: FileSystem + protected def expireTimeInSeconds: Long + + /** + * LRU cache containing the result for the already checked files. + */ + // Visible for testing. + private[history] val cache = CacheBuilder.newBuilder() +.expireAfterAccess(expireTimeInSeconds, TimeUnit.SECONDS) +.build[String, java.lang.Boolean]() --- End diff -- Memory doesn't grow indefinitely, since entries expire over time. Moreover, since we are storing only a string containing the file name and a Boolean, each entry needs about 100 bytes of memory. With 100k event logs, this means about 10MB, which doesn't seem to me an amount that can cause an OOM. Anyway, we can also add a maximum number of entries for this cache if you think it is necessary. This would cause some more RPC calls, though. ---
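The estimate above works out as 100,000 entries × ~100 bytes ≈ 10,000,000 bytes ≈ 10 MB. If a cap on the number of entries were added, the cache would combine expire-after-access with LRU eviction. The sketch below models that with an access-ordered `java.util.LinkedHashMap` rather than Guava's `CacheBuilder` (whose `maximumSize` option would be the natural choice in the actual code); the class name and the injectable clock are assumptions made for illustration and testability.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical sketch: expire-after-access plus a maximum number of entries.
final class BoundedExpiringCache[K, V](maxEntries: Int, ttlMillis: Long, now: () => Long) {
  private final case class Entry(value: V, lastAccess: Long)

  // accessOrder = true makes iteration order least-recently-used first, so the
  // "eldest" entry removed below is the LRU one.
  private val map = new JLinkedHashMap[K, Entry](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[K, Entry]): Boolean =
      this.size() > maxEntries
  }

  /** Returns the cached value if present and not expired, refreshing its access time. */
  def get(key: K): Option[V] = synchronized {
    Option(map.get(key)) match {
      case Some(e) if now() - e.lastAccess < ttlMillis =>
        map.put(key, e.copy(lastAccess = now()))
        Some(e.value)
      case Some(_) =>
        map.remove(key) // expired: drop it so the caller re-checks the file
        None
      case None => None
    }
  }

  def put(key: K, value: V): Unit = synchronized { map.put(key, Entry(value, now())); () }

  def size: Int = synchronized { map.size() }
}
```

With a cap, a file evicted early simply gets re-checked on its next lookup, which is where the extra RPC calls mentioned above would come from.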
[GitHub] spark issue #21909: [SPARK-24959][SQL] Speed up count() for JSON and CSV
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21909 got it ---
[GitHub] spark issue #21639: [SPARK-24653][tests] Avoid cross-job pollution in TestUt...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21639 Merged build finished. Test PASSed. ---
[GitHub] spark issue #21639: [SPARK-24653][tests] Avoid cross-job pollution in TestUt...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21639 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93861/ Test PASSed. ---
[GitHub] spark issue #21639: [SPARK-24653][tests] Avoid cross-job pollution in TestUt...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21639 **[Test build #93861 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93861/testReport)** for PR 21639 at commit [`18d5ebf`](https://github.com/apache/spark/commit/18d5ebfd201deaebf774835ec5eb08d2b6d08454). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. ---
[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21895#discussion_r206779479 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -973,6 +978,42 @@ private[history] object FsHistoryProvider { private[history] val CURRENT_LISTING_VERSION = 1L } +private[history] trait CachedFileSystemHelper extends Logging { --- End diff -- This is true, but the only way to avoid this issue is to call fs.access every time, which may cause huge performance issues. Moreover, I think it is very unlikely that a user manually changes the permissions of an application's event logs, and restarting the SHS in such a scenario would solve the problem. In the current state, even though the file is accessible, it is ignored, and the user has no workaround other than changing the ownership or permissions of all the files, even though the user running the SHS can read them (moreover, it is a regression for these users)... Anyway, if you have a better suggestion, I am more than happy to follow it. ---
[GitHub] spark pull request #21938: [SPARK-24982][SQL] UDAF resolution should not thr...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21938 ---
[GitHub] spark issue #21938: [SPARK-24982][SQL] UDAF resolution should not throw Asse...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21938 Merged build finished. Test PASSed. ---
[GitHub] spark issue #21699: [SPARK-24722][SQL] pivot() with Column type argument
Github user maryannxue commented on the issue: https://github.com/apache/spark/pull/21699 Thank you for the change, @MaxGekk! @HyukjinKwon, my idea was actually that the overloaded version of pivot would be `pivot(column: Column, values: Seq[Column])`, so that we can construct values of different types in "values". The constant check will be done in the Analyzer, so we don't need to worry about it here. Ultimately, we would like to support complex-typed values in `pivot(column: Column)` as well, but I think we can do that in a different PR. ---
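To make the role of the `values` argument concrete, here is a plain-Scala model of what a pivot with an explicit value list computes: each requested value becomes an output column, and combinations with no matching rows come out empty. It is a hypothetical sketch (`PivotSketch`, `Sale`, and `pivotSum` are made-up names), not Spark's `RelationalGroupedDataset.pivot`, and it does not model the constant check that the Analyzer performs.

```scala
// Hypothetical plain-Scala model of "group by year, pivot on course, sum earnings".
object PivotSketch {
  final case class Sale(year: Int, course: String, earnings: Long)

  /** Returns year -> one optional sum per requested pivot value, in `values` order. */
  def pivotSum(rows: Seq[Sale], values: Seq[String]): Map[Int, Seq[Option[Long]]] =
    rows.groupBy(_.year).map { case (year, group) =>
      val byCourse: Map[String, Long] =
        group.groupBy(_.course).map { case (c, rs) => c -> rs.map(_.earnings).sum }
      // Pivot values with no matching rows map to None (a null cell in SQL terms).
      year -> values.map(byCourse.get)
    }
}
```

Because the output columns are fixed by `values`, supplying them up front avoids a separate pass to discover the distinct pivot values.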
[GitHub] spark issue #21938: [SPARK-24982][SQL] UDAF resolution should not throw Asse...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21938 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93862/ Test PASSed. ---
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21622 **[Test build #93874 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93874/testReport)** for PR 21622 at commit [`722e6a0`](https://github.com/apache/spark/commit/722e6a0f7506440f260126d841d0cb27cf744100). ---
[GitHub] spark issue #21938: [SPARK-24982][SQL] UDAF resolution should not throw Asse...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21938 **[Test build #93862 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93862/testReport)** for PR 21938 at commit [`84262dc`](https://github.com/apache/spark/commit/84262dc21dd9f9aa409dd5e873d31d5b26a231f3). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. ---
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21622 retest this please ---
[GitHub] spark issue #21661: [SPARK-24685][build] Restore support for building old Ha...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21661 **[Test build #93873 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93873/testReport)** for PR 21661 at commit [`1db4ab8`](https://github.com/apache/spark/commit/1db4ab8d1781036278329ae313cb7b1bf2c201c7). ---
[GitHub] spark issue #21661: [SPARK-24685][build] Restore support for building old Ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21661 Merged build finished. Test PASSed.
[GitHub] spark issue #21661: [SPARK-24685][build] Restore support for building old Ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21661 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1554/ Test PASSed.
[GitHub] spark issue #21661: [SPARK-24685][build] Restore support for building old Ha...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21661 retest this please
[GitHub] spark issue #21930: [SPARK-14540][Core] Fix remaining major issues for Scala...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21930 I think that's a binary-incompatible, breaking API change, right? e.g. https://github.com/apache/spark/pull/21930/files#diff-2b8f0f66fe5397b169d0f754e99da8d5R64
[GitHub] spark pull request #21936: [SPARK-24981][Core] ShutdownHook timeout causes j...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21936#discussion_r206769869 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -571,7 +571,12 @@ class SparkContext(config: SparkConf) extends Logging { _shutdownHookRef = ShutdownHookManager.addShutdownHook( ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => logInfo("Invoking stop() from shutdown hook") - stop() + try { +stop() + } catch { +case e: Throwable => + logWarning("Ignoring Exception while stoping SparkContext. Exception: " + e) --- End diff -- `stoping` -> `stopping`
[GitHub] spark pull request #21936: [SPARK-24981][Core] ShutdownHook timeout causes j...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21936#discussion_r206770131 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -571,7 +571,12 @@ class SparkContext(config: SparkConf) extends Logging { _shutdownHookRef = ShutdownHookManager.addShutdownHook( ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => logInfo("Invoking stop() from shutdown hook") - stop() + try { +stop() + } catch { +case e: Throwable => + logWarning("Ignoring Exception while stoping SparkContext. Exception: " + e) --- End diff -- use this format `logWarning("", exception)`
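A minimal sketch of the pattern these two review comments point toward. It guards the shutdown-hook `stop()` call and passes the exception as its own argument, so the stack trace is kept instead of being concatenated into the message string. This is not the patch's code, and several pieces are assumptions:

- `SafeStop` and `stopQuietly` are hypothetical names.
- `java.util.logging` stands in for Spark's internal `Logging` trait.
- The sketch catches `NonFatal` rather than `Throwable` as the patch does. This is a deliberate swap, so that errors such as `OutOfMemoryError` still propagate.

```scala
import java.util.logging.{Level, Logger}
import scala.util.control.NonFatal

object SafeStop {
  // Hypothetical logger standing in for Spark's internal Logging trait.
  private val log = Logger.getLogger("SafeStop")

  /** Runs `stop` and logs (rather than rethrows) any non-fatal failure.
    * Returns true if `stop` completed normally, false if an exception was swallowed. */
  def stopQuietly(stop: () => Unit): Boolean =
    try {
      stop()
      true
    } catch {
      case NonFatal(e) =>
        // The throwable is passed separately, so the logger records its stack trace,
        // in the spirit of the suggested logWarning("", exception) form.
        log.log(Level.WARNING, "Ignoring exception while stopping SparkContext", e)
        false
    }
}
```

For example, `SafeStop.stopQuietly(() => throw new IllegalStateException("boom"))` logs a warning with the full stack trace and returns `false` instead of propagating the exception.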
[GitHub] spark issue #21661: [SPARK-24685][build] Restore support for building old Ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21661 Merged build finished. Test FAILed.
[GitHub] spark issue #21661: [SPARK-24685][build] Restore support for building old Ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21661 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93860/ Test FAILed.
[GitHub] spark issue #21661: [SPARK-24685][build] Restore support for building old Ha...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21661 **[Test build #93860 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93860/testReport)** for PR 21661 at commit [`1db4ab8`](https://github.com/apache/spark/commit/1db4ab8d1781036278329ae313cb7b1bf2c201c7). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21941 **[Test build #93872 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93872/testReport)** for PR 21941 at commit [`47cbc5a`](https://github.com/apache/spark/commit/47cbc5a8d77c949674ff97c5763936a8425b0f00).
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21941 Merged build finished. Test PASSed.
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21941 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1553/ Test PASSed.
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206768063 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1451,6 +1451,15 @@ object SQLConf { .intConf .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION) .createWithDefault(Deflater.DEFAULT_COMPRESSION) + + val SETOPS_PRECEDENCE_ENFORCED = +buildConf("spark.sql.setops.precedence.enforced") --- End diff -- @gatorsmile Sure.
[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21622#discussion_r206766835 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala --- @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", +progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L) + + registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L) + registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L) + --- End diff -- Thanks for the input! I'll keep the patch as it is. Could you suggest an approach for extending the maintained metrics? I would like to add more, and newer ones might come from custom metrics (such as those from sources and sinks), so it might be worth having an extension point.
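A rough sketch of what the watermark and state-store gauges in this diff compute. It assumes the watermark arrives as an ISO8601 UTC string, as the `// ISO8601` comment suggests, and is `null` when no watermark is defined. `WatermarkMetrics` and `StateOp` are hypothetical stand-ins, and this body of `convertStringDateToMillis` is illustrative rather than the patch's code.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object WatermarkMetrics {
  // Hypothetical stand-in for a state operator's progress: only the two fields summed in the diff.
  final case class StateOp(numRowsTotal: Long, memoryUsedBytes: Long)

  /** Parses an ISO8601 UTC timestamp such as "2018-08-01T08:37:10.000Z" into epoch millis.
    * Returns 0L when the watermark is absent (null), matching the gauge default of 0L. */
  def convertStringDateToMillis(isoString: String): Long =
    if (isoString == null) 0L
    else {
      // SimpleDateFormat is not thread-safe, so this sketch builds one per call.
      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
      format.setTimeZone(TimeZone.getTimeZone("UTC"))
      format.parse(isoString).getTime
    }

  // Totals across all stateful operators, like the states-rowsTotal / states-usedBytes gauges.
  def totalRows(ops: Seq[StateOp]): Long = ops.map(_.numRowsTotal).sum
  def totalBytes(ops: Seq[StateOp]): Long = ops.map(_.memoryUsedBytes).sum
}
```

Creating the formatter per call trades a small allocation for thread safety. The diff instead shares one `timestampFormat` instance, which is only safe if gauge values are never read concurrently.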
[GitHub] spark issue #21756: [SPARK-24764] [CORE] Add ServiceLoader implementation fo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21756 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93856/ Test PASSed.
[GitHub] spark issue #21892: [SPARK-24945][SQL] Switching to uniVocity 2.7.2
Github user jbax commented on the issue: https://github.com/apache/spark/pull/21892 Thanks @MaxGekk, I've fixed the error and, in general, also made the parser run faster than before when processing fields that were not selected. Can you please retest with the latest SNAPSHOT build and let me know how it goes?
[GitHub] spark issue #21756: [SPARK-24764] [CORE] Add ServiceLoader implementation fo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21756 Merged build finished. Test PASSed.
[GitHub] spark issue #21756: [SPARK-24764] [CORE] Add ServiceLoader implementation fo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21756 **[Test build #93856 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93856/testReport)** for PR 21756 at commit [`6b9edca`](https://github.com/apache/spark/commit/6b9edca76579cd1adfb42eb4085b604b050b552c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206764090 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -535,14 +535,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Intersect(left, right, true) => throw new IllegalStateException( "logical intersect operator should have been replaced by union, aggregate" + -"and generate operators in the optimizer") +" and generate operators in the optimizer") case logical.Except(left, right, false) => throw new IllegalStateException( "logical except operator should have been replaced by anti-join in the optimizer") case logical.Except(left, right, true) => throw new IllegalStateException( "logical except (all) operator should have been replaced by union, aggregate" + -"and generate operators in the optimizer") +" and generate operators in the optimizer") --- End diff -- This is not related to the current PR. This addresses a comment from @HyukjinKwon in [21886](https://github.com/apache/spark/pull/21886)
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206764069 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -535,14 +535,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Intersect(left, right, true) => throw new IllegalStateException( "logical intersect operator should have been replaced by union, aggregate" + -"and generate operators in the optimizer") +" and generate operators in the optimizer") --- End diff -- This is not related to the current PR. This addresses a comment from @HyukjinKwon in [21886](https://github.com/apache/spark/pull/21886)
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206764004 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -165,9 +165,9 @@ object SetOperation { } case class Intersect( - left: LogicalPlan, - right: LogicalPlan, - isAll: Boolean = false) extends SetOperation(left, right) { +left: LogicalPlan, --- End diff -- This is not related to the current PR. This addresses a comment from @HyukjinKwon in [21886](https://github.com/apache/spark/pull/21886)
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206763936 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1451,6 +1451,15 @@ object SQLConf { .intConf .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION) .createWithDefault(Deflater.DEFAULT_COMPRESSION) + + val SETOPS_PRECEDENCE_ENFORCED = +buildConf("spark.sql.setops.precedence.enforced") --- End diff -- let me think about the name of conf
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206763732 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1451,6 +1451,15 @@ object SQLConf { .intConf .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION) .createWithDefault(Deflater.DEFAULT_COMPRESSION) + + val SETOPS_PRECEDENCE_ENFORCED = +buildConf("spark.sql.setops.precedence.enforced") + .doc("When set to true and order of evaluation is not specified by parentheses, " + +"INTERSECT operations are performed before any UNION or EXCEPT operations. " + --- End diff -- also include MINUS
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206763501 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala --- @@ -676,4 +677,42 @@ class PlanParserSuite extends AnalysisTest { OneRowRelation().select('rtrim.function("c&^,.", "bc...,,,&&")) ) } + + test("precedence of set operations") { +val a = table("a").select(star()) +val b = table("b").select(star()) +val c = table("c").select(star()) +val d = table("d").select(star()) + +val query1 = + """ +|SELECT * FROM a +|UNION +|SELECT * FROM b +|EXCEPT +|SELECT * FROM c +|INTERSECT +|SELECT * FROM d + """.stripMargin + +val query2 = + """ +|SELECT * FROM a +|UNION +|SELECT * FROM b +|EXCEPT ALL +|SELECT * FROM c +|INTERSECT ALL +|SELECT * FROM d + """.stripMargin + +assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d))) --- End diff -- also add `withSQLConf(SQLConf.SETOPS_PRECEDENCE_ENFORCED.key -> "true") {`
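A toy sketch of why the test expects `Distinct(a.union(b)).except(c.intersect(d))` for `query1`. It uses Scala `Set`s as single-column, duplicate-free tables, so `union`, `diff` and `intersect` stand in for distinct `UNION`, `EXCEPT` and `INTERSECT`. Two assumptions apply:

- The sample data is made up.
- The "left to right" grouping assumes that, without enforced precedence, the operators are simply evaluated in the order they appear.

```scala
object SetOpPrecedence {
  // Made-up single-column "tables"; Set semantics match distinct UNION/EXCEPT/INTERSECT.
  val a: Set[Int] = Set(1, 2, 3)
  val b: Set[Int] = Set(3, 4)
  val c: Set[Int] = Set(2, 4, 5)
  val d: Set[Int] = Set(4, 5)

  // INTERSECT binds tighter: (a UNION b) EXCEPT (c INTERSECT d)
  val withPrecedence: Set[Int] = (a union b) diff (c intersect d)

  // Strict left-to-right grouping: ((a UNION b) EXCEPT c) INTERSECT d
  val leftToRight: Set[Int] = ((a union b) diff c) intersect d
}
```

With this data the two groupings disagree: `withPrecedence` is `Set(1, 2, 3)` while `leftToRight` is empty. That disagreement is why the parser test has to pin down which rule is in force.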
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206763358 --- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 --- @@ -17,6 +17,12 @@ grammar SqlBase; @members { + /** + * When true, INTERSECT is given precedence over UNION and EXCEPT set operations as per --- End diff -- > When true, INTERSECT is given precedence over UNION and EXCEPT set operations as per -> > When true, INTERSECT is given the greater precedence over the other set operations (UNION, EXCEPT and MINUS) as per
[GitHub] spark pull request #19084: [SPARK-20711][ML]MultivariateOnlineSummarizer/Sum...
Github user zhengruifeng closed the pull request at: https://github.com/apache/spark/pull/19084