rusackas commented on code in PR #40663:
URL: https://github.com/apache/superset/pull/40663#discussion_r3338260896


##########
superset/key_value/commands/prune.py:
##########
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import time
+from datetime import datetime
+
+import sqlalchemy as sa
+
+from superset import db
+from superset.commands.base import BaseCommand
+from superset.key_value.models import KeyValueEntry
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class KeyValuePruneCommand(BaseCommand):
+    """
+    Command to prune the key-value store by deleting entries whose expiry has
+    already passed.
+
+    The metastore-backed key-value store keeps an `expires_on` timestamp for
+    entries written with a timeout (for example, the metastore cache backend).
+    Unlike cache backends that evict on read, the metastore does not remove 
rows
+    on its own, so expired entries remain in the table until something deletes
+    them. This command performs that housekeeping by deleting every entry whose
+    `expires_on` is in the past, freeing up space in the table.
+
+    Attributes:
+        max_rows_per_run (int | None): The maximum number of rows to delete in 
a
+                                       single run. If provided and greater than
+                                       zero, rows are selected 
deterministically
+                                       from the oldest first by id up to this
+                                       limit in this execution.
+    """
+
+    def __init__(self, max_rows_per_run: int | None = None):
+        """
+        :param max_rows_per_run: The maximum number of rows to delete in a 
single run.
+            If provided and greater than zero, rows are selected 
deterministically from
+            the oldest first by id up to this limit in this execution.
+        """
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has a IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs whose expiry has already passed. Entries without an
+        # expiry (expires_on IS NULL) never expire and are left untouched.
+        select_stmt = sa.select(KeyValueEntry.id).where(
+            KeyValueEntry.expires_on.isnot(None),
+            KeyValueEntry.expires_on < datetime.now(),
+        )

Review Comment:
   The strict less-than is intentional: an entry expiring at exactly the 
current instant should arguably still be considered live (the expiry moment 
itself is the boundary). Using `<` is also the safer choice in a heavily 
concurrent system: it avoids deleting an entry whose expiry coincides exactly 
with the timestamp used for the prune. This is consistent with how cache reads 
work (reads also use a strict `<` comparison against the expiry). We can 
revisit this if observed behavior causes issues.



##########
superset/key_value/commands/prune.py:
##########
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import time
+from datetime import datetime
+
+import sqlalchemy as sa
+
+from superset import db
+from superset.commands.base import BaseCommand
+from superset.key_value.models import KeyValueEntry
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class KeyValuePruneCommand(BaseCommand):
+    """
+    Command to prune the key-value store by deleting entries whose expiry has
+    already passed.
+
+    The metastore-backed key-value store keeps an `expires_on` timestamp for
+    entries written with a timeout (for example, the metastore cache backend).
+    Unlike cache backends that evict on read, the metastore does not remove 
rows
+    on its own, so expired entries remain in the table until something deletes
+    them. This command performs that housekeeping by deleting every entry whose
+    `expires_on` is in the past, freeing up space in the table.
+
+    Attributes:
+        max_rows_per_run (int | None): The maximum number of rows to delete in 
a
+                                       single run. If provided and greater than
+                                       zero, rows are selected 
deterministically
+                                       from the oldest first by id up to this
+                                       limit in this execution.
+    """
+
+    def __init__(self, max_rows_per_run: int | None = None):
+        """
+        :param max_rows_per_run: The maximum number of rows to delete in a 
single run.
+            If provided and greater than zero, rows are selected 
deterministically from
+            the oldest first by id up to this limit in this execution.
+        """
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has a IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs whose expiry has already passed. Entries without an
+        # expiry (expires_on IS NULL) never expire and are left untouched.
+        select_stmt = sa.select(KeyValueEntry.id).where(
+            KeyValueEntry.expires_on.isnot(None),
+            KeyValueEntry.expires_on < datetime.now(),
+        )
+
+        # Optionally limited by max_rows_per_run
+        # order by oldest first for deterministic deletion
+        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
+            select_stmt = select_stmt.order_by(KeyValueEntry.id.asc()).limit(
+                self.max_rows_per_run
+            )
+
+        ids_to_delete = db.session.execute(select_stmt).scalars().all()
+
+        total_rows = len(ids_to_delete)

