This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new d3fd9ffe0c0 [SPARK-42691][CONNECT][PYTHON] Implement Dataset.semanticHash d3fd9ffe0c0 is described below commit d3fd9ffe0c0c9e257b27aa38cd809279824dabbc Author: Jiaan Geng <belie...@163.com> AuthorDate: Sat Mar 11 17:03:50 2023 +0800 [SPARK-42691][CONNECT][PYTHON] Implement Dataset.semanticHash ### What changes were proposed in this pull request? Implement `Dataset.semanticHash` for the Scala and Python APIs of Spark Connect. ### Why are the changes needed? Previously, `semanticHash` in the Spark Connect Scala and Python clients threw `UnsupportedOperationException` / `NotImplementedError`; this change brings them to parity with the classic `Dataset` API. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New test cases. Closes #40366 from beliefer/SPARK-42691. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> (cherry picked from commit 79b5abed8bdcd5f9657b4bcff2c5ea0c767d0bf6) Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../main/scala/org/apache/spark/sql/Dataset.scala | 11 +- .../scala/org/apache/spark/sql/SparkSession.scala | 4 + .../sql/connect/client/SparkConnectClient.scala | 9 + .../org/apache/spark/sql/ClientE2ETestSuite.scala | 15 ++ .../src/main/protobuf/spark/connect/base.proto | 11 + .../service/SparkConnectAnalyzeHandler.scala | 9 + python/pyspark/sql/connect/client.py | 16 ++ python/pyspark/sql/connect/dataframe.py | 11 +- python/pyspark/sql/connect/proto/base_pb2.py | 222 ++++++++++++--------- python/pyspark/sql/connect/proto/base_pb2.pyi | 49 +++++ .../sql/tests/connect/test_connect_basic.py | 9 +- 11 files changed, 263 insertions(+), 103 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index ff37614e87d..fdc994b2d90 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2773,8 +2773,17 @@ class Dataset[T] private[sql] ( sparkSession.sameSemantics(this.plan, other.plan) } + /** + * Returns a `hashCode` of the logical query plan against this [[Dataset]]. + * + * @note + * Unlike the standard `hashCode`, the hash is calculated against the query plan simplified by + * tolerating the cosmetic differences such as attribute names. + * @since 3.4.0 + */ + @DeveloperApi def semanticHash(): Int = { - throw new UnsupportedOperationException("semanticHash is not implemented.") + sparkSession.semanticHash(this.plan) } def toJSON: Dataset[String] = { diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 85f576ec515..141bb637e15 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -404,6 +404,10 @@ class SparkSession private[sql] ( client.sameSemantics(plan, otherPlan).getSameSemantics.getResult } + private[sql] def semanticHash(plan: proto.Plan): Int = { + client.semanticHash(plan).getSemanticHash.getResult + } + private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]): SparkResult[T] = { val value = client.execute(plan) val result = new SparkResult(value, allocator, encoder) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index 736a8af8e38..60d6d202ff5 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -157,6 +157,15 @@ 
private[sql] class SparkConnectClient( analyze(builder) } + def semanticHash(plan: proto.Plan): proto.AnalyzePlanResponse = { + val builder = proto.AnalyzePlanRequest.newBuilder() + builder.setSemanticHash( + proto.AnalyzePlanRequest.SemanticHash + .newBuilder() + .setPlan(plan)) + analyze(builder) + } + private def analyze(builder: proto.AnalyzePlanRequest.Builder): proto.AnalyzePlanResponse = { val request = builder .setUserContext(userContext) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 466a51841d4..60bb23516b0 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -635,6 +635,21 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { assert(plan.sameSemantics(otherPlan)) } + test("sameSemantics and semanticHash") { + val df1 = spark.createDataFrame(Seq((1, 2), (4, 5))) + val df2 = spark.createDataFrame(Seq((1, 2), (4, 5))) + val df3 = spark.createDataFrame(Seq((0, 2), (4, 5))) + val df4 = spark.createDataFrame(Seq((0, 2), (4, 5))) + + assert(df1.sameSemantics(df2) === true) + assert(df1.sameSemantics(df3) === false) + assert(df3.sameSemantics(df4) === true) + + assert(df1.semanticHash === df2.semanticHash) + assert(df1.semanticHash !== df3.semanticHash) + assert(df3.semanticHash === df4.semanticHash) + } + test("toJSON") { val expected = Array( """{"b":0.0,"id":0,"d":"world","a":0}""", diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index 1a9c437f0ec..2118f8e4823 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -81,6 +81,7 @@ message 
AnalyzePlanRequest { SparkVersion spark_version = 10; DDLParse ddl_parse = 11; SameSemantics same_semantics = 12; + SemanticHash semantic_hash = 13; } message Schema { @@ -157,6 +158,11 @@ message AnalyzePlanRequest { // (Required) The other plan to be compared. Plan other_plan = 2; } + + message SemanticHash { + // (Required) The logical plan to get a hashCode. + Plan plan = 1; + } } // Response to performing analysis of the query. Contains relevant metadata to be able to @@ -174,6 +180,7 @@ message AnalyzePlanResponse { SparkVersion spark_version = 8; DDLParse ddl_parse = 9; SameSemantics same_semantics = 10; + SemanticHash semantic_hash = 11; } message Schema { @@ -212,6 +219,10 @@ message AnalyzePlanResponse { message SameSemantics { bool result = 1; } + + message SemanticHash { + int32 result = 1; + } } // A request to be executed by the service. diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala index 9520ec8015f..4697a1fd7d4 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala @@ -152,6 +152,15 @@ private[connect] class SparkConnectAnalyzeHandler( .newBuilder() .setResult(target.sameSemantics(other))) + case proto.AnalyzePlanRequest.AnalyzeCase.SEMANTIC_HASH => + val semanticHash = Dataset + .ofRows(session, planner.transformRelation(request.getSemanticHash.getPlan.getRoot)) + .semanticHash() + builder.setSemanticHash( + proto.AnalyzePlanResponse.SemanticHash + .newBuilder() + .setResult(semanticHash)) + case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!") } diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index 3c91661716e..8dd80a931b9 
100644 --- a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -401,6 +401,7 @@ class AnalyzeResult: spark_version: Optional[str], parsed: Optional[DataType], is_same_semantics: Optional[bool], + semantic_hash: Optional[int], ): self.schema = schema self.explain_string = explain_string @@ -411,6 +412,7 @@ class AnalyzeResult: self.spark_version = spark_version self.parsed = parsed self.is_same_semantics = is_same_semantics + self.semantic_hash = semantic_hash @classmethod def fromProto(cls, pb: Any) -> "AnalyzeResult": @@ -423,6 +425,7 @@ class AnalyzeResult: spark_version: Optional[str] = None parsed: Optional[DataType] = None is_same_semantics: Optional[bool] = None + semantic_hash: Optional[int] = None if pb.HasField("schema"): schema = types.proto_schema_to_pyspark_data_type(pb.schema.schema) @@ -442,6 +445,8 @@ class AnalyzeResult: parsed = types.proto_schema_to_pyspark_data_type(pb.ddl_parse.parsed) elif pb.HasField("same_semantics"): is_same_semantics = pb.same_semantics.result + elif pb.HasField("semantic_hash"): + semantic_hash = pb.semantic_hash.result else: raise SparkConnectException("No analyze result found!") @@ -455,6 +460,7 @@ class AnalyzeResult: spark_version, parsed, is_same_semantics, + semantic_hash, ) @@ -704,6 +710,14 @@ class SparkConnectClient(object): assert result is not None return result + def semantic_hash(self, plan: pb2.Plan) -> int: + """ + returns a `hashCode` of the logical query plan. + """ + result = self._analyze(method="semantic_hash", plan=plan).semantic_hash + assert result is not None + return result + def close(self) -> None: """ Close the channel. 
@@ -782,6 +796,8 @@ class SparkConnectClient(object): elif method == "same_semantics": req.same_semantics.target_plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan"))) req.same_semantics.other_plan.CopyFrom(cast(pb2.Plan, kwargs.get("other"))) + elif method == "semantic_hash": + req.semantic_hash.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan"))) else: raise ValueError(f"Unknown Analyze method: {method}") diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 0e114f9fedb..0887294ddcf 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1618,9 +1618,6 @@ class DataFrame: def _repr_html_(self, *args: Any, **kwargs: Any) -> None: raise NotImplementedError("_repr_html_() is not implemented.") - def semanticHash(self, *args: Any, **kwargs: Any) -> None: - raise NotImplementedError("semanticHash() is not implemented.") - def sameSemantics(self, other: "DataFrame") -> bool: assert self._plan is not None assert other._plan is not None @@ -1631,6 +1628,14 @@ class DataFrame: sameSemantics.__doc__ = PySparkDataFrame.sameSemantics.__doc__ + def semanticHash(self) -> int: + assert self._plan is not None + return self._session.client.semantic_hash( + plan=self._plan.to_proto(self._session.client), + ) + + semanticHash.__doc__ = PySparkDataFrame.semanticHash.__doc__ + def writeTo(self, table: str) -> "DataFrameWriterV2": assert self._plan is not None return DataFrameWriterV2(self._plan, self._session, table) diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py index c67e58b44cd..030a28cf360 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.py +++ b/python/pyspark/sql/connect/proto/base_pb2.py @@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - 
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...] + b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...] ) @@ -53,6 +53,7 @@ _ANALYZEPLANREQUEST_INPUTFILES = _ANALYZEPLANREQUEST.nested_types_by_name["Input _ANALYZEPLANREQUEST_SPARKVERSION = _ANALYZEPLANREQUEST.nested_types_by_name["SparkVersion"] _ANALYZEPLANREQUEST_DDLPARSE = _ANALYZEPLANREQUEST.nested_types_by_name["DDLParse"] _ANALYZEPLANREQUEST_SAMESEMANTICS = _ANALYZEPLANREQUEST.nested_types_by_name["SameSemantics"] +_ANALYZEPLANREQUEST_SEMANTICHASH = _ANALYZEPLANREQUEST.nested_types_by_name["SemanticHash"] _ANALYZEPLANRESPONSE = DESCRIPTOR.message_types_by_name["AnalyzePlanResponse"] _ANALYZEPLANRESPONSE_SCHEMA = _ANALYZEPLANRESPONSE.nested_types_by_name["Schema"] _ANALYZEPLANRESPONSE_EXPLAIN = _ANALYZEPLANRESPONSE.nested_types_by_name["Explain"] @@ -63,6 +64,7 @@ _ANALYZEPLANRESPONSE_INPUTFILES = _ANALYZEPLANRESPONSE.nested_types_by_name["Inp _ANALYZEPLANRESPONSE_SPARKVERSION = _ANALYZEPLANRESPONSE.nested_types_by_name["SparkVersion"] _ANALYZEPLANRESPONSE_DDLPARSE = _ANALYZEPLANRESPONSE.nested_types_by_name["DDLParse"] 
_ANALYZEPLANRESPONSE_SAMESEMANTICS = _ANALYZEPLANRESPONSE.nested_types_by_name["SameSemantics"] +_ANALYZEPLANRESPONSE_SEMANTICHASH = _ANALYZEPLANRESPONSE.nested_types_by_name["SemanticHash"] _EXECUTEPLANREQUEST = DESCRIPTOR.message_types_by_name["ExecutePlanRequest"] _EXECUTEPLANRESPONSE = DESCRIPTOR.message_types_by_name["ExecutePlanResponse"] _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT = _EXECUTEPLANRESPONSE.nested_types_by_name[ @@ -214,6 +216,15 @@ AnalyzePlanRequest = _reflection.GeneratedProtocolMessageType( # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.SameSemantics) }, ), + "SemanticHash": _reflection.GeneratedProtocolMessageType( + "SemanticHash", + (_message.Message,), + { + "DESCRIPTOR": _ANALYZEPLANREQUEST_SEMANTICHASH, + "__module__": "spark.connect.base_pb2" + # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.SemanticHash) + }, + ), "DESCRIPTOR": _ANALYZEPLANREQUEST, "__module__": "spark.connect.base_pb2" # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest) @@ -229,6 +240,7 @@ _sym_db.RegisterMessage(AnalyzePlanRequest.InputFiles) _sym_db.RegisterMessage(AnalyzePlanRequest.SparkVersion) _sym_db.RegisterMessage(AnalyzePlanRequest.DDLParse) _sym_db.RegisterMessage(AnalyzePlanRequest.SameSemantics) +_sym_db.RegisterMessage(AnalyzePlanRequest.SemanticHash) AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType( "AnalyzePlanResponse", @@ -315,6 +327,15 @@ AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType( # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.SameSemantics) }, ), + "SemanticHash": _reflection.GeneratedProtocolMessageType( + "SemanticHash", + (_message.Message,), + { + "DESCRIPTOR": _ANALYZEPLANRESPONSE_SEMANTICHASH, + "__module__": "spark.connect.base_pb2" + # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.SemanticHash) + }, + ), "DESCRIPTOR": _ANALYZEPLANRESPONSE, "__module__": "spark.connect.base_pb2" # 
@@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse) @@ -330,6 +351,7 @@ _sym_db.RegisterMessage(AnalyzePlanResponse.InputFiles) _sym_db.RegisterMessage(AnalyzePlanResponse.SparkVersion) _sym_db.RegisterMessage(AnalyzePlanResponse.DDLParse) _sym_db.RegisterMessage(AnalyzePlanResponse.SameSemantics) +_sym_db.RegisterMessage(AnalyzePlanResponse.SemanticHash) ExecutePlanRequest = _reflection.GeneratedProtocolMessageType( "ExecutePlanRequest", @@ -620,101 +642,105 @@ if _descriptor._USE_C_DESCRIPTORS == False: _USERCONTEXT._serialized_start = 309 _USERCONTEXT._serialized_end = 431 _ANALYZEPLANREQUEST._serialized_start = 434 - _ANALYZEPLANREQUEST._serialized_end = 2091 - _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1297 - _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1346 - _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1349 - _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1664 - _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1492 - _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1664 - _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1666 - _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1719 - _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1721 - _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1771 - _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1773 - _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1827 - _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1829 - _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1882 - _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1884 - _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1898 - _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1900 - _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1941 - _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1943 - _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2064 - _ANALYZEPLANRESPONSE._serialized_start = 2094 - _ANALYZEPLANRESPONSE._serialized_end = 3298 - _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2866 - 
_ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2923 - _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2925 - _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2973 - _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2975 - _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3020 - _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3022 - _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3058 - _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3060 - _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3108 - _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3110 - _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3144 - _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3146 - _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3186 - _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3188 - _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3247 - _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3249 - _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3288 - _EXECUTEPLANREQUEST._serialized_start = 3301 - _EXECUTEPLANREQUEST._serialized_end = 3510 - _EXECUTEPLANRESPONSE._serialized_start = 3513 - _EXECUTEPLANRESPONSE._serialized_end = 4739 - _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3970 - _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4041 - _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4043 - _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4104 - _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4107 - _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4624 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4202 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4534 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4411 - _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4534 - _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4536 - _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4624 - 
_EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4626 - _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4722 - _KEYVALUE._serialized_start = 4741 - _KEYVALUE._serialized_end = 4806 - _CONFIGREQUEST._serialized_start = 4809 - _CONFIGREQUEST._serialized_end = 5837 - _CONFIGREQUEST_OPERATION._serialized_start = 5029 - _CONFIGREQUEST_OPERATION._serialized_end = 5527 - _CONFIGREQUEST_SET._serialized_start = 5529 - _CONFIGREQUEST_SET._serialized_end = 5581 - _CONFIGREQUEST_GET._serialized_start = 5583 - _CONFIGREQUEST_GET._serialized_end = 5608 - _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5610 - _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5673 - _CONFIGREQUEST_GETOPTION._serialized_start = 5675 - _CONFIGREQUEST_GETOPTION._serialized_end = 5706 - _CONFIGREQUEST_GETALL._serialized_start = 5708 - _CONFIGREQUEST_GETALL._serialized_end = 5756 - _CONFIGREQUEST_UNSET._serialized_start = 5758 - _CONFIGREQUEST_UNSET._serialized_end = 5785 - _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5787 - _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5821 - _CONFIGRESPONSE._serialized_start = 5839 - _CONFIGRESPONSE._serialized_end = 5961 - _ADDARTIFACTSREQUEST._serialized_start = 5964 - _ADDARTIFACTSREQUEST._serialized_end = 6781 - _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6313 - _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6366 - _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6368 - _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6479 - _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6481 - _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6574 - _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6577 - _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6770 - _ADDARTIFACTSRESPONSE._serialized_start = 6784 - _ADDARTIFACTSRESPONSE._serialized_end = 6972 - _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6891 - _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6972 - 
_SPARKCONNECTSERVICE._serialized_start = 6975 - _SPARKCONNECTSERVICE._serialized_end = 7340 + _ANALYZEPLANREQUEST._serialized_end = 2235 + _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1384 + _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1433 + _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1436 + _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1751 + _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1579 + _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1751 + _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1753 + _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1806 + _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1808 + _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1858 + _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1860 + _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1914 + _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1916 + _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1969 + _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1971 + _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1985 + _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1987 + _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2028 + _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2030 + _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2151 + _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2153 + _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2208 + _ANALYZEPLANRESPONSE._serialized_start = 2238 + _ANALYZEPLANRESPONSE._serialized_end = 3570 + _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 3098 + _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 3155 + _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 3157 + _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 3205 + _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 3207 + _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3252 + _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3254 + _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3290 + _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3292 
+ _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3340 + _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3342 + _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3376 + _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3378 + _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3418 + _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3420 + _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3479 + _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3481 + _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3520 + _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 3522 + _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 3560 + _EXECUTEPLANREQUEST._serialized_start = 3573 + _EXECUTEPLANREQUEST._serialized_end = 3782 + _EXECUTEPLANRESPONSE._serialized_start = 3785 + _EXECUTEPLANRESPONSE._serialized_end = 5011 + _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 4242 + _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4313 + _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4315 + _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4376 + _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4379 + _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4896 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4474 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4806 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4683 + _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4806 + _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4808 + _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4896 + _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4898 + _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4994 + _KEYVALUE._serialized_start = 5013 + _KEYVALUE._serialized_end = 5078 + _CONFIGREQUEST._serialized_start = 5081 + _CONFIGREQUEST._serialized_end = 6109 + _CONFIGREQUEST_OPERATION._serialized_start = 5301 + 
_CONFIGREQUEST_OPERATION._serialized_end = 5799 + _CONFIGREQUEST_SET._serialized_start = 5801 + _CONFIGREQUEST_SET._serialized_end = 5853 + _CONFIGREQUEST_GET._serialized_start = 5855 + _CONFIGREQUEST_GET._serialized_end = 5880 + _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5882 + _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5945 + _CONFIGREQUEST_GETOPTION._serialized_start = 5947 + _CONFIGREQUEST_GETOPTION._serialized_end = 5978 + _CONFIGREQUEST_GETALL._serialized_start = 5980 + _CONFIGREQUEST_GETALL._serialized_end = 6028 + _CONFIGREQUEST_UNSET._serialized_start = 6030 + _CONFIGREQUEST_UNSET._serialized_end = 6057 + _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 6059 + _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 6093 + _CONFIGRESPONSE._serialized_start = 6111 + _CONFIGRESPONSE._serialized_end = 6233 + _ADDARTIFACTSREQUEST._serialized_start = 6236 + _ADDARTIFACTSREQUEST._serialized_end = 7053 + _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6585 + _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6638 + _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6640 + _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6751 + _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6753 + _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6846 + _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6849 + _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 7042 + _ADDARTIFACTSRESPONSE._serialized_start = 7056 + _ADDARTIFACTSRESPONSE._serialized_end = 7244 + _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 7163 + _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 7244 + _SPARKCONNECTSERVICE._serialized_start = 7247 + _SPARKCONNECTSERVICE._serialized_end = 7612 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi index e87194f31aa..8ea50f6a580 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.pyi +++ 
b/python/pyspark/sql/connect/proto/base_pb2.pyi @@ -350,6 +350,23 @@ class AnalyzePlanRequest(google.protobuf.message.Message): ], ) -> None: ... + class SemanticHash(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + PLAN_FIELD_NUMBER: builtins.int + @property + def plan(self) -> global___Plan: + """(Required) The logical plan to get a hashCode.""" + def __init__( + self, + *, + plan: global___Plan | None = ..., + ) -> None: ... + def HasField( + self, field_name: typing_extensions.Literal["plan", b"plan"] + ) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["plan", b"plan"]) -> None: ... + SESSION_ID_FIELD_NUMBER: builtins.int USER_CONTEXT_FIELD_NUMBER: builtins.int CLIENT_TYPE_FIELD_NUMBER: builtins.int @@ -362,6 +379,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message): SPARK_VERSION_FIELD_NUMBER: builtins.int DDL_PARSE_FIELD_NUMBER: builtins.int SAME_SEMANTICS_FIELD_NUMBER: builtins.int + SEMANTIC_HASH_FIELD_NUMBER: builtins.int session_id: builtins.str """(Required) @@ -395,6 +413,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message): def ddl_parse(self) -> global___AnalyzePlanRequest.DDLParse: ... @property def same_semantics(self) -> global___AnalyzePlanRequest.SameSemantics: ... + @property + def semantic_hash(self) -> global___AnalyzePlanRequest.SemanticHash: ... def __init__( self, *, @@ -410,6 +430,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message): spark_version: global___AnalyzePlanRequest.SparkVersion | None = ..., ddl_parse: global___AnalyzePlanRequest.DDLParse | None = ..., same_semantics: global___AnalyzePlanRequest.SameSemantics | None = ..., + semantic_hash: global___AnalyzePlanRequest.SemanticHash | None = ..., ) -> None: ... 
def HasField( self, @@ -434,6 +455,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message): b"same_semantics", "schema", b"schema", + "semantic_hash", + b"semantic_hash", "spark_version", b"spark_version", "tree_string", @@ -465,6 +488,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message): b"same_semantics", "schema", b"schema", + "semantic_hash", + b"semantic_hash", "session_id", b"session_id", "spark_version", @@ -492,6 +517,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message): "spark_version", "ddl_parse", "same_semantics", + "semantic_hash", ] | None: ... global___AnalyzePlanRequest = AnalyzePlanRequest @@ -639,6 +665,20 @@ class AnalyzePlanResponse(google.protobuf.message.Message): self, field_name: typing_extensions.Literal["result", b"result"] ) -> None: ... + class SemanticHash(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + RESULT_FIELD_NUMBER: builtins.int + result: builtins.int + def __init__( + self, + *, + result: builtins.int = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["result", b"result"] + ) -> None: ... + SESSION_ID_FIELD_NUMBER: builtins.int SCHEMA_FIELD_NUMBER: builtins.int EXPLAIN_FIELD_NUMBER: builtins.int @@ -649,6 +689,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message): SPARK_VERSION_FIELD_NUMBER: builtins.int DDL_PARSE_FIELD_NUMBER: builtins.int SAME_SEMANTICS_FIELD_NUMBER: builtins.int + SEMANTIC_HASH_FIELD_NUMBER: builtins.int session_id: builtins.str @property def schema(self) -> global___AnalyzePlanResponse.Schema: ... @@ -668,6 +709,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message): def ddl_parse(self) -> global___AnalyzePlanResponse.DDLParse: ... @property def same_semantics(self) -> global___AnalyzePlanResponse.SameSemantics: ... + @property + def semantic_hash(self) -> global___AnalyzePlanResponse.SemanticHash: ... 
def __init__( self, *, @@ -681,6 +724,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message): spark_version: global___AnalyzePlanResponse.SparkVersion | None = ..., ddl_parse: global___AnalyzePlanResponse.DDLParse | None = ..., same_semantics: global___AnalyzePlanResponse.SameSemantics | None = ..., + semantic_hash: global___AnalyzePlanResponse.SemanticHash | None = ..., ) -> None: ... def HasField( self, @@ -701,6 +745,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message): b"same_semantics", "schema", b"schema", + "semantic_hash", + b"semantic_hash", "spark_version", b"spark_version", "tree_string", @@ -726,6 +772,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message): b"same_semantics", "schema", b"schema", + "semantic_hash", + b"semantic_hash", "session_id", b"session_id", "spark_version", @@ -746,6 +794,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message): "spark_version", "ddl_parse", "same_semantics", + "semantic_hash", ] | None: ... global___AnalyzePlanResponse = AnalyzePlanResponse diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index dad303d3463..fc5031bd91a 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -2814,6 +2814,14 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): other = self.connect.sql("SELECT 1") self.assertTrue(plan.sameSemantics(other)) + def test_semantic_hash(self): + plan = self.connect.sql("SELECT 1") + other = self.connect.sql("SELECT 1") + self.assertEqual( + plan.semanticHash(), + other.semanticHash(), + ) + def test_unsupported_functions(self): # SPARK-41225: Disable unsupported functions. 
df = self.connect.read.table(self.tbl_name) @@ -2829,7 +2837,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): "checkpoint", "localCheckpoint", "_repr_html_", - "semanticHash", ): with self.assertRaises(NotImplementedError): getattr(df, f)() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org