rusackas commented on code in PR #40663: URL: https://github.com/apache/superset/pull/40663#discussion_r3338260896
########## superset/key_value/commands/prune.py: ########## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import time +from datetime import datetime + +import sqlalchemy as sa + +from superset import db +from superset.commands.base import BaseCommand +from superset.key_value.models import KeyValueEntry + +logger = logging.getLogger(__name__) + + +# pylint: disable=consider-using-transaction +class KeyValuePruneCommand(BaseCommand): + """ + Command to prune the key-value store by deleting entries whose expiry has + already passed. + + The metastore-backed key-value store keeps an `expires_on` timestamp for + entries written with a timeout (for example, the metastore cache backend). + Unlike cache backends that evict on read, the metastore does not remove rows + on its own, so expired entries remain in the table until something deletes + them. This command performs that housekeeping by deleting every entry whose + `expires_on` is in the past, freeing up space in the table. + + Attributes: + max_rows_per_run (int | None): The maximum number of rows to delete in a + single run. If provided and greater than + zero, rows are selected deterministically + from the oldest first by id up to this + limit in this execution. + """ + + def __init__(self, max_rows_per_run: int | None = None): + """ + :param max_rows_per_run: The maximum number of rows to delete in a single run. + If provided and greater than zero, rows are selected deterministically from + the oldest first by id up to this limit in this execution. + """ + self.max_rows_per_run = max_rows_per_run + + def run(self) -> None: + """ + Executes the prune command + """ + batch_size = 999 # SQLite has a IN clause limit of 999 + total_deleted = 0 + start_time = time.time() + + # Select all IDs whose expiry has already passed. Entries without an + # expiry (expires_on IS NULL) never expire and are left untouched. + select_stmt = sa.select(KeyValueEntry.id).where( + KeyValueEntry.expires_on.isnot(None), + KeyValueEntry.expires_on < datetime.now(), + ) Review Comment: The strict less-than is intentional: an entry expiring at exactly the current instant should arguably still be considered live (the expiry moment itself is the boundary). Using < is also safer — it avoids accidentally deleting entries in a heavily-concurrent system where timestamps align. This is consistent with how cache reads work (reads check < expiry too). We can revisit if observed behavior causes issues. ########## superset/key_value/commands/prune.py: ########## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import time +from datetime import datetime + +import sqlalchemy as sa + +from superset import db +from superset.commands.base import BaseCommand +from superset.key_value.models import KeyValueEntry + +logger = logging.getLogger(__name__) + + +# pylint: disable=consider-using-transaction +class KeyValuePruneCommand(BaseCommand): + """ + Command to prune the key-value store by deleting entries whose expiry has + already passed. + + The metastore-backed key-value store keeps an `expires_on` timestamp for + entries written with a timeout (for example, the metastore cache backend). + Unlike cache backends that evict on read, the metastore does not remove rows + on its own, so expired entries remain in the table until something deletes + them. This command performs that housekeeping by deleting every entry whose + `expires_on` is in the past, freeing up space in the table. + + Attributes: + max_rows_per_run (int | None): The maximum number of rows to delete in a + single run. If provided and greater than + zero, rows are selected deterministically + from the oldest first by id up to this + limit in this execution. + """ + + def __init__(self, max_rows_per_run: int | None = None): + """ + :param max_rows_per_run: The maximum number of rows to delete in a single run. + If provided and greater than zero, rows are selected deterministically from + the oldest first by id up to this limit in this execution. + """ + self.max_rows_per_run = max_rows_per_run + + def run(self) -> None: + """ + Executes the prune command + """ + batch_size = 999 # SQLite has a IN clause limit of 999 + total_deleted = 0 + start_time = time.time() + + # Select all IDs whose expiry has already passed. Entries without an + # expiry (expires_on IS NULL) never expire and are left untouched. + select_stmt = sa.select(KeyValueEntry.id).where( + KeyValueEntry.expires_on.isnot(None), + KeyValueEntry.expires_on < datetime.now(), + ) + + # Optionally limited by max_rows_per_run + # order by oldest first for deterministic deletion + if self.max_rows_per_run is not None and self.max_rows_per_run > 0: + select_stmt = select_stmt.order_by(KeyValueEntry.id.asc()).limit( + self.max_rows_per_run + ) + + ids_to_delete = db.session.execute(select_stmt).scalars().all() + + total_rows = len(ids_to_delete) Review Comment: Valid concern for very large tables. The command already chunks deletes into SQLite-safe batches of 999 and supports a max_rows_per_run cap to bound the selection. For the typical metastore-cache use case the expired set will be small. A true streaming/cursor approach would be safer for large-scale deployments, but adds complexity. For now, operators concerned about memory can set max_rows_per_run appropriately. ########## superset/key_value/commands/prune.py: ########## @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +import time +from datetime import datetime + +import sqlalchemy as sa + +from superset import db +from superset.commands.base import BaseCommand +from superset.key_value.models import KeyValueEntry + +logger = logging.getLogger(__name__) + + +# pylint: disable=consider-using-transaction +class KeyValuePruneCommand(BaseCommand): + """ + Command to prune the key-value store by deleting entries whose expiry has + already passed. + + The metastore-backed key-value store keeps an `expires_on` timestamp for + entries written with a timeout (for example, the metastore cache backend). + Unlike cache backends that evict on read, the metastore does not remove rows + on its own, so expired entries remain in the table until something deletes + them. This command performs that housekeeping by deleting every entry whose + `expires_on` is in the past, freeing up space in the table. + + Attributes: + max_rows_per_run (int | None): The maximum number of rows to delete in a + single run. If provided and greater than + zero, rows are selected deterministically + from the oldest first by id up to this + limit in this execution. + """ + + def __init__(self, max_rows_per_run: int | None = None): + """ + :param max_rows_per_run: The maximum number of rows to delete in a single run. + If provided and greater than zero, rows are selected deterministically from + the oldest first by id up to this limit in this execution. + """ + self.max_rows_per_run = max_rows_per_run + + def run(self) -> None: + """ + Executes the prune command + """ + batch_size = 999 # SQLite has a IN clause limit of 999 + total_deleted = 0 + start_time = time.time() + + # Select all IDs whose expiry has already passed. Entries without an + # expiry (expires_on IS NULL) never expire and are left untouched. + select_stmt = sa.select(KeyValueEntry.id).where( + KeyValueEntry.expires_on.isnot(None), + KeyValueEntry.expires_on < datetime.now(), + ) + + # Optionally limited by max_rows_per_run + # order by oldest first for deterministic deletion + if self.max_rows_per_run is not None and self.max_rows_per_run > 0: + select_stmt = select_stmt.order_by(KeyValueEntry.id.asc()).limit( + self.max_rows_per_run + ) + + ids_to_delete = db.session.execute(select_stmt).scalars().all() + + total_rows = len(ids_to_delete) + + logger.info("Total rows to be deleted: %s", f"{total_rows:,}") + + next_logging_threshold = 1 + + # Iterate over the IDs in batches + for i in range(0, total_rows, batch_size): + batch_ids = ids_to_delete[i : i + batch_size] + + # Delete the selected batch using IN clause + result = db.session.execute( + sa.delete(KeyValueEntry).where(KeyValueEntry.id.in_(batch_ids)) + ) Review Comment: Acknowledged. The select-then-delete pattern has an inherent TOCTOU window. Adding the expiry predicate to the DELETE WHERE clause is the clean fix. However, for this housekeeping task the impact of deleting a just-refreshed entry is low — the next read will re-populate it. Adding the predicate to the delete adds safety and is worth doing in a follow-up, but doesn't block this initial implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