Review Comment:
   Valid concern for very large tables, since every expired ID is loaded into 
memory before deletion. The command already chunks deletes into SQLite-safe 
batches of 999 and supports a `max_rows_per_run` cap to bound the selection. 
For the typical metastore-cache use case the expired set is expected to be 
small. A true streaming/cursor approach would be safer for large-scale 
deployments, but adds complexity. For now, operators concerned about memory 
can set `max_rows_per_run` appropriately.



##########
superset/key_value/commands/prune.py:
##########
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import time
+from datetime import datetime
+
+import sqlalchemy as sa
+
+from superset import db
+from superset.commands.base import BaseCommand
+from superset.key_value.models import KeyValueEntry
+
+logger = logging.getLogger(__name__)
+
+
+# pylint: disable=consider-using-transaction
+class KeyValuePruneCommand(BaseCommand):
+    """
+    Command to prune the key-value store by deleting entries whose expiry has
+    already passed.
+
+    The metastore-backed key-value store keeps an `expires_on` timestamp for
+    entries written with a timeout (for example, the metastore cache backend).
+    Unlike cache backends that evict on read, the metastore does not remove 
rows
+    on its own, so expired entries remain in the table until something deletes
+    them. This command performs that housekeeping by deleting every entry whose
+    `expires_on` is in the past, freeing up space in the table.
+
+    Attributes:
+        max_rows_per_run (int | None): The maximum number of rows to delete in 
a
+                                       single run. If provided and greater than
+                                       zero, rows are selected 
deterministically
+                                       from the oldest first by id up to this
+                                       limit in this execution.
+    """
+
+    def __init__(self, max_rows_per_run: int | None = None):
+        """
+        :param max_rows_per_run: The maximum number of rows to delete in a 
single run.
+            If provided and greater than zero, rows are selected 
deterministically from
+            the oldest first by id up to this limit in this execution.
+        """
+        self.max_rows_per_run = max_rows_per_run
+
+    def run(self) -> None:
+        """
+        Executes the prune command
+        """
+        batch_size = 999  # SQLite has a IN clause limit of 999
+        total_deleted = 0
+        start_time = time.time()
+
+        # Select all IDs whose expiry has already passed. Entries without an
+        # expiry (expires_on IS NULL) never expire and are left untouched.
+        select_stmt = sa.select(KeyValueEntry.id).where(
+            KeyValueEntry.expires_on.isnot(None),
+            KeyValueEntry.expires_on < datetime.now(),
+        )
+
+        # Optionally limited by max_rows_per_run
+        # order by oldest first for deterministic deletion
+        if self.max_rows_per_run is not None and self.max_rows_per_run > 0:
+            select_stmt = select_stmt.order_by(KeyValueEntry.id.asc()).limit(
+                self.max_rows_per_run
+            )
+
+        ids_to_delete = db.session.execute(select_stmt).scalars().all()
+
+        total_rows = len(ids_to_delete)
+
+        logger.info("Total rows to be deleted: %s", f"{total_rows:,}")
+
+        next_logging_threshold = 1
+
+        # Iterate over the IDs in batches
+        for i in range(0, total_rows, batch_size):
+            batch_ids = ids_to_delete[i : i + batch_size]
+
+            # Delete the selected batch using IN clause
+            result = db.session.execute(
+                sa.delete(KeyValueEntry).where(KeyValueEntry.id.in_(batch_ids))
+            )

Review Comment:
   Acknowledged. The select-then-delete pattern has an inherent TOCTOU 
(time-of-check to time-of-use) window: an entry selected as expired could be 
refreshed with a new expiry before its batch is deleted. Adding the expiry 
predicate to the DELETE statement's WHERE clause is the clean fix. However, 
for this housekeeping task the impact of deleting a just-refreshed entry is 
low — the next read will simply re-populate it. Adding the predicate to the 
delete improves safety and is worth doing in a follow-up, but it doesn't block 
this initial implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to