[spark-docker] branch master updated: [SPARK-41258][INFRA] Upgrade docker and actions to cleanup warnning
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-docker.git

The following commit(s) were added to refs/heads/master by this push:
     new 33abc18  [SPARK-41258][INFRA] Upgrade docker and actions to cleanup warnning
33abc18 is described below

commit 33abc1894f3de135e827ce393842ca355229c117
Author: Yikun Jiang
AuthorDate: Fri Nov 25 14:57:27 2022 +0800

    [SPARK-41258][INFRA] Upgrade docker and actions to cleanup warnning

    ### What changes were proposed in this pull request?
    - Upgrade `actions/checkout` from v2 to v3
    - Upgrade `docker/build-push-action` from v2 to v3

    ### Why are the changes needed?
    Cleanup set output and lower version node warnning

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Test passed

    Closes #24 from Yikun/upgrade-actions.

    Authored-by: Yikun Jiang
    Signed-off-by: Yikun Jiang
---
 .github/workflows/main.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index dfb99e9..024b853 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -71,7 +71,7 @@ jobs:
         image_suffix: [python3-ubuntu, ubuntu, r-ubuntu, python3-r-ubuntu]
     steps:
       - name: Checkout Spark Docker repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3

       - name: Set up QEMU
         uses: docker/setup-qemu-action@v2
@@ -122,7 +122,7 @@ jobs:
           echo "PUBLISH_IMAGE_URL:"${PUBLISH_IMAGE_URL}

       - name: Build and push test image
-        uses: docker/build-push-action@v2
+        uses: docker/build-push-action@v3
         with:
           context: ${{ env.IMAGE_PATH }}
           tags: ${{ env.IMAGE_URL }}
@@ -258,7 +258,7 @@ jobs:
       - name: Publish - Push Image
         if: ${{ inputs.publish }}
-        uses: docker/build-push-action@v2
+        uses: docker/build-push-action@v3
         with:
           context: ${{ env.IMAGE_PATH }}
           push: true

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41181][SQL] Migrate the map options errors onto error classes
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0ae82d99d13  [SPARK-41181][SQL] Migrate the map options errors onto error classes
0ae82d99d13 is described below

commit 0ae82d99d13988086a297920d45a766115a70578
Author: panbingkun
AuthorDate: Fri Nov 25 09:03:49 2022 +0300

    [SPARK-41181][SQL] Migrate the map options errors onto error classes

    ### What changes were proposed in this pull request?
    The pr aims to migrate the map options errors onto error classes.

    ### Why are the changes needed?
    The changes improve the error framework.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Pass GA.

    Closes #38730 from panbingkun/SPARK-41181.

    Authored-by: panbingkun
    Signed-off-by: Max Gekk
---
 core/src/main/resources/error/error-classes.json   | 27 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 +-
 .../sql-tests/results/csv-functions.sql.out        | 13 +++--
 .../sql-tests/results/json-functions.sql.out       | 12 ++--
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  | 66 --
 5 files changed, 81 insertions(+), 43 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 55a56712554..1246e870e0d 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -735,6 +735,23 @@
       "The JOIN with LATERAL correlation is not allowed because an OUTER subquery cannot correlate to its join partner. Remove the LATERAL correlation or use an INNER JOIN, or LEFT OUTER JOIN instead."
     ]
   },
+  "INVALID_OPTIONS" : {
+    "message" : [
+      "Invalid options:"
+    ],
+    "subClass" : {
+      "NON_MAP_FUNCTION" : {
+        "message" : [
+          "Must use the `map()` function for options."
+        ]
+      },
+      "NON_STRING_TYPE" : {
+        "message" : [
+          "A type of keys and values in `map()` must be string, but got <mapType>."
+        ]
+      }
+    }
+  },
   "INVALID_PANDAS_UDF_PLACEMENT" : {
     "message" : [
       "The group aggregate pandas UDF cannot be invoked together with as other, non-pandas aggregate functions."
     ]
   },
@@ -2190,16 +2207,6 @@
       "Schema should be struct type but got ."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1095" : {
-    "message" : [
-      "A type of keys and values in map() must be string, but got <map>."
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_1096" : {
-    "message" : [
-      "Must use a map() function for options."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1097" : {
     "message" : [
       "The field for corrupt records must be string type and nullable."
     ]
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index fa22c36f841..486bd21b844 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1013,13 +1013,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   def keyValueInMapNotStringError(m: CreateMap): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1095",
-      messageParameters = Map("map" -> m.dataType.catalogString))
+      errorClass = "INVALID_OPTIONS.NON_STRING_TYPE",
+      messageParameters = Map("mapType" -> toSQLType(m.dataType)))
   }

   def nonMapFunctionNotAllowedError(): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1096",
+      errorClass = "INVALID_OPTIONS.NON_MAP_FUNCTION",
       messageParameters = Map.empty)
   }

diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
index 0b5a63c28e4..200ddd837e1 100644
--- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
@@ -66,7 +66,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1096",
+  "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION",
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
@@ -84,9 +84,9 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1095",
+  "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE",
   "messageParameters" : {
-    "map" : "map"
+    "mapType" : "\"MAP\""
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -222,7 +222,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-
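The `error-classes.json` registry shown in this commit nests sub-classes (`NON_MAP_FUNCTION`, `NON_STRING_TYPE`) under a parent class (`INVALID_OPTIONS`), and messages carry `<param>` placeholders filled from `messageParameters`. As a minimal illustration (hypothetical helper names, not Spark's actual message-formatting code), resolving such a registry into a final error message could look like:

```python
import json
import re

# A fragment shaped like the new error-classes.json entry (illustrative copy).
ERROR_CLASSES = json.loads("""
{
  "INVALID_OPTIONS": {
    "message": ["Invalid options:"],
    "subClass": {
      "NON_MAP_FUNCTION": {
        "message": ["Must use the `map()` function for options."]
      },
      "NON_STRING_TYPE": {
        "message": ["A type of keys and values in `map()` must be string, but got <mapType>."]
      }
    }
  }
}
""")


def format_error(error_class: str, params: dict) -> str:
    """Resolve 'PARENT.SUB' into the joined message, substituting <param> slots."""
    parent, _, sub = error_class.partition(".")
    entry = ERROR_CLASSES[parent]
    parts = list(entry["message"])
    if sub:
        parts += entry["subClass"][sub]["message"]
    template = " ".join(parts)
    # Replace each <name> placeholder with the corresponding parameter value.
    return re.sub(r"<(\w+)>", lambda m: params[m.group(1)], template)


print(format_error("INVALID_OPTIONS.NON_STRING_TYPE", {"mapType": '"MAP<STRING, INT>"'}))
```

This mirrors why the commit renames the parameter from `map` to `mapType`: the placeholder name in the message template and the key in `messageParameters` must match.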
[spark] branch master updated: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b84ddd5e71a  [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`
b84ddd5e71a is described below

commit b84ddd5e71acc9ae3facdf47148becef3861d11d
Author: Ruifeng Zheng
AuthorDate: Fri Nov 25 11:57:33 2022 +0800

    [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`

    ### What changes were proposed in this pull request?
    ~~1, Make `AnalyzePlan` support specified multiple analysis tasks, that is, we can get `isLocal`, `schema`, `semanticHash` together in single RPC if we want.~~

    2, Implement following APIs
    - isLocal
    - isStreaming
    - printSchema
    - ~~semanticHash~~
    - ~~sameSemantics~~
    - inputFiles

    ### Why are the changes needed?
    for API coverage

    ### Does this PR introduce _any_ user-facing change?
    yes, new APIs

    ### How was this patch tested?
    added UTs

    Closes #38742 from zhengruifeng/connect_df_print_schema.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Ruifeng Zheng
---
 .../src/main/protobuf/spark/connect/base.proto     | 13
 .../sql/connect/service/SparkConnectService.scala  | 11 ++-
 .../connect/planner/SparkConnectServiceSuite.scala |  9 +++
 python/pyspark/sql/connect/client.py               | 23 ++-
 python/pyspark/sql/connect/dataframe.py            | 78 +-
 python/pyspark/sql/connect/proto/base_pb2.py       | 36 +-
 python/pyspark/sql/connect/proto/base_pb2.pyi      | 36 +-
 .../sql/tests/connect/test_connect_basic.py        | 40 +++
 8 files changed, 221 insertions(+), 25 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/base.proto b/connector/connect/src/main/protobuf/spark/connect/base.proto
index d6dac4854ef..5f9a4411ecd 100644
--- a/connector/connect/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/base.proto
@@ -112,6 +112,19 @@ message AnalyzePlanResponse {

   // The extended explain string as produced by Spark.
   string explain_string = 3;
+
+  // Get the tree string of the schema.
+  string tree_string = 4;
+
+  // Whether the 'collect' and 'take' methods can be run locally.
+  bool is_local = 5;
+
+  // Whether this plan contains one or more sources that continuously
+  // return data as it arrives.
+  bool is_streaming = 6;
+
+  // A best-effort snapshot of the files that compose this Dataset
+  repeated string input_files = 7;
 }

 // A request to be executed by the service.

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 0c7a2ad2690..3046c8eebfc 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.connect.service

 import java.util.concurrent.TimeUnit

+import scala.collection.JavaConverters._
+
 import com.google.common.base.Ticker
 import com.google.common.cache.CacheBuilder
 import io.grpc.{Server, Status}
@@ -127,10 +129,13 @@ class SparkConnectService(debug: Boolean)
       val ds = Dataset.ofRows(session, logicalPlan)
       val explainString = ds.queryExecution.explainString(explainMode)
-      val response = proto.AnalyzePlanResponse
-        .newBuilder()
-        .setExplainString(explainString)
+      val response = proto.AnalyzePlanResponse.newBuilder()
       response.setSchema(DataTypeProtoConverter.toConnectProtoType(ds.schema))
+      response.setExplainString(explainString)
+      response.setTreeString(ds.schema.treeString)
+      response.setIsLocal(ds.isLocal)
+      response.setIsStreaming(ds.isStreaming)
+      response.addAllInputFiles(ds.inputFiles.toSeq.asJava)
     }
   }

diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 6ca3c2430c4..e5cd84fb504 100644
--- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -65,6 +65,15 @@ class SparkConnectServiceSuite extends SharedSparkSession {
       assert(
         schema.getFields(1).getName == "col2"
           && schema.getFields(1).getDataType.getKindCase == proto.DataType.KindCase.STRING)
+
+      assert(!response.getIsLocal)
+
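The new `AnalyzePlanResponse` fields let a Connect client answer several `DataFrame` properties from one analysis response. As a rough sketch (the class names and the caching behavior here are illustrative stand-ins, not the generated proto API or the actual PySpark client), a client-side wrapper could reuse a single analysis round trip for all of these accessors:

```python
from dataclasses import dataclass, field
from typing import Callable, List

# Hypothetical stand-in mirroring the proto fields added above
# (tree_string, is_local, is_streaming, input_files).
@dataclass
class AnalyzePlanResponse:
    tree_string: str = ""
    is_local: bool = False
    is_streaming: bool = False
    input_files: List[str] = field(default_factory=list)


class DataFrameStub:
    """Illustrative client-side DataFrame answering properties from one cached analysis."""

    def __init__(self, analyze: Callable[[], AnalyzePlanResponse]):
        self._analyze = analyze
        self._cached = None

    def _analysis(self) -> AnalyzePlanResponse:
        if self._cached is None:
            self._cached = self._analyze()  # one RPC, reused by every accessor
        return self._cached

    def isLocal(self) -> bool:
        return self._analysis().is_local

    def isStreaming(self) -> bool:
        return self._analysis().is_streaming

    def inputFiles(self) -> List[str]:
        return self._analysis().input_files

    def printSchema(self) -> None:
        print(self._analysis().tree_string)
```

Whether the real client caches the response or issues one RPC per call is an implementation detail of the PySpark client; the sketch only shows how the response fields map onto the new `DataFrame` APIs.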
[spark] branch master updated (a205e97ad9a -> 575b8f00faf)
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from a205e97ad9a  [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate expression type
     add 575b8f00faf  [SPARK-41257][INFRA] Upgrade actions/labeler to v4

No new revisions were added by this update.

Summary of changes:
 .github/workflows/labeler.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate expression type
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a205e97ad9a  [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate expression type
a205e97ad9a is described below

commit a205e97ad9ae7894b2ec27e5253da21c4500fc8c
Author: Rui Wang
AuthorDate: Fri Nov 25 09:32:41 2022 +0800

    [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate expression type

    ### What changes were proposed in this pull request?
    This PR proposes that Relations (e.g. Aggregate in this PR) should only deal with `Expression` than `str`. `str` could be mapped to different expressions (e.g. sql expression, unresolved_attribute, etc.). Relations are not supposed to understand the difference of `str` but DataFrame should understand it. This PR specifically changes for `Aggregate`.

    ### Why are the changes needed?
    Codebase refactoring.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    UT

    Closes #38768 from amaliujia/SPARK-41230.

    Authored-by: Rui Wang
    Signed-off-by: Ruifeng Zheng
---
 python/pyspark/sql/connect/dataframe.py        | 61 --
 python/pyspark/sql/connect/plan.py             | 19 ++-
 .../sql/tests/connect/test_connect_basic.py    |  7 +++
 3 files changed, 43 insertions(+), 44 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index bd374dcf814..e3a7e8c7335 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -37,6 +37,7 @@ from pyspark.sql.connect.column import (
     Expression,
     LiteralExpression,
     SQLExpression,
+    ScalarFunctionExpression,
 )
 from pyspark.sql.types import (
     StructType,
@@ -48,25 +49,13 @@ if TYPE_CHECKING:
     from pyspark.sql.connect.client import RemoteSparkSession


-class GroupingFrame(object):
-
-    MeasuresType = Union[Sequence[Tuple["ExpressionOrString", str]], Dict[str, str]]
-    OptMeasuresType = Optional[MeasuresType]
-
+class GroupedData(object):
     def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None:
         self._df = df
         self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in grouping_cols]

-    def agg(self, exprs: Optional[MeasuresType] = None) -> "DataFrame":
-
-        # Normalize the dictionary into a list of tuples.
-        if isinstance(exprs, Dict):
-            measures = list(exprs.items())
-        elif isinstance(exprs, List):
-            measures = exprs
-        else:
-            measures = []
-
+    def agg(self, measures: Sequence[Expression]) -> "DataFrame":
+        assert len(measures) > 0, "exprs should not be empty"
         res = DataFrame.withPlan(
             plan.Aggregate(
                 child=self._df._plan,
@@ -77,23 +66,27 @@ class GroupingFrame(object):
         )
         return res

-    def _map_cols_to_dict(self, fun: str, cols: List[Union[Column, str]]) -> Dict[str, str]:
-        return {x if isinstance(x, str) else x.name(): fun for x in cols}
+    def _map_cols_to_expression(
+        self, fun: str, col: Union[Expression, str]
+    ) -> Sequence[Expression]:
+        return [
+            ScalarFunctionExpression(fun, Column(col)) if isinstance(col, str) else col,
+        ]

-    def min(self, *cols: Union[Column, str]) -> "DataFrame":
-        expr = self._map_cols_to_dict("min", list(cols))
+    def min(self, col: Union[Expression, str]) -> "DataFrame":
+        expr = self._map_cols_to_expression("min", col)
         return self.agg(expr)

-    def max(self, *cols: Union[Column, str]) -> "DataFrame":
-        expr = self._map_cols_to_dict("max", list(cols))
+    def max(self, col: Union[Expression, str]) -> "DataFrame":
+        expr = self._map_cols_to_expression("max", col)
         return self.agg(expr)

-    def sum(self, *cols: Union[Column, str]) -> "DataFrame":
-        expr = self._map_cols_to_dict("sum", list(cols))
+    def sum(self, col: Union[Expression, str]) -> "DataFrame":
+        expr = self._map_cols_to_expression("sum", col)
         return self.agg(expr)

     def count(self) -> "DataFrame":
-        return self.agg([(LiteralExpression(1), "count")])
+        return self.agg([ScalarFunctionExpression("count", LiteralExpression(1))])


 class DataFrame(object):
@@ -162,8 +155,18 @@ class DataFrame(object):
         return DataFrame.withPlan(plan.Project(self._plan, *sql_expr), session=self._session)

-    def agg(self, exprs: Optional[GroupingFrame.MeasuresType]) -> "DataFrame":
-        return self.groupBy().agg(exprs)
+    def agg(self, *exprs: Union[Expression, Dict[str, str]]) -> "DataFrame":
+        if not exprs:
+            raise
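The core of the refactor is that `GroupedData` now normalizes a bare column name into an `Expression` before handing it to `agg()`, so the `Aggregate` relation never sees a `str`. A self-contained sketch of that normalization, using minimal stand-in classes (not the real `pyspark.sql.connect` types):

```python
# Minimal stand-ins illustrating the pattern in the diff above: a string column
# is wrapped into a function expression, an Expression is passed through as-is.
class Expression:
    pass


class ColumnRef(Expression):
    """Stand-in for a column reference expression."""
    def __init__(self, name: str):
        self.name = name


class ScalarFunctionExpression(Expression):
    """Stand-in for a function call expression like min(col)."""
    def __init__(self, fun: str, *args: Expression):
        self.fun = fun
        self.args = args


def map_col_to_expression(fun, col):
    """Normalize: wrap a bare column name in a function expression; pass Expressions through."""
    return ScalarFunctionExpression(fun, ColumnRef(col)) if isinstance(col, str) else col


# groupBy(...).min("price") would now build an Expression up front:
expr = map_col_to_expression("min", "price")
```

The design point the commit makes is that only the `DataFrame`/`GroupedData` layer should interpret what a `str` means (SQL expression vs. unresolved attribute); relations below it operate purely on expressions.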
[spark] branch master updated: [SPARK-41238][CONNECT][PYTHON] Support more built-in datatypes
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4006d195111  [SPARK-41238][CONNECT][PYTHON] Support more built-in datatypes
4006d195111 is described below

commit 4006d195111334b4b795680e547dea9dd0acda22
Author: Ruifeng Zheng
AuthorDate: Fri Nov 25 09:26:25 2022 +0800

    [SPARK-41238][CONNECT][PYTHON] Support more built-in datatypes

    ### What changes were proposed in this pull request?
    1, in the sever side, make `proto_datatype` <-> `catalyst_datatype` conversion support all the built-in sql datatypes;
    2, in the client side, make `proto_datatype` <-> `pyspark_catalyst_datatype` conversion support [all the datatypes that are supported in pyspark now.](https://github.com/apache/spark/blob/master/python/pyspark/sql/types.py#L60-L83)

    ### Why are the changes needed?
    right now, only `long`, `string`, `struct` are supported

    ```
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Does not support convert float to connect proto types."
        debug_error_string = "{"created":"1669206685.760099000","description":"Error received from peer ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"Does not support convert float to connect proto types.","grpc_status":2}"
    ```

    this PR make the schema and literal expr support more datatypes.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    added UT

    Closes #38770 from zhengruifeng/connect_support_more_datatypes.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Ruifeng Zheng
---
 .../main/protobuf/spark/connect/expressions.proto  |   2 +-
 .../src/main/protobuf/spark/connect/types.proto    | 123 +++--
 .../org/apache/spark/sql/connect/dsl/package.scala |   5 +-
 .../connect/planner/DataTypeProtoConverter.scala   | 281 ++--
 .../connect/planner/SparkConnectServiceSuite.scala |   4 +-
 python/pyspark/sql/connect/client.py               | 108 -
 .../pyspark/sql/connect/proto/expressions_pb2.py   |  66 +--
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  |  16 +-
 python/pyspark/sql/connect/proto/types_pb2.py      | 261 ++-
 python/pyspark/sql/connect/proto/types_pb2.pyi     | 507 +
 .../sql/tests/connect/test_connect_basic.py        |  67 +++
 11 files changed, 972 insertions(+), 468 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/expressions.proto b/connector/connect/src/main/protobuf/spark/connect/expressions.proto
index ac5fe24d349..7ff06aeb196 100644
--- a/connector/connect/src/main/protobuf/spark/connect/expressions.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/expressions.proto
@@ -68,7 +68,7 @@ message Expression {
       bytes uuid = 28;
       DataType null = 29; // a typed null literal
       List list = 30;
-      DataType.List empty_list = 31;
+      DataType.Array empty_array = 31;
       DataType.Map empty_map = 32;
       UserDefined user_defined = 33;
     }

diff --git a/connector/connect/src/main/protobuf/spark/connect/types.proto b/connector/connect/src/main/protobuf/spark/connect/types.proto
index ad043d85947..56dbf28665e 100644
--- a/connector/connect/src/main/protobuf/spark/connect/types.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/types.proto
@@ -26,31 +26,46 @@ option java_package = "org.apache.spark.connect.proto";
 // itself but only describes it.
 message DataType {
   oneof kind {
-    Boolean bool = 1;
-    I8 i8 = 2;
-    I16 i16 = 3;
-    I32 i32 = 5;
-    I64 i64 = 7;
-    FP32 fp32 = 10;
-    FP64 fp64 = 11;
-    String string = 12;
-    Binary binary = 13;
-    Timestamp timestamp = 14;
-    Date date = 16;
-    Time time = 17;
-    IntervalYear interval_year = 19;
-    IntervalDay interval_day = 20;
-    TimestampTZ timestamp_tz = 29;
-    UUID uuid = 32;
-
-    FixedChar fixed_char = 21;
-    VarChar varchar = 22;
-    FixedBinary fixed_binary = 23;
-    Decimal decimal = 24;
-
-    Struct struct = 25;
-    List list = 27;
-    Map map = 28;
+    NULL null = 1;
+
+    Binary binary = 2;
+
+    Boolean boolean = 3;
+
+    // Numeric types
+    Byte byte = 4;
+    Short short = 5;
+    Integer integer = 6;
+    Long long = 7;
+
+    Float float = 8;
+    Double double = 9;
+    Decimal decimal = 10;
+
+    // String types
+    String string = 11;
+    Char char = 12;
+    VarChar var_char = 13;
+
+    // Datatime types
+    Date date = 14;
+    Timestamp timestamp = 15;
+    TimestampNTZ timestamp_ntz = 16;
+
+    // Interval types
+    CalendarInterval calendar_interval = 17;
+    YearMonthInterval year_month_interval = 18;
+    DayTimeInterval day_time_interval
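The conversion layer this commit extends is, at its core, a two-way table between proto type kinds and PySpark type names, with an error for anything unmapped (the quoted gRPC failure, "Does not support convert float to connect proto types.", is exactly that error path). A hedged Python sketch of that idea — the dictionary and function names are illustrative, not the actual `DataTypeProtoConverter` or client API:

```python
# Illustrative mapping between proto 'oneof kind' field names (as in the new
# types.proto above) and PySpark type class names. Not the generated proto API.
PROTO_TO_PYSPARK = {
    "boolean": "BooleanType",
    "byte": "ByteType",
    "short": "ShortType",
    "integer": "IntegerType",
    "long": "LongType",
    "float": "FloatType",
    "double": "DoubleType",
    "string": "StringType",
    "binary": "BinaryType",
    "date": "DateType",
    "timestamp": "TimestampType",
    "timestamp_ntz": "TimestampNTZType",
    "day_time_interval": "DayTimeIntervalType",
}

# The reverse direction, derived so the two tables cannot drift apart.
PYSPARK_TO_PROTO = {v: k for k, v in PROTO_TO_PYSPARK.items()}


def to_proto_kind(pyspark_name: str) -> str:
    """Convert a PySpark type name to its proto kind, failing loudly when unmapped."""
    try:
        return PYSPARK_TO_PROTO[pyspark_name]
    except KeyError:
        # Mirrors the style of the server-side error quoted in the commit message.
        raise ValueError(f"Does not support convert {pyspark_name} to connect proto types.")
```

Deriving one direction from the other is the design choice worth noting: a single source of truth keeps the client and server halves of a round-trip consistent.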
[spark] branch master updated (033dbe604bc -> 71b5c5bde75)
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 033dbe604bc  [SPARK-41247][BUILD] Unify the Protobuf versions in Spark connect and Protobuf connector
     add 71b5c5bde75  [SPARK-41251][PS][INFRA] Upgrade pandas from 1.5.1 to 1.5.2

No new revisions were added by this update.

Summary of changes:
 dev/infra/Dockerfile                       | 4 ++--
 python/pyspark/pandas/supported_api_gen.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (483e3c93ddb -> 033dbe604bc)
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 483e3c93ddb  [SPARK-41097][CORE][SQL][SS][PROTOBUF] Remove redundant collection conversion base on Scala 2.13 code
     add 033dbe604bc  [SPARK-41247][BUILD] Unify the Protobuf versions in Spark connect and Protobuf connector

No new revisions were added by this update.

Summary of changes:
 connector/connect/pom.xml  | 1 -
 connector/protobuf/pom.xml | 1 -
 pom.xml                    | 7 +--
 3 files changed, 5 insertions(+), 4 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (ac029d6ec0f -> 483e3c93ddb)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from ac029d6ec0f  [SPARK-41224][SPARK-41165][SPARK-41184] Optimized Arrow-based collect implementation to stream from server to client
     add 483e3c93ddb  [SPARK-41097][CORE][SQL][SS][PROTOBUF] Remove redundant collection conversion base on Scala 2.13 code

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala      | 2 +-
 .../org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala   | 2 +-
 .../org/apache/spark/sql/protobuf/utils/SchemaConverters.scala      | 2 +-
 .../src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +-
 core/src/main/scala/org/apache/spark/status/api/v1/api.scala        | 2 +-
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala      | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala  | 2 +-
 .../apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala  | 2 +-
 .../apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala | 6 +++---
 .../main/scala/org/apache/spark/sql/execution/command/tables.scala  | 2 +-
 .../spark/sql/execution/datasources/v2/ShowFunctionsExec.scala      | 2 +-
 .../sql/execution/datasources/v2/ShowTablePropertiesExec.scala      | 2 +-
 .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala     | 4 ++--
 13 files changed, 16 insertions(+), 16 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41224][SPARK-41165][SPARK-41184] Optimized Arrow-based collect implementation to stream from server to client
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ac029d6ec0f  [SPARK-41224][SPARK-41165][SPARK-41184] Optimized Arrow-based collect implementation to stream from server to client
ac029d6ec0f is described below

commit ac029d6ec0fc4d9d80c3ef098954ced6918fa80f
Author: Hyukjin Kwon
AuthorDate: Thu Nov 24 09:28:30 2022 -0400

    [SPARK-41224][SPARK-41165][SPARK-41184] Optimized Arrow-based collect implementation to stream from server to client

    ### What changes were proposed in this pull request?
    This PR proposes an optimized Arrow-based collect, that is virtually https://github.com/apache/spark/pull/38720 that implements the logics except a couple of nits.

    ### Why are the changes needed?
    To stream the Arrow batch from the server to the client side instead of waiting all the jobs to finish.

    ### Does this PR introduce _any_ user-facing change?
    No, this feature isn't released yet.

    ### How was this patch tested?
    Unittest added.

    Closes #38720
    Closes #38759 from HyukjinKwon/SPARK-41165-followup.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Herman van Hovell
---
 .../service/SparkConnectStreamHandler.scala        | 87 ---
 .../connect/planner/SparkConnectServiceSuite.scala | 99 +++---
 2 files changed, 163 insertions(+), 23 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 092bdd00dc1..b5d100e894d 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.service

 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal

 import com.google.protobuf.ByteString
 import io.grpc.stub.StreamObserver
@@ -32,6 +33,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils

 class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResponse])
     extends Logging {
@@ -71,20 +73,83 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     var numSent = 0
     if (numPartitions > 0) {
+      type Batch = (Array[Byte], Long)
+
       val batches = rows.mapPartitionsInternal(
         SparkConnectStreamHandler
           .rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, timeZoneId))

-      batches.collect().foreach { case (bytes, count) =>
-        val response = proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
-        val batch = proto.ExecutePlanResponse.ArrowBatch
-          .newBuilder()
-          .setRowCount(count)
-          .setData(ByteString.copyFrom(bytes))
-          .build()
-        response.setArrowBatch(batch)
-        responseObserver.onNext(response.build())
-        numSent += 1
+      val signal = new Object
+      val partitions = new Array[Array[Batch]](numPartitions)
+      var error: Option[Throwable] = None
+
+      // This callback is executed by the DAGScheduler thread.
+      // After fetching a partition, it inserts the partition into the Map, and then
+      // wakes up the main thread.
+      val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
+        signal.synchronized {
+          partitions(partitionId) = partition
+          signal.notify()
+        }
+        ()
+      }
+
+      val future = spark.sparkContext.submitJob(
+        rdd = batches,
+        processPartition = (iter: Iterator[Batch]) => iter.toArray,
+        partitions = Seq.range(0, numPartitions),
+        resultHandler = resultHandler,
+        resultFunc = () => ())
+
+      // Collect errors and propagate them to the main thread.
+      future.onComplete { result =>
+        result.failed.foreach { throwable =>
+          signal.synchronized {
+            error = Some(throwable)
+            signal.notify()
+          }
+        }
+      }(ThreadUtils.sameThread)
+
+      // The main thread will wait until 0-th partition is available,
+      // then send it to client and wait for the next partition.
+      // Different from the implementation of [[Dataset#collectAsArrowToPython]], it sends
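The concurrency pattern in the Scala change above — partition results arriving in arbitrary order via a scheduler callback, while a consumer sends them to the client strictly in partition order, as soon as each becomes available — can be sketched as a self-contained Python analogue (hypothetical names; the real code is the Scala handler shown in the diff):

```python
import threading

def stream_in_order(num_partitions, send):
    """Return (result_handler, consume): handler stores out-of-order partition
    results; consume blocks until partition 0, 1, ... are ready and sends each
    immediately, without waiting for the whole job to finish."""
    signal = threading.Condition()
    partitions = [None] * num_partitions  # slot per partition, filled as results arrive

    def result_handler(partition_id, partition):
        # Called from worker/scheduler threads, possibly out of order.
        with signal:
            partitions[partition_id] = partition
            signal.notify()  # wake the consumer to re-check the next needed slot

    def consume():
        current = 0
        while current < num_partitions:
            with signal:
                while partitions[current] is None:
                    signal.wait()
            send(partitions[current])  # stream this batch to the client now
            current += 1

    return result_handler, consume
```

The point of the design is latency: the client starts receiving partition 0 while later partitions are still computing, instead of waiting for a full `collect()`. (Error propagation, which the Scala version also funnels through the same monitor, is omitted here for brevity.)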
[spark] branch master updated: [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3cd7f1166fc  [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query
3cd7f1166fc is described below

commit 3cd7f1166fc949abd79d3fb430e855b2925b038c
Author: Jungtaek Lim
AuthorDate: Thu Nov 24 20:27:18 2022 +0900

    [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query

    ### What changes were proposed in this pull request?
    This PR proposes to add a new test suite specifically for self-union tests on streaming query. The test cases are acceptance tests for 4 different cases, DSv1 vs DSv2 / DataStreamReader API vs table API.

    ### Why are the changes needed?
    This PR brings more test coverage on streaming workloads. We should have caught an issue during the work of [SPARK-39564](https://issues.apache.org/jira/browse/SPARK-39564) if we had this test suite.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    New test suite.

    Closes #38785 from HeartSaVioR/SPARK-41249.

    Authored-by: Jungtaek Lim
    Signed-off-by: Jungtaek Lim
---
 .../sql/streaming/StreamingSelfUnionSuite.scala    | 160 +
 1 file changed, 160 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
new file mode 100644
index 000..8f099c31e6b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, InMemoryStreamTableCatalog}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  before {
+    spark.conf.set("spark.sql.catalog.teststream", classOf[InMemoryStreamTableCatalog].getName)
+  }
+
+  after {
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.clear()
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("self-union, DSv1, read via DataStreamReader API") {
+    withTempPath { dir =>
+      val dataLocation = dir.getAbsolutePath
+      spark.range(1, 4).write.format("parquet").save(dataLocation)
+
+      val streamDf = spark.readStream.format("parquet")
+        .schema(StructType(Seq(StructField("id", LongType)))).load(dataLocation)
+      val unionedDf = streamDf.union(streamDf)
+
+      testStream(unionedDf)(
+        ProcessAllAvailable(),
+        CheckLastBatch(1, 2, 3, 1, 2, 3),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 6)
+          assert(lastProgress.get.sources.length == 1)
+          assert(lastProgress.get.sources(0).numInputRows == 6)
+          true
+        }
+      )
+    }
+  }
+
+  test("self-union, DSv1, read via table API") {
+    withTable("parquet_streaming_tbl") {
+      spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING parquet")
+
+      val streamDf = spark.readStream.table("parquet_streaming_tbl")
+      val unionedDf = streamDf.union(streamDf)
+
+      val clock = new StreamManualClock()
+      testStream(unionedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        Execute { _ =>
+          spark.range(1,
[spark] branch master updated (1f90e416314 -> 02e7fadc553)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 1f90e416314  [SPARK-41182][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1102
     add 02e7fadc553  [SPARK-41250][CONNECT][PYTHON] DataFrame.toPandas should not return optional pandas dataframe

No new revisions were added by this update.

Summary of changes:
 .../src/main/protobuf/spark/connect/base.proto    |  2 +
 python/pyspark/sql/connect/client.py              | 51 +++---
 python/pyspark/sql/connect/dataframe.py           |  2 +-
 python/pyspark/sql/connect/proto/base_pb2_grpc.py |  5 ++-
 4 files changed, 33 insertions(+), 27 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (074444bd71f -> 1f90e416314)
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 074444bd71f  [SPARK-41179][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1092
     add 1f90e416314  [SPARK-41182][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1102

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json  | 10 ++--
 .../spark/sql/errors/QueryCompilationErrors.scala |  6 +--
 .../resources/sql-tests/results/extract.sql.out   | 56 +++---
 3 files changed, 35 insertions(+), 37 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org