john-bodley commented on code in PR #28432:
URL: https://github.com/apache/superset/pull/28432#discussion_r1600433534
##########
superset/common/query_context_processor.py:
##########
@@ -339,19 +339,31 @@ def get_time_grain(query_object: QueryObject) -> Any |
None:
return query_object.extras.get("time_grain_sqla")
- def add_aggregated_join_column(
+ # pylint: disable=too-many-arguments
+ def add_offset_join_column(
self,
df: pd.DataFrame,
+ name: str,
time_grain: str,
+ time_offset: str | None = None,
join_column_producer: Any = None,
) -> None:
+ """
+ Adds an offset join column to the provided DataFrame.
+
Review Comment:
```suggestion
The function modifies the DataFrame in-place.
```
##########
superset/common/query_context_processor.py:
##########
@@ -519,51 +510,112 @@ def processing_time_offsets( # pylint:
disable=too-many-locals,too-many-stateme
datasource_uid=query_context.datasource.uid,
region=CacheRegion.DATA,
)
- offset_dfs.append(offset_metrics_df)
+ offset_dfs[offset] = offset_metrics_df
if offset_dfs:
- # iterate on offset_dfs, left join each with df
- for offset_df in offset_dfs:
- df = dataframe_utils.left_join_df(
- left_df=df,
- right_df=offset_df,
- join_keys=join_keys,
- rsuffix=R_SUFFIX,
- )
+ df = self.join_offset_dfs(
+ df,
+ offset_dfs,
+ time_grain,
+ join_keys,
+ )
+
+ return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
+
+ def join_offset_dfs(
+ self,
+ df: pd.DataFrame,
+ offset_dfs: dict[str, pd.DataFrame],
+ time_grain: str,
+ join_keys: list[str],
+ ) -> pd.DataFrame:
+ """
+ Join offset DataFrames with the main DataFrame.
- # removes columns used for join
- df.drop(
- list(df.filter(regex=f"{AGGREGATED_JOIN_COLUMN}|{R_SUFFIX}")),
- axis=1,
- inplace=True,
+ :param df: The main DataFrame.
+ :param offset_dfs: A list of offset DataFrames.
+ :param time_grain: The time grain used to calculate the temporal join
key.
+ :param join_keys: The keys to join on.
+ """
+ join_column_producer = config["TIME_GRAIN_JOIN_COLUMN_PRODUCERS"].get(
+ time_grain
)
- return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
+ # iterate on offset_dfs, left join each with df
+ for offset, offset_df in offset_dfs.items():
+ # defines a column name for the offset join column
+ column_name = OFFSET_JOIN_COLUMN_SUFFIX + offset
+
+ # add offset join column to df
+ self.add_offset_join_column(
+ df, column_name, time_grain, offset, join_column_producer
+ )
+
+ # add artifoffseticial join column to offset_df
Review Comment:
```suggestion
# add offset join column to offset_df
```
##########
superset/common/query_context_processor.py:
##########
@@ -339,19 +339,31 @@ def get_time_grain(query_object: QueryObject) -> Any |
None:
return query_object.extras.get("time_grain_sqla")
- def add_aggregated_join_column(
+ # pylint: disable=too-many-arguments
+ def add_offset_join_column(
self,
df: pd.DataFrame,
+ name: str,
time_grain: str,
+ time_offset: str | None = None,
join_column_producer: Any = None,
) -> None:
+ """
+ Adds an offset join column to the provided DataFrame.
+
+ :param df: pandas DataFrame to which the offset join column will be
added.
+ :param name: The name of the new column to be added.
+ :param time_grain: The time grain used to calculate the new column.
+ :param time_offset: The time offset used to calculate the new column.
+ :param join_column_producer: A function to generate the join column.
+
+ :return: None. The function modifies the DataFrame in-place.
Review Comment:
```suggestion
```
I don't think we need to include a `return:` docstring for `None`. That said,
knowing this is in-place is helpful, but that statement could be included in
the main docstring.
##########
tests/unit_tests/common/test_time_shifts.py:
##########
@@ -0,0 +1,187 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from pandas import DataFrame, Series, Timestamp
+from pandas.testing import assert_frame_equal
+from pytest import fixture, mark
+
+from superset.common.chart_data import ChartDataResultFormat,
ChartDataResultType
+from superset.common.query_context import QueryContext
+from superset.common.query_context_processor import QueryContextProcessor
+from superset.connectors.sqla.models import BaseDatasource
+from superset.constants import TimeGrain
+
+query_context_processor = QueryContextProcessor(
+ QueryContext(
+ datasource=BaseDatasource(),
+ queries=[],
+ result_type=ChartDataResultType.COLUMNS,
+ form_data={},
+ slice_=None,
+ result_format=ChartDataResultFormat.CSV,
+ cache_values={},
+ )
+)
+
+
+@fixture
+def make_join_column_producer():
+ def join_column_producer(row: Series, column_index: int) -> str:
+ return "CUSTOM_FORMAT"
+
+ return join_column_producer
+
+
+@mark.parametrize(
+ ("time_grain", "expected"),
+ [
+ (TimeGrain.WEEK, "2020-W01"),
+ (TimeGrain.MONTH, "2020-01"),
+ (TimeGrain.QUARTER, "2020-Q1"),
+ (TimeGrain.YEAR, "2020"),
+ ],
+)
+def test_join_column(time_grain: str, expected: str):
+ df = DataFrame({"ds": [Timestamp("2020-01-07")]})
+ column_name = "join_column"
+ query_context_processor.add_offset_join_column(df, column_name, time_grain)
+ result = DataFrame({"ds": [Timestamp("2020-01-07")], column_name:
[expected]})
+ assert_frame_equal(df, result)
+
+
+def test_join_column_producer(make_join_column_producer):
+ df = DataFrame({"ds": [Timestamp("2020-01-07")]})
+ column_name = "join_column"
+ query_context_processor.add_offset_join_column(
+ df, column_name, TimeGrain.YEAR, None, make_join_column_producer
+ )
+ result = DataFrame(
+ {"ds": [Timestamp("2020-01-07")], column_name: ["CUSTOM_FORMAT"]}
+ )
+ assert_frame_equal(df, result)
+
+
+def test_join_offset_dfs_no_offsets():
+ df = DataFrame({"A": ["2021-01-01", "2021-02-01", "2021-03-01"]})
+ offset_dfs = {}
+ time_grain = "YEAR"
+ join_keys = ["A"]
+
+ result = query_context_processor.join_offset_dfs(
+ df, offset_dfs, time_grain, join_keys
+ )
+
+ assert_frame_equal(df, result)
+
+
+def test_join_offset_dfs_with_offsets():
+ df = DataFrame({"A": ["2021-01-01", "2021-02-01", "2021-03-01"]})
Review Comment:
Are any of these dataframes reused? If so, I wonder if they should be made
into fixtures.
##########
superset/common/query_context_processor.py:
##########
@@ -519,51 +510,112 @@ def processing_time_offsets( # pylint:
disable=too-many-locals,too-many-stateme
datasource_uid=query_context.datasource.uid,
region=CacheRegion.DATA,
)
- offset_dfs.append(offset_metrics_df)
+ offset_dfs[offset] = offset_metrics_df
if offset_dfs:
- # iterate on offset_dfs, left join each with df
- for offset_df in offset_dfs:
- df = dataframe_utils.left_join_df(
- left_df=df,
- right_df=offset_df,
- join_keys=join_keys,
- rsuffix=R_SUFFIX,
- )
+ df = self.join_offset_dfs(
+ df,
+ offset_dfs,
+ time_grain,
+ join_keys,
+ )
+
+ return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
+
+ def join_offset_dfs(
+ self,
+ df: pd.DataFrame,
+ offset_dfs: dict[str, pd.DataFrame],
+ time_grain: str,
+ join_keys: list[str],
+ ) -> pd.DataFrame:
+ """
+ Join offset DataFrames with the main DataFrame.
- # removes columns used for join
- df.drop(
- list(df.filter(regex=f"{AGGREGATED_JOIN_COLUMN}|{R_SUFFIX}")),
- axis=1,
- inplace=True,
+ :param df: The main DataFrame.
+ :param offset_dfs: A list of offset DataFrames.
+ :param time_grain: The time grain used to calculate the temporal join
key.
+ :param join_keys: The keys to join on.
+ """
+ join_column_producer = config["TIME_GRAIN_JOIN_COLUMN_PRODUCERS"].get(
+ time_grain
)
- return CachedTimeOffset(df=df, queries=queries, cache_keys=cache_keys)
+ # iterate on offset_dfs, left join each with df
+ for offset, offset_df in offset_dfs.items():
+ # defines a column name for the offset join column
+ column_name = OFFSET_JOIN_COLUMN_SUFFIX + offset
+
+ # add offset join column to df
+ self.add_offset_join_column(
+ df, column_name, time_grain, offset, join_column_producer
+ )
+
+ # add artifoffseticial join column to offset_df
+ self.add_offset_join_column(
+ offset_df, column_name, time_grain, None, join_column_producer
+ )
+
+ # the temporal column is the first column in the join keys
+ # so we use the join column instead of the temporal column
+ actual_join_keys = [column_name, *join_keys[1:]]
+
+ # left join df with offset_df
+ df = dataframe_utils.left_join_df(
+ left_df=df,
+ right_df=offset_df,
+ join_keys=actual_join_keys,
+ rsuffix=R_SUFFIX,
+ )
+
+ # move the temporal column to the first column in df
+ col = df.pop(join_keys[0])
Review Comment:
Is this guaranteed to exist?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]