Vitor-Avila commented on code in PR #35832:
URL: https://github.com/apache/superset/pull/35832#discussion_r2548499255
##########
superset/reports/api.py:
##########
@@ -577,14 +582,31 @@ def slack_channels(self, **kwargs: Any) -> Response:
search_string = params.get("search_string")
types = params.get("types", [])
exact_match = params.get("exact_match", False)
+ cursor = params.get("cursor")
+ limit = params.get("limit", 100)
force = params.get("force", False)
- channels = get_channels_with_search(
+
+ # Clear cache if force refresh requested
+ if force:
+ cache_manager.cache.delete(SLACK_CHANNELS_CACHE_KEY)
+ cache_manager.cache.delete(
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY
+ )
+ logger.info("Slack channels cache cleared due to force=True")
+
+ # Trigger async cache warmup if caching is enabled
+ if current_app.config.get("SLACK_ENABLE_CACHING", True):
+ cache_channels.delay()
+ logger.info("Triggered async cache warmup task")
+
+ channels_data = get_channels_with_search(
search_string=search_string,
types=types,
exact_match=exact_match,
- force=force,
+ cursor=cursor,
+ limit=limit,
)
Review Comment:
Would it make sense to have the `force` button show a toast saying the
channel list is being updated, and prevent the user from searching until
the cache is warmed up? @eschutho what do you think?
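For example, the endpoint could return a flag whenever it just kicked off
a warmup, and the frontend could key the toast and the disabled search box
off of it. A rough sketch only; the `cache_warming` response field is a
hypothetical addition, not something in this PR:

```python
# Sketch: tell the frontend a warmup is in flight so it can show a toast
# and temporarily disable the search input. "cache_warming" is hypothetical.
warming = False
if force and current_app.config.get("SLACK_ENABLE_CACHING", True):
    cache_channels.delay()
    warming = True

return self.response(
    200,
    result=channels_data["result"],
    cache_warming=warming,  # frontend: show toast + block search while True
)
```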
##########
superset/utils/slack.py:
##########
@@ -59,80 +62,275 @@ def get_slack_client() -> WebClient:
return client
-@cache_util.memoized_func(
- key="slack_conversations_list",
- cache=cache_manager.cache,
-)
-def get_channels() -> list[SlackChannelSchema]:
- """
- Retrieves a list of all conversations accessible by the bot
- from the Slack API, and caches results (to avoid rate limits).
-
- The Slack API does not provide search so to apply a search use
- get_channels_with_search instead.
- """
- client = get_slack_client()
- channel_schema = SlackChannelSchema()
+def _fetch_channels_without_search(
+ client: WebClient,
+ channel_schema: SlackChannelSchema,
+ types_param: str,
+ cursor: Optional[str],
+ limit: int,
+) -> dict[str, Any]:
+ """Fetch channels without search filtering, paginating for large limits."""
channels: list[SlackChannelSchema] = []
- extra_params = {"types": ",".join(SlackChannelTypes)}
- cursor = None
- page_count = 0
+ slack_cursor = cursor
+
+ while True:
+ response = client.conversations_list(
+ limit=999,
+ cursor=slack_cursor,
+ exclude_archived=True,
+ types=types_param,
+ )
+
+ page_channels = [
+ channel_schema.load(channel) for channel in response.data["channels"]
+ ]
+ channels.extend(page_channels)
+
+ slack_cursor = response.data.get("response_metadata", {}).get("next_cursor")
+
+ if not slack_cursor or len(channels) >= limit:
+ break
+
+ return {
+ "result": channels[:limit],
+ "next_cursor": slack_cursor,
+ "has_more": bool(slack_cursor),
+ }
+
+
+def _fetch_channels_with_search(
+ client: WebClient,
+ channel_schema: SlackChannelSchema,
+ types_param: str,
+ search_string: str,
+ exact_match: bool,
+ cursor: Optional[str],
+ limit: int,
+) -> dict[str, Any]:
+ """Fetch channels with search filtering, streaming through pages."""
+ matches: list[SlackChannelSchema] = []
+ slack_cursor = cursor
+ search_terms = [
+ term.strip().lower() for term in search_string.split(",") if term.strip()
+ ]
+
+ while len(matches) < limit:
+ response = client.conversations_list(
+ limit=999,
+ cursor=slack_cursor,
+ exclude_archived=True,
+ types=types_param,
+ )
- logger.info("Starting Slack channels fetch")
+ for channel_data in response.data["channels"]:
+ channel_name_lower = channel_data["name"].lower()
+ channel_id_lower = channel_data["id"].lower()
+
+ is_match = False
+ for search_term in search_terms:
+ if exact_match:
+ if (
+ search_term == channel_name_lower
+ or search_term == channel_id_lower
+ ):
+ is_match = True
+ break
+ else:
+ if (
+ search_term in channel_name_lower
+ or search_term in channel_id_lower
+ ):
+ is_match = True
+ break
+
+ if is_match:
+ channel = channel_schema.load(channel_data)
+ matches.append(channel)
+
+ if len(matches) >= limit:
+ break
- try:
- while True:
- page_count += 1
+ slack_cursor = response.data.get("response_metadata", {}).get("next_cursor")
+ if not slack_cursor:
+ break
- response = client.conversations_list(
- limit=999, cursor=cursor, exclude_archived=True, **extra_params
- )
- page_channels = response.data["channels"]
- channels.extend(channel_schema.load(channel) for channel in page_channels)
-
- logger.debug(
- "Fetched page %d: %d channels (total: %d)",
- page_count,
- len(page_channels),
- len(channels),
- )
+ has_more = bool(slack_cursor) and len(matches) >= limit
+
+ return {
+ "result": matches[:limit],
+ "next_cursor": slack_cursor if has_more else None,
+ "has_more": has_more,
+ }
- cursor = response.data.get("response_metadata", {}).get("next_cursor")
- if not cursor:
- break
+def _fetch_from_cache( # noqa: C901
+ cached_channels: list[SlackChannelSchema],
+ search_string: str,
+ types: Optional[list[SlackChannelTypes]],
+ exact_match: bool,
+ cursor: Optional[str],
+ limit: int,
+) -> Optional[dict[str, Any]]:
+ """
+ Fetch channels from cache with in-memory filtering and pagination.
+
+ This is the fast path - operates entirely in-memory on cached data.
+ Used when cache is available and warm.
+
+ Args:
+ cached_channels: Complete list of all cached channels
+ search_string: Search term(s) for filtering
+ types: Channel types to filter
+ exact_match: If True, search term must exactly match
+ cursor: Cache pagination cursor (format: "cache:N")
+ limit: Maximum channels to return
+
+ Returns:
+ Dict with filtered and paginated results, or None if pagination
+ exceeds cached data (signals need to fall back to API)
+ """
+ # Start with all cached channels
+ channels = list(cached_channels)
+
+ # Filter by channel types if specified
+ if types and len(types) < len(SlackChannelTypes):
+ type_set = set(types)
+ channels = [
+ ch
+ for ch in channels
+ if (
+ (SlackChannelTypes.PRIVATE in type_set and ch.get("is_private"))
+ or (SlackChannelTypes.PUBLIC in type_set and not ch.get("is_private"))
+ )
+ ]
+
+ # Filter by search string with comma-separated OR logic
+ if search_string:
+ search_terms = [
+ term.strip().lower() for term in search_string.split(",") if term.strip()
+ ]
+ filtered = []
+
+ for ch in channels:
+ channel_name_lower = ch.get("name", "").lower()
+ channel_id_lower = ch.get("id", "").lower()
+ is_match = False
+
+ for search_term in search_terms:
+ if exact_match:
+ if (
+ search_term == channel_name_lower
+ or search_term == channel_id_lower
+ ):
+ is_match = True
+ break
+ else:
+ if (
+ search_term in channel_name_lower
+ or search_term in channel_id_lower
+ ):
+ is_match = True
+ break
+
+ if is_match:
+ filtered.append(ch)
+
+ channels = filtered
+
+ # Calculate pagination offset from cursor
+ offset = 0
+ if cursor and cursor.startswith("cache:"):
+ try:
+ offset = int(cursor.split(":", 1)[1])
+ except (ValueError, IndexError):
+ offset = 0
+
+ # Check if we're trying to paginate beyond cached data
+ if offset >= len(channels):
logger.info(
- "Successfully fetched %d Slack channels in %d pages",
+ "Pagination offset (%d) exceeds cached data (%d channels). "
+ "Falling back to API.",
+ offset,
len(channels),
- page_count,
- )
- return channels
- except SlackApiError as ex:
- logger.error(
- "Failed to fetch Slack channels after %d pages: %s",
- page_count,
- str(ex),
- exc_info=True,
)
- raise
+ return None
+
+ # Paginate in memory
+ page = channels[offset : offset + limit]
+ next_offset = offset + limit
+ has_more = next_offset < len(channels)
+
+ # Check if we have a continuation cursor (cache was truncated)
+ continuation_cursor = cache_manager.cache.get(
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY
+ )
+
+ # If we've reached the end of cached data but there's a continuation cursor,
+ # signal that more data exists via API
+ next_cursor: Optional[str]
+ if not has_more and continuation_cursor:
+ # Return special cursor that signals transition to API
+ next_cursor = "api:continue"
Review Comment:
Same here: is this still being used? I think this was related to the
`SLACK_CHANNELS_CONTINUATION_CURSOR_KEY` usage.
##########
superset/reports/api.py:
##########
@@ -577,14 +577,17 @@ def slack_channels(self, **kwargs: Any) -> Response:
search_string = params.get("search_string")
types = params.get("types", [])
exact_match = params.get("exact_match", False)
- force = params.get("force", False)
- channels = get_channels_with_search(
+ cursor = params.get("cursor")
+ limit = params.get("limit", 100)
Review Comment:
Oh, also: it seems your recent code changes removed the usage of
`SLACK_CHANNELS_CONTINUATION_CURSOR_KEY`. In that case, do we still need
to expose `cursor` here?
##########
superset/tasks/slack.py:
##########
@@ -15,17 +15,45 @@
# specific language governing permissions and limitations
# under the License.
import logging
+from typing import Optional
from flask import current_app
-from superset.extensions import celery_app
-from superset.utils.slack import get_channels
+from superset.extensions import cache_manager, celery_app
+from superset.utils.slack import (
+ get_channels_with_search,
+ SLACK_CHANNELS_CACHE_KEY,
+ SlackChannelTypes,
+)
logger = logging.getLogger(__name__)
+CACHE_WARMUP_TIME_LIMIT = 300 # 5 minutes
Review Comment:
One thing I thought about: could this constant receive its value from a
setting in `config.py`? That way users could set a custom value there that
would get assigned here (assuming this local constant works as expected).
Do you know if that would work? I'm asking because I imagine some larger
organizations might need more than 5 minutes, so having the ability to
customize it from there would be better.
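For reference, something along these lines might work. This is just a
sketch: the `SLACK_CACHE_WARMUP_TIME_LIMIT` config key and the
`_warmup_time_limit` helper are suggestions, not existing settings:

```python
# superset/tasks/slack.py (sketch, assuming a new, hypothetical
# SLACK_CACHE_WARMUP_TIME_LIMIT entry in config.py)
from flask import current_app

DEFAULT_CACHE_WARMUP_TIME_LIMIT = 300  # 5 minutes


def _warmup_time_limit() -> int:
    # Resolved at call time so deployments can override it in config.py.
    return current_app.config.get(
        "SLACK_CACHE_WARMUP_TIME_LIMIT", DEFAULT_CACHE_WARMUP_TIME_LIMIT
    )
```

One caveat: if the constant is passed as the Celery task's `time_limit` in
the decorator, that's evaluated at import time (possibly before any app
context exists), so the value may need to be read from the config module
directly instead of via `current_app`.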
##########
superset/utils/slack.py:
##########
@@ -28,12 +28,15 @@
from superset.exceptions import SupersetException
from superset.extensions import cache_manager
from superset.reports.schemas import SlackChannelSchema
-from superset.utils import cache as cache_util
from superset.utils.backports import StrEnum
-from superset.utils.core import recipients_string_to_list
logger = logging.getLogger(__name__)
+SLACK_CHANNELS_CACHE_KEY = "slack_conversations_list"
+SLACK_CHANNELS_CONTINUATION_CURSOR_KEY = (
+ f"{SLACK_CHANNELS_CACHE_KEY}_continuation_cursor"
+)
Review Comment:
Is this still being used? I don't see a `set` call for this key anymore.
##########
superset/reports/api.py:
##########
@@ -577,14 +582,31 @@ def slack_channels(self, **kwargs: Any) -> Response:
search_string = params.get("search_string")
types = params.get("types", [])
exact_match = params.get("exact_match", False)
+ cursor = params.get("cursor")
+ limit = params.get("limit", 100)
force = params.get("force", False)
- channels = get_channels_with_search(
+
+ # Clear cache if force refresh requested
+ if force:
+ cache_manager.cache.delete(SLACK_CHANNELS_CACHE_KEY)
+ cache_manager.cache.delete(
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY
+ )
+ logger.info("Slack channels cache cleared due to force=True")
+
+ # Trigger async cache warmup if caching is enabled
+ if current_app.config.get("SLACK_ENABLE_CACHING", True):
+ cache_channels.delay()
+ logger.info("Triggered async cache warmup task")
+
+ channels_data = get_channels_with_search(
search_string=search_string,
types=types,
exact_match=exact_match,
- force=force,
+ cursor=cursor,
+ limit=limit,
)
Review Comment:
This might be an issue. When `force` is `True`, we're calling
`cache_channels.delay()` to warm up the cache and also calling
`get_channels_with_search()`. These will hit the Slack API in parallel
(one thread for the async cache warmup and another for the frontend
search), which will likely cause a lot more rate-limit errors.
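One possible way around it (a sketch only, assuming
`get_channels_with_search` repopulates the cache on a miss): make the two
paths mutually exclusive so that at most one of them talks to the Slack
API per request:

```python
# Sketch: avoid the async warmup and the synchronous fetch both hitting
# the Slack API for the same force-refresh request.
if force:
    cache_manager.cache.delete(SLACK_CHANNELS_CACHE_KEY)
    # The synchronous fetch below repopulates the cache on a miss
    # (assumption), so skip the parallel warmup for this request.
elif current_app.config.get("SLACK_ENABLE_CACHING", True):
    cache_channels.delay()

channels_data = get_channels_with_search(
    search_string=search_string,
    types=types,
    exact_match=exact_match,
    cursor=cursor,
    limit=limit,
)
```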