This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 400d4ded64e [SPARK-43662][PS][CONNECT] Support merge_asof in Spark Connect 400d4ded64e is described below commit 400d4ded64efdf62b75f2bcdc6025d474d6395ee Author: Takuya UESHIN <ues...@databricks.com> AuthorDate: Wed Sep 27 15:09:47 2023 +0800 [SPARK-43662][PS][CONNECT] Support merge_asof in Spark Connect ### What changes were proposed in this pull request? Supports `merge_asof` in Spark Connect. ### Why are the changes needed? `merge_asof` is missing in Spark Connect. Ref: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.merge_asof.html ### Does this PR introduce _any_ user-facing change? Yes, `merge_asof` is available in Spark Connect. ### How was this patch tested? The parity tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43137 from ueshin/issues/SPARK-43662/merge_asof. Authored-by: Takuya UESHIN <ues...@databricks.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../main/protobuf/spark/connect/relations.proto | 42 ++++ .../sql/connect/planner/SparkConnectPlanner.scala | 37 +++ .../pandas/tests/connect/test_parity_reshape.py | 4 +- python/pyspark/sql/connect/dataframe.py | 41 +++ python/pyspark/sql/connect/plan.py | 94 +++++++ python/pyspark/sql/connect/proto/relations_pb2.py | 278 +++++++++++---------- python/pyspark/sql/connect/proto/relations_pb2.pyi | 123 +++++++++ python/pyspark/sql/dataframe.py | 3 + 8 files changed, 481 insertions(+), 141 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index 0cf08431d46..deb33978386 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -72,6 +72,7 @@ message Relation { CachedLocalRelation cached_local_relation = 36; 
CachedRemoteRelation cached_remote_relation = 37; CommonInlineUserDefinedTableFunction common_inline_user_defined_table_function = 38; + AsOfJoin as_of_join = 39; // NA functions NAFill fill_na = 90; @@ -1009,3 +1010,44 @@ message Parse { PARSE_FORMAT_JSON = 2; } } + +// Relation of type [[AsOfJoin]]. +// +// `left` and `right` must be present. +message AsOfJoin { + // (Required) Left input relation for a Join. + Relation left = 1; + + // (Required) Right input relation for a Join. + Relation right = 2; + + // (Required) Field to join on in left DataFrame + Expression left_as_of = 3; + + // (Required) Field to join on in right DataFrame + Expression right_as_of = 4; + + // (Optional) The join condition. Could be unset when `using_columns` is utilized. + // + // This field does not co-exist with using_columns. + Expression join_expr = 5; + + // Optional. using_columns provides a list of columns that should be present on both sides of + // the join inputs that this Join will join on. For example A JOIN B USING col_name is + // equivalent to A JOIN B on A.col_name = B.col_name. + // + // This field does not co-exist with join_expr. + repeated string using_columns = 6; + + // (Required) The join type. + string join_type = 7; + + // (Optional) The asof tolerance within this range. + Expression tolerance = 8; + + // (Required) Whether to allow matching with the same value. + bool allow_exact_matches = 9; + + // (Required) Whether to search for prior, subsequent, or closest matches.
+ string direction = 10; +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index dda7a713fa0..e6b5d89e1ab 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -110,6 +110,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { case proto.Relation.RelTypeCase.OFFSET => transformOffset(rel.getOffset) case proto.Relation.RelTypeCase.TAIL => transformTail(rel.getTail) case proto.Relation.RelTypeCase.JOIN => transformJoinOrJoinWith(rel.getJoin) + case proto.Relation.RelTypeCase.AS_OF_JOIN => transformAsOfJoin(rel.getAsOfJoin) case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate) case proto.Relation.RelTypeCase.SET_OP => transformSetOperation(rel.getSetOp) case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort) @@ -2275,6 +2276,42 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { } } + private def transformAsOfJoin(rel: proto.AsOfJoin): LogicalPlan = { + val left = Dataset.ofRows(session, transformRelation(rel.getLeft)) + val right = Dataset.ofRows(session, transformRelation(rel.getRight)) + val leftAsOf = Column(transformExpression(rel.getLeftAsOf)) + val rightAsOf = Column(transformExpression(rel.getRightAsOf)) + val joinType = rel.getJoinType + val tolerance = if (rel.hasTolerance) Column(transformExpression(rel.getTolerance)) else null + val allowExactMatches = rel.getAllowExactMatches + val direction = rel.getDirection + + val joined = if (rel.getUsingColumnsCount > 0) { + val usingColumns = rel.getUsingColumnsList.asScala.toSeq + left.joinAsOf( + other = right, + leftAsOf = leftAsOf, + rightAsOf = rightAsOf, + 
usingColumns = usingColumns, + joinType = joinType, + tolerance = tolerance, + allowExactMatches = allowExactMatches, + direction = direction) + } else { + val joinExprs = if (rel.hasJoinExpr) Column(transformExpression(rel.getJoinExpr)) else null + left.joinAsOf( + other = right, + leftAsOf = leftAsOf, + rightAsOf = rightAsOf, + joinExprs = joinExprs, + joinType = joinType, + tolerance = tolerance, + allowExactMatches = allowExactMatches, + direction = direction) + } + joined.logicalPlan + } + private def transformSort(sort: proto.Sort): LogicalPlan = { assert(sort.getOrderCount > 0, "'order' must be present and contain elements.") logical.Sort( diff --git a/python/pyspark/pandas/tests/connect/test_parity_reshape.py b/python/pyspark/pandas/tests/connect/test_parity_reshape.py index 0773978ba4b..356baaff5ba 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_reshape.py +++ b/python/pyspark/pandas/tests/connect/test_parity_reshape.py @@ -22,9 +22,7 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils class ReshapeParityTests(ReshapeTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @unittest.skip("TODO(SPARK-43662): Enable ReshapeParityTests.test_merge_asof.") - def test_merge_asof(self): - super().test_merge_asof() + pass if __name__ == "__main__": diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 9cc1ddead33..5e2623336a2 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -594,6 +594,47 @@ class DataFrame: join.__doc__ = PySparkDataFrame.join.__doc__ + def _joinAsOf( + self, + other: "DataFrame", + leftAsOfColumn: Union[str, Column], + rightAsOfColumn: Union[str, Column], + on: Optional[Union[str, List[str], Column, List[Column]]] = None, + how: Optional[str] = None, + *, + tolerance: Optional[Column] = None, + allowExactMatches: bool = True, + direction: str = "backward", + ) -> "DataFrame": + if self._plan is None: + raise 
Exception("Cannot join when self._plan is empty.") + if other._plan is None: + raise Exception("Cannot join when other._plan is empty.") + + if how is None: + how = "inner" + assert isinstance(how, str), "how should be a string" + + if tolerance is not None: + assert isinstance(tolerance, Column), "tolerance should be Column" + + return DataFrame.withPlan( + plan.AsOfJoin( + left=self._plan, + right=other._plan, + left_as_of=leftAsOfColumn, + right_as_of=rightAsOfColumn, + on=on, + how=how, + tolerance=tolerance, + allow_exact_matches=allowExactMatches, + direction=direction, + ), + session=self._session, + ) + + _joinAsOf.__doc__ = PySparkDataFrame._joinAsOf.__doc__ + def limit(self, n: int) -> "DataFrame": return DataFrame.withPlan(plan.Limit(child=self._plan, limit=n), session=self._session) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 6758b3673f3..10565b9965a 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -900,6 +900,100 @@ class Join(LogicalPlan): """ +class AsOfJoin(LogicalPlan): + def __init__( + self, + left: LogicalPlan, + right: LogicalPlan, + left_as_of: "ColumnOrName", + right_as_of: "ColumnOrName", + on: Optional[Union[str, List[str], Column, List[Column]]], + how: str, + tolerance: Optional[Column], + allow_exact_matches: bool, + direction: str, + ) -> None: + super().__init__(left) + self.left = left + self.right = right + self.left_as_of = left_as_of + self.right_as_of = right_as_of + self.on = on + self.how = how + self.tolerance = tolerance + self.allow_exact_matches = allow_exact_matches + self.direction = direction + + def plan(self, session: "SparkConnectClient") -> proto.Relation: + plan = self._create_proto_relation() + plan.as_of_join.left.CopyFrom(self.left.plan(session)) + plan.as_of_join.right.CopyFrom(self.right.plan(session)) + + if isinstance(self.left_as_of, Column): + plan.as_of_join.left_as_of.CopyFrom(self.left_as_of.to_plan(session)) + else: 
+ plan.as_of_join.left_as_of.CopyFrom( + ColumnReference(self.left_as_of, self.left._plan_id).to_plan(session) + ) + + if isinstance(self.right_as_of, Column): + plan.as_of_join.right_as_of.CopyFrom(self.right_as_of.to_plan(session)) + else: + plan.as_of_join.right_as_of.CopyFrom( + ColumnReference(self.right_as_of, self.right._plan_id).to_plan(session) + ) + + if self.on is not None: + if not isinstance(self.on, list): + if isinstance(self.on, str): + plan.as_of_join.using_columns.append(self.on) + else: + plan.as_of_join.join_expr.CopyFrom(self.on.to_plan(session)) + elif len(self.on) > 0: + if isinstance(self.on[0], str): + plan.as_of_join.using_columns.extend(cast(List[str], self.on)) + else: + merge_column = functools.reduce(lambda c1, c2: c1 & c2, self.on) + plan.as_of_join.join_expr.CopyFrom(cast(Column, merge_column).to_plan(session)) + + plan.as_of_join.join_type = self.how + + if self.tolerance is not None: + plan.as_of_join.tolerance.CopyFrom(self.tolerance.to_plan(session)) + + plan.as_of_join.allow_exact_matches = self.allow_exact_matches + plan.as_of_join.direction = self.direction + + return plan + + def print(self, indent: int = 0) -> str: + assert self.left is not None + assert self.right is not None + + i = " " * indent + o = " " * (indent + LogicalPlan.INDENT) + n = indent + LogicalPlan.INDENT * 2 + return ( + f"{i}<AsOfJoin left_as_of={self.left_as_of}, right_as_of={self.right_as_of}, " + f"on={self.on} how={self.how}>\n{o}" + f"left=\n{self.left.print(n)}\n{o}right=\n{self.right.print(n)}" + ) + + def _repr_html_(self) -> str: + assert self.left is not None + assert self.right is not None + + return f""" + <ul> + <li> + <b>AsOfJoin</b><br /> + Left: {self.left._repr_html_()} + Right: {self.right._repr_html_()} + </li> + </uL> + """ + + class SetOperation(LogicalPlan): def __init__( self, diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py index ebb2682c619..fc70cdea402 100644 --- 
a/python/pyspark/sql/connect/proto/relations_pb2.py +++ b/python/pyspark/sql/connect/proto/relations_pb2.py @@ -35,7 +35,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xe1\x18\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...] + b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x9a\x19\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...] 
) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -62,141 +62,143 @@ if _descriptor._USE_C_DESCRIPTORS == False: _PARSE_OPTIONSENTRY._options = None _PARSE_OPTIONSENTRY._serialized_options = b"8\001" _RELATION._serialized_start = 165 - _RELATION._serialized_end = 3334 - _UNKNOWN._serialized_start = 3336 - _UNKNOWN._serialized_end = 3345 - _RELATIONCOMMON._serialized_start = 3347 - _RELATIONCOMMON._serialized_end = 3438 - _SQL._serialized_start = 3441 - _SQL._serialized_end = 3919 - _SQL_ARGSENTRY._serialized_start = 3735 - _SQL_ARGSENTRY._serialized_end = 3825 - _SQL_NAMEDARGUMENTSENTRY._serialized_start = 3827 - _SQL_NAMEDARGUMENTSENTRY._serialized_end = 3919 - _READ._serialized_start = 3922 - _READ._serialized_end = 4585 - _READ_NAMEDTABLE._serialized_start = 4100 - _READ_NAMEDTABLE._serialized_end = 4292 - _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 4234 - _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 4292 - _READ_DATASOURCE._serialized_start = 4295 - _READ_DATASOURCE._serialized_end = 4572 - _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4234 - _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4292 - _PROJECT._serialized_start = 4587 - _PROJECT._serialized_end = 4704 - _FILTER._serialized_start = 4706 - _FILTER._serialized_end = 4818 - _JOIN._serialized_start = 4821 - _JOIN._serialized_end = 5482 - _JOIN_JOINDATATYPE._serialized_start = 5160 - _JOIN_JOINDATATYPE._serialized_end = 5252 - _JOIN_JOINTYPE._serialized_start = 5255 - _JOIN_JOINTYPE._serialized_end = 5463 - _SETOPERATION._serialized_start = 5485 - _SETOPERATION._serialized_end = 5964 - _SETOPERATION_SETOPTYPE._serialized_start = 5801 - _SETOPERATION_SETOPTYPE._serialized_end = 5915 - _LIMIT._serialized_start = 5966 - _LIMIT._serialized_end = 6042 - _OFFSET._serialized_start = 6044 - _OFFSET._serialized_end = 6123 - _TAIL._serialized_start = 6125 - _TAIL._serialized_end = 6200 - _AGGREGATE._serialized_start = 6203 - _AGGREGATE._serialized_end = 6785 - 
_AGGREGATE_PIVOT._serialized_start = 6542 - _AGGREGATE_PIVOT._serialized_end = 6653 - _AGGREGATE_GROUPTYPE._serialized_start = 6656 - _AGGREGATE_GROUPTYPE._serialized_end = 6785 - _SORT._serialized_start = 6788 - _SORT._serialized_end = 6948 - _DROP._serialized_start = 6951 - _DROP._serialized_end = 7092 - _DEDUPLICATE._serialized_start = 7095 - _DEDUPLICATE._serialized_end = 7335 - _LOCALRELATION._serialized_start = 7337 - _LOCALRELATION._serialized_end = 7426 - _CACHEDLOCALRELATION._serialized_start = 7428 - _CACHEDLOCALRELATION._serialized_end = 7500 - _CACHEDREMOTERELATION._serialized_start = 7502 - _CACHEDREMOTERELATION._serialized_end = 7557 - _SAMPLE._serialized_start = 7560 - _SAMPLE._serialized_end = 7833 - _RANGE._serialized_start = 7836 - _RANGE._serialized_end = 7981 - _SUBQUERYALIAS._serialized_start = 7983 - _SUBQUERYALIAS._serialized_end = 8097 - _REPARTITION._serialized_start = 8100 - _REPARTITION._serialized_end = 8242 - _SHOWSTRING._serialized_start = 8245 - _SHOWSTRING._serialized_end = 8387 - _HTMLSTRING._serialized_start = 8389 - _HTMLSTRING._serialized_end = 8503 - _STATSUMMARY._serialized_start = 8505 - _STATSUMMARY._serialized_end = 8597 - _STATDESCRIBE._serialized_start = 8599 - _STATDESCRIBE._serialized_end = 8680 - _STATCROSSTAB._serialized_start = 8682 - _STATCROSSTAB._serialized_end = 8783 - _STATCOV._serialized_start = 8785 - _STATCOV._serialized_end = 8881 - _STATCORR._serialized_start = 8884 - _STATCORR._serialized_end = 9021 - _STATAPPROXQUANTILE._serialized_start = 9024 - _STATAPPROXQUANTILE._serialized_end = 9188 - _STATFREQITEMS._serialized_start = 9190 - _STATFREQITEMS._serialized_end = 9315 - _STATSAMPLEBY._serialized_start = 9318 - _STATSAMPLEBY._serialized_end = 9627 - _STATSAMPLEBY_FRACTION._serialized_start = 9519 - _STATSAMPLEBY_FRACTION._serialized_end = 9618 - _NAFILL._serialized_start = 9630 - _NAFILL._serialized_end = 9764 - _NADROP._serialized_start = 9767 - _NADROP._serialized_end = 9901 - 
_NAREPLACE._serialized_start = 9904 - _NAREPLACE._serialized_end = 10200 - _NAREPLACE_REPLACEMENT._serialized_start = 10059 - _NAREPLACE_REPLACEMENT._serialized_end = 10200 - _TODF._serialized_start = 10202 - _TODF._serialized_end = 10290 - _WITHCOLUMNSRENAMED._serialized_start = 10293 - _WITHCOLUMNSRENAMED._serialized_end = 10532 - _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 10465 - _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 10532 - _WITHCOLUMNS._serialized_start = 10534 - _WITHCOLUMNS._serialized_end = 10653 - _WITHWATERMARK._serialized_start = 10656 - _WITHWATERMARK._serialized_end = 10790 - _HINT._serialized_start = 10793 - _HINT._serialized_end = 10925 - _UNPIVOT._serialized_start = 10928 - _UNPIVOT._serialized_end = 11255 - _UNPIVOT_VALUES._serialized_start = 11185 - _UNPIVOT_VALUES._serialized_end = 11244 - _TOSCHEMA._serialized_start = 11257 - _TOSCHEMA._serialized_end = 11363 - _REPARTITIONBYEXPRESSION._serialized_start = 11366 - _REPARTITIONBYEXPRESSION._serialized_end = 11569 - _MAPPARTITIONS._serialized_start = 11572 - _MAPPARTITIONS._serialized_end = 11753 - _GROUPMAP._serialized_start = 11756 - _GROUPMAP._serialized_end = 12391 - _COGROUPMAP._serialized_start = 12394 - _COGROUPMAP._serialized_end = 12920 - _APPLYINPANDASWITHSTATE._serialized_start = 12923 - _APPLYINPANDASWITHSTATE._serialized_end = 13280 - _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_start = 13283 - _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_end = 13527 - _PYTHONUDTF._serialized_start = 13530 - _PYTHONUDTF._serialized_end = 13707 - _COLLECTMETRICS._serialized_start = 13710 - _COLLECTMETRICS._serialized_end = 13846 - _PARSE._serialized_start = 13849 - _PARSE._serialized_end = 14237 - _PARSE_OPTIONSENTRY._serialized_start = 4234 - _PARSE_OPTIONSENTRY._serialized_end = 4292 - _PARSE_PARSEFORMAT._serialized_start = 14138 - _PARSE_PARSEFORMAT._serialized_end = 14226 + _RELATION._serialized_end = 3391 + _UNKNOWN._serialized_start = 3393 + 
_UNKNOWN._serialized_end = 3402 + _RELATIONCOMMON._serialized_start = 3404 + _RELATIONCOMMON._serialized_end = 3495 + _SQL._serialized_start = 3498 + _SQL._serialized_end = 3976 + _SQL_ARGSENTRY._serialized_start = 3792 + _SQL_ARGSENTRY._serialized_end = 3882 + _SQL_NAMEDARGUMENTSENTRY._serialized_start = 3884 + _SQL_NAMEDARGUMENTSENTRY._serialized_end = 3976 + _READ._serialized_start = 3979 + _READ._serialized_end = 4642 + _READ_NAMEDTABLE._serialized_start = 4157 + _READ_NAMEDTABLE._serialized_end = 4349 + _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 4291 + _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 4349 + _READ_DATASOURCE._serialized_start = 4352 + _READ_DATASOURCE._serialized_end = 4629 + _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4291 + _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4349 + _PROJECT._serialized_start = 4644 + _PROJECT._serialized_end = 4761 + _FILTER._serialized_start = 4763 + _FILTER._serialized_end = 4875 + _JOIN._serialized_start = 4878 + _JOIN._serialized_end = 5539 + _JOIN_JOINDATATYPE._serialized_start = 5217 + _JOIN_JOINDATATYPE._serialized_end = 5309 + _JOIN_JOINTYPE._serialized_start = 5312 + _JOIN_JOINTYPE._serialized_end = 5520 + _SETOPERATION._serialized_start = 5542 + _SETOPERATION._serialized_end = 6021 + _SETOPERATION_SETOPTYPE._serialized_start = 5858 + _SETOPERATION_SETOPTYPE._serialized_end = 5972 + _LIMIT._serialized_start = 6023 + _LIMIT._serialized_end = 6099 + _OFFSET._serialized_start = 6101 + _OFFSET._serialized_end = 6180 + _TAIL._serialized_start = 6182 + _TAIL._serialized_end = 6257 + _AGGREGATE._serialized_start = 6260 + _AGGREGATE._serialized_end = 6842 + _AGGREGATE_PIVOT._serialized_start = 6599 + _AGGREGATE_PIVOT._serialized_end = 6710 + _AGGREGATE_GROUPTYPE._serialized_start = 6713 + _AGGREGATE_GROUPTYPE._serialized_end = 6842 + _SORT._serialized_start = 6845 + _SORT._serialized_end = 7005 + _DROP._serialized_start = 7008 + _DROP._serialized_end = 7149 + _DEDUPLICATE._serialized_start = 7152 + 
_DEDUPLICATE._serialized_end = 7392 + _LOCALRELATION._serialized_start = 7394 + _LOCALRELATION._serialized_end = 7483 + _CACHEDLOCALRELATION._serialized_start = 7485 + _CACHEDLOCALRELATION._serialized_end = 7557 + _CACHEDREMOTERELATION._serialized_start = 7559 + _CACHEDREMOTERELATION._serialized_end = 7614 + _SAMPLE._serialized_start = 7617 + _SAMPLE._serialized_end = 7890 + _RANGE._serialized_start = 7893 + _RANGE._serialized_end = 8038 + _SUBQUERYALIAS._serialized_start = 8040 + _SUBQUERYALIAS._serialized_end = 8154 + _REPARTITION._serialized_start = 8157 + _REPARTITION._serialized_end = 8299 + _SHOWSTRING._serialized_start = 8302 + _SHOWSTRING._serialized_end = 8444 + _HTMLSTRING._serialized_start = 8446 + _HTMLSTRING._serialized_end = 8560 + _STATSUMMARY._serialized_start = 8562 + _STATSUMMARY._serialized_end = 8654 + _STATDESCRIBE._serialized_start = 8656 + _STATDESCRIBE._serialized_end = 8737 + _STATCROSSTAB._serialized_start = 8739 + _STATCROSSTAB._serialized_end = 8840 + _STATCOV._serialized_start = 8842 + _STATCOV._serialized_end = 8938 + _STATCORR._serialized_start = 8941 + _STATCORR._serialized_end = 9078 + _STATAPPROXQUANTILE._serialized_start = 9081 + _STATAPPROXQUANTILE._serialized_end = 9245 + _STATFREQITEMS._serialized_start = 9247 + _STATFREQITEMS._serialized_end = 9372 + _STATSAMPLEBY._serialized_start = 9375 + _STATSAMPLEBY._serialized_end = 9684 + _STATSAMPLEBY_FRACTION._serialized_start = 9576 + _STATSAMPLEBY_FRACTION._serialized_end = 9675 + _NAFILL._serialized_start = 9687 + _NAFILL._serialized_end = 9821 + _NADROP._serialized_start = 9824 + _NADROP._serialized_end = 9958 + _NAREPLACE._serialized_start = 9961 + _NAREPLACE._serialized_end = 10257 + _NAREPLACE_REPLACEMENT._serialized_start = 10116 + _NAREPLACE_REPLACEMENT._serialized_end = 10257 + _TODF._serialized_start = 10259 + _TODF._serialized_end = 10347 + _WITHCOLUMNSRENAMED._serialized_start = 10350 + _WITHCOLUMNSRENAMED._serialized_end = 10589 + 
_WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 10522 + _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 10589 + _WITHCOLUMNS._serialized_start = 10591 + _WITHCOLUMNS._serialized_end = 10710 + _WITHWATERMARK._serialized_start = 10713 + _WITHWATERMARK._serialized_end = 10847 + _HINT._serialized_start = 10850 + _HINT._serialized_end = 10982 + _UNPIVOT._serialized_start = 10985 + _UNPIVOT._serialized_end = 11312 + _UNPIVOT_VALUES._serialized_start = 11242 + _UNPIVOT_VALUES._serialized_end = 11301 + _TOSCHEMA._serialized_start = 11314 + _TOSCHEMA._serialized_end = 11420 + _REPARTITIONBYEXPRESSION._serialized_start = 11423 + _REPARTITIONBYEXPRESSION._serialized_end = 11626 + _MAPPARTITIONS._serialized_start = 11629 + _MAPPARTITIONS._serialized_end = 11810 + _GROUPMAP._serialized_start = 11813 + _GROUPMAP._serialized_end = 12448 + _COGROUPMAP._serialized_start = 12451 + _COGROUPMAP._serialized_end = 12977 + _APPLYINPANDASWITHSTATE._serialized_start = 12980 + _APPLYINPANDASWITHSTATE._serialized_end = 13337 + _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_start = 13340 + _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_end = 13584 + _PYTHONUDTF._serialized_start = 13587 + _PYTHONUDTF._serialized_end = 13764 + _COLLECTMETRICS._serialized_start = 13767 + _COLLECTMETRICS._serialized_end = 13903 + _PARSE._serialized_start = 13906 + _PARSE._serialized_end = 14294 + _PARSE_OPTIONSENTRY._serialized_start = 4291 + _PARSE_OPTIONSENTRY._serialized_end = 4349 + _PARSE_PARSEFORMAT._serialized_start = 14195 + _PARSE_PARSEFORMAT._serialized_end = 14283 + _ASOFJOIN._serialized_start = 14297 + _ASOFJOIN._serialized_end = 14772 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi index fb4a3661764..5bca4f21b2e 100644 --- a/python/pyspark/sql/connect/proto/relations_pb2.pyi +++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi @@ -100,6 +100,7 @@ class 
Relation(google.protobuf.message.Message): CACHED_LOCAL_RELATION_FIELD_NUMBER: builtins.int CACHED_REMOTE_RELATION_FIELD_NUMBER: builtins.int COMMON_INLINE_USER_DEFINED_TABLE_FUNCTION_FIELD_NUMBER: builtins.int + AS_OF_JOIN_FIELD_NUMBER: builtins.int FILL_NA_FIELD_NUMBER: builtins.int DROP_NA_FIELD_NUMBER: builtins.int REPLACE_FIELD_NUMBER: builtins.int @@ -193,6 +194,8 @@ class Relation(google.protobuf.message.Message): self, ) -> global___CommonInlineUserDefinedTableFunction: ... @property + def as_of_join(self) -> global___AsOfJoin: ... + @property def fill_na(self) -> global___NAFill: """NA functions""" @property @@ -268,6 +271,7 @@ class Relation(google.protobuf.message.Message): cached_remote_relation: global___CachedRemoteRelation | None = ..., common_inline_user_defined_table_function: global___CommonInlineUserDefinedTableFunction | None = ..., + as_of_join: global___AsOfJoin | None = ..., fill_na: global___NAFill | None = ..., drop_na: global___NADrop | None = ..., replace: global___NAReplace | None = ..., @@ -292,6 +296,8 @@ class Relation(google.protobuf.message.Message): b"apply_in_pandas_with_state", "approx_quantile", b"approx_quantile", + "as_of_join", + b"as_of_join", "cached_local_relation", b"cached_local_relation", "cached_remote_relation", @@ -403,6 +409,8 @@ class Relation(google.protobuf.message.Message): b"apply_in_pandas_with_state", "approx_quantile", b"approx_quantile", + "as_of_join", + b"as_of_join", "cached_local_relation", b"cached_local_relation", "cached_remote_relation", @@ -546,6 +554,7 @@ class Relation(google.protobuf.message.Message): "cached_local_relation", "cached_remote_relation", "common_inline_user_defined_table_function", + "as_of_join", "fill_na", "drop_na", "replace", @@ -3672,3 +3681,117 @@ class Parse(google.protobuf.message.Message): ) -> typing_extensions.Literal["schema"] | None: ... global___Parse = Parse + +class AsOfJoin(google.protobuf.message.Message): + """Relation of type [[AsOfJoin]]. 
+ + `left` and `right` must be present. + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + LEFT_FIELD_NUMBER: builtins.int + RIGHT_FIELD_NUMBER: builtins.int + LEFT_AS_OF_FIELD_NUMBER: builtins.int + RIGHT_AS_OF_FIELD_NUMBER: builtins.int + JOIN_EXPR_FIELD_NUMBER: builtins.int + USING_COLUMNS_FIELD_NUMBER: builtins.int + JOIN_TYPE_FIELD_NUMBER: builtins.int + TOLERANCE_FIELD_NUMBER: builtins.int + ALLOW_EXACT_MATCHES_FIELD_NUMBER: builtins.int + DIRECTION_FIELD_NUMBER: builtins.int + @property + def left(self) -> global___Relation: + """(Required) Left input relation for a Join.""" + @property + def right(self) -> global___Relation: + """(Required) Right input relation for a Join.""" + @property + def left_as_of(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression: + """(Required) Field to join on in left DataFrame""" + @property + def right_as_of(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression: + """(Required) Field to join on in right DataFrame""" + @property + def join_expr(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression: + """(Optional) The join condition. Could be unset when `using_columns` is utilized. + + This field does not co-exist with using_columns. + """ + @property + def using_columns( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """Optional. using_columns provides a list of columns that should be present on both sides of + the join inputs that this Join will join on. For example A JOIN B USING col_name is + equivalent to A JOIN B on A.col_name = B.col_name. + + This field does not co-exist with join_expr.
+ """ + join_type: builtins.str + """(Required) The join type.""" + @property + def tolerance(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression: + """(Optional) The asof tolerance within this range.""" + allow_exact_matches: builtins.bool + """(Required) Whether allow matching with the same value or not.""" + direction: builtins.str + """(Required) Whether to search for prior, subsequent, or closest matches.""" + def __init__( + self, + *, + left: global___Relation | None = ..., + right: global___Relation | None = ..., + left_as_of: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ..., + right_as_of: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ..., + join_expr: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ..., + using_columns: collections.abc.Iterable[builtins.str] | None = ..., + join_type: builtins.str = ..., + tolerance: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ..., + allow_exact_matches: builtins.bool = ..., + direction: builtins.str = ..., + ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "join_expr", + b"join_expr", + "left", + b"left", + "left_as_of", + b"left_as_of", + "right", + b"right", + "right_as_of", + b"right_as_of", + "tolerance", + b"tolerance", + ], + ) -> builtins.bool: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "allow_exact_matches", + b"allow_exact_matches", + "direction", + b"direction", + "join_expr", + b"join_expr", + "join_type", + b"join_type", + "left", + b"left", + "left_as_of", + b"left_as_of", + "right", + b"right", + "right_as_of", + b"right_as_of", + "tolerance", + b"tolerance", + "using_columns", + b"using_columns", + ], + ) -> None: ... 
+ +global___AsOfJoin = AsOfJoin diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index bcdae5e40b9..51f18e7b6f3 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2751,6 +2751,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): This is similar to a left-join except that we match on the nearest key rather than equal keys. + .. versionchanged:: 4.0.0 + Supports Spark Connect. + Parameters ---------- other : :class:`DataFrame` --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org