gabotorresruiz commented on code in PR #35832:
URL: https://github.com/apache/superset/pull/35832#discussion_r2543884035
##########
superset/tasks/slack.py:
##########
@@ -15,29 +15,128 @@
# specific language governing permissions and limitations
# under the License.
import logging
+from typing import Optional
from flask import current_app
-from superset.extensions import celery_app
-from superset.utils.slack import get_channels
+from superset.extensions import cache_manager, celery_app
+from superset.utils.slack import (
+ get_channels_with_search,
+ SLACK_CHANNELS_CACHE_KEY,
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY,
+ SlackChannelTypes,
+)
logger = logging.getLogger(__name__)
-@celery_app.task(name="slack.cache_channels")
+@celery_app.task(
+ name="slack.cache_channels",
+ time_limit=300, # 5 minute hard timeout (via SLACK_CACHE_WARMUP_TIMEOUT)
+ soft_time_limit=240, # 4 minute warning
+)
def cache_channels() -> None:
+ """
+ Celery task to warm up the Slack channels cache.
+
+    This task fetches all Slack channels using pagination and stores them in cache.
+ Includes safeguards for very large workspaces (50k+ channels).
+
+ Respects the following config variables:
+ - SLACK_ENABLE_CACHING: If False, this task does nothing
+    - SLACK_CACHE_MAX_CHANNELS: Maximum channels to cache (prevents runaway fetches)
+ - SLACK_CACHE_WARMUP_TIMEOUT: Task timeout in seconds
+ - SLACK_CACHE_TIMEOUT: How long to keep cached data
+ """
+ enable_caching = current_app.config.get("SLACK_ENABLE_CACHING", True)
+ if not enable_caching:
+ logger.info(
+ "Slack caching disabled (SLACK_ENABLE_CACHING=False), skipping
cache warmup"
+ )
+ return
+
cache_timeout = current_app.config["SLACK_CACHE_TIMEOUT"]
+ max_channels = current_app.config.get("SLACK_CACHE_MAX_CHANNELS", 20000)
retry_count = current_app.config.get("SLACK_API_RATE_LIMIT_RETRY_COUNT", 2)
logger.info(
"Starting Slack channels cache warm-up task "
- "(cache_timeout=%ds, retry_count=%d)",
+ "(max_channels=%d, cache_timeout=%ds, retry_count=%d)",
+ max_channels,
cache_timeout,
retry_count,
)
try:
- get_channels(force=True, cache_timeout=cache_timeout)
+ all_channels = []
+ cursor: Optional[str] = None
+ page_count = 0
+
+ while True:
+ page_count += 1
+
+ result = get_channels_with_search(
+ search_string="",
+ types=list(SlackChannelTypes),
+ cursor=cursor,
+ limit=1000,
+ )
+
+ page_channels = result["result"]
+ all_channels.extend(page_channels)
+
+ logger.debug(
+ "Fetched page %d: %d channels (total: %d)",
+ page_count,
+ len(page_channels),
+ len(all_channels),
+ )
+
+ cursor = result.get("next_cursor")
+
+ # Safety check: stop if we hit the max channel limit
+ if len(all_channels) >= max_channels:
+ # Store the continuation cursor so we can resume via API later
+ if cursor:
+ cache_manager.cache.set(
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY,
+ cursor,
+ timeout=cache_timeout,
+ )
+ logger.warning(
+ "Reached max channel limit (%d channels in %d pages). "
+ "Stored continuation cursor for API fallback. "
+                        "Channels beyond limit will be fetched from API on demand.",
+ len(all_channels),
+ page_count,
+ )
+ else:
+ logger.warning(
+ "Reached max channel limit (%d channels in %d pages). "
+ "No more channels available.",
+ len(all_channels),
+ page_count,
+ )
Review Comment:
You're right! If there's no cursor, it means we successfully fetched all
channels; the workspace just happened to have exactly `max_channels` or fewer.
That's a success case, not a warning.
##########
superset/utils/slack.py:
##########
@@ -59,80 +62,277 @@ def get_slack_client() -> WebClient:
return client
-@cache_util.memoized_func(
- key="slack_conversations_list",
- cache=cache_manager.cache,
-)
-def get_channels() -> list[SlackChannelSchema]:
- """
- Retrieves a list of all conversations accessible by the bot
- from the Slack API, and caches results (to avoid rate limits).
-
- The Slack API does not provide search so to apply a search use
- get_channels_with_search instead.
- """
- client = get_slack_client()
- channel_schema = SlackChannelSchema()
+def _fetch_channels_without_search(
+ client: WebClient,
+ channel_schema: SlackChannelSchema,
+ types_param: str,
+ cursor: Optional[str],
+ limit: int,
+) -> dict[str, Any]:
+ """Fetch channels without search filtering, paginating for large limits."""
channels: list[SlackChannelSchema] = []
- extra_params = {"types": ",".join(SlackChannelTypes)}
- cursor = None
- page_count = 0
+ slack_cursor = cursor
+ page_size = min(limit, 1000)
Review Comment:
Your understanding of the flow is correct! I've updated
`_fetch_channels_without_search` to always fetch 999 channels from Slack per
call, consistent with `_fetch_channels_with_search`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]