Vitor-Avila commented on code in PR #35832:
URL: https://github.com/apache/superset/pull/35832#discussion_r2536162802
##########
superset/utils/slack.py:
##########
@@ -59,80 +62,277 @@ def get_slack_client() -> WebClient:
return client
-@cache_util.memoized_func(
- key="slack_conversations_list",
- cache=cache_manager.cache,
-)
-def get_channels() -> list[SlackChannelSchema]:
- """
- Retrieves a list of all conversations accessible by the bot
- from the Slack API, and caches results (to avoid rate limits).
-
- The Slack API does not provide search so to apply a search use
- get_channels_with_search instead.
- """
- client = get_slack_client()
- channel_schema = SlackChannelSchema()
+def _fetch_channels_without_search(
+ client: WebClient,
+ channel_schema: SlackChannelSchema,
+ types_param: str,
+ cursor: Optional[str],
+ limit: int,
+) -> dict[str, Any]:
+ """Fetch channels without search filtering, paginating for large limits."""
channels: list[SlackChannelSchema] = []
- extra_params = {"types": ",".join(SlackChannelTypes)}
- cursor = None
- page_count = 0
+ slack_cursor = cursor
+ page_size = min(limit, 1000)
+
+ while True:
+ response = client.conversations_list(
+ limit=page_size,
+ cursor=slack_cursor,
+ exclude_archived=True,
+ types=types_param,
+ )
+
+ page_channels = [
+ channel_schema.load(channel) for channel in
response.data["channels"]
+ ]
+ channels.extend(page_channels)
+
+ slack_cursor = response.data.get("response_metadata",
{}).get("next_cursor")
+
+ if not slack_cursor or len(page_channels) < page_size or len(channels)
>= limit:
+ break
+
+ return {
+ "result": channels[:limit],
+ "next_cursor": slack_cursor,
+ "has_more": bool(slack_cursor),
+ }
+
+
+def _fetch_channels_with_search(
+ client: WebClient,
+ channel_schema: SlackChannelSchema,
+ types_param: str,
+ search_string: str,
+ exact_match: bool,
+ cursor: Optional[str],
+ limit: int,
+) -> dict[str, Any]:
+ """Fetch channels with search filtering, streaming through pages."""
+ matches: list[SlackChannelSchema] = []
+ slack_cursor = cursor
+ search_terms = [
+ term.strip().lower() for term in search_string.split(",") if
term.strip()
+ ]
+
+ while len(matches) < limit:
+ response = client.conversations_list(
+ limit=1000,
Review Comment:
We should continue using `999` as the limit -- from [Slack
docs](https://docs.slack.dev/reference/methods/conversations.list/#arguments):
> The maximum number of items to return. Fewer than the requested number of
items may be returned, even if the end of the list hasn't been reached. Must be
an integer under 1000.
##########
superset/tasks/slack.py:
##########
@@ -15,29 +15,128 @@
# specific language governing permissions and limitations
# under the License.
import logging
+from typing import Optional
from flask import current_app
-from superset.extensions import celery_app
-from superset.utils.slack import get_channels
+from superset.extensions import cache_manager, celery_app
+from superset.utils.slack import (
+ get_channels_with_search,
+ SLACK_CHANNELS_CACHE_KEY,
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY,
+ SlackChannelTypes,
+)
logger = logging.getLogger(__name__)
-@celery_app.task(name="slack.cache_channels")
+@celery_app.task(
+ name="slack.cache_channels",
+ time_limit=300, # 5 minute hard timeout (via SLACK_CACHE_WARMUP_TIMEOUT)
+ soft_time_limit=240, # 4 minute warning
Review Comment:
Do we want to make this a percentage of `SLACK_CACHE_WARMUP_TIMEOUT`'s value?
##########
superset/tasks/slack.py:
##########
@@ -15,29 +15,128 @@
# specific language governing permissions and limitations
# under the License.
import logging
+from typing import Optional
from flask import current_app
-from superset.extensions import celery_app
-from superset.utils.slack import get_channels
+from superset.extensions import cache_manager, celery_app
+from superset.utils.slack import (
+ get_channels_with_search,
+ SLACK_CHANNELS_CACHE_KEY,
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY,
+ SlackChannelTypes,
+)
logger = logging.getLogger(__name__)
-@celery_app.task(name="slack.cache_channels")
+@celery_app.task(
+ name="slack.cache_channels",
+ time_limit=300, # 5 minute hard timeout (via SLACK_CACHE_WARMUP_TIMEOUT)
+ soft_time_limit=240, # 4 minute warning
+)
def cache_channels() -> None:
+ """
+ Celery task to warm up the Slack channels cache.
+
+ This task fetches all Slack channels using pagination and stores them in
cache.
+ Includes safeguards for very large workspaces (50k+ channels).
+
+ Respects the following config variables:
+ - SLACK_ENABLE_CACHING: If False, this task does nothing
+ - SLACK_CACHE_MAX_CHANNELS: Maximum channels to cache (prevents runaway
fetches)
+ - SLACK_CACHE_WARMUP_TIMEOUT: Task timeout in seconds
+ - SLACK_CACHE_TIMEOUT: How long to keep cached data
+ """
+ enable_caching = current_app.config.get("SLACK_ENABLE_CACHING", True)
+ if not enable_caching:
+ logger.info(
+ "Slack caching disabled (SLACK_ENABLE_CACHING=False), skipping
cache warmup"
+ )
+ return
+
cache_timeout = current_app.config["SLACK_CACHE_TIMEOUT"]
+ max_channels = current_app.config.get("SLACK_CACHE_MAX_CHANNELS", 20000)
retry_count = current_app.config.get("SLACK_API_RATE_LIMIT_RETRY_COUNT", 2)
logger.info(
"Starting Slack channels cache warm-up task "
- "(cache_timeout=%ds, retry_count=%d)",
+ "(max_channels=%d, cache_timeout=%ds, retry_count=%d)",
+ max_channels,
cache_timeout,
retry_count,
)
try:
- get_channels(force=True, cache_timeout=cache_timeout)
+ all_channels = []
+ cursor: Optional[str] = None
+ page_count = 0
+
+ while True:
+ page_count += 1
+
+ result = get_channels_with_search(
+ search_string="",
+ types=list(SlackChannelTypes),
+ cursor=cursor,
+ limit=1000,
+ )
+
+ page_channels = result["result"]
+ all_channels.extend(page_channels)
+
+ logger.debug(
+ "Fetched page %d: %d channels (total: %d)",
+ page_count,
+ len(page_channels),
+ len(all_channels),
+ )
+
+ cursor = result.get("next_cursor")
+
+ # Safety check: stop if we hit the max channel limit
+ if len(all_channels) >= max_channels:
+ # Store the continuation cursor so we can resume via API later
+ if cursor:
+ cache_manager.cache.set(
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY,
+ cursor,
+ timeout=cache_timeout,
+ )
+ logger.warning(
+ "Reached max channel limit (%d channels in %d pages). "
+ "Stored continuation cursor for API fallback. "
+ "Channels beyond limit will be fetched from API on
demand.",
+ len(all_channels),
+ page_count,
+ )
+ else:
+ logger.warning(
+ "Reached max channel limit (%d channels in %d pages). "
+ "No more channels available.",
+ len(all_channels),
+ page_count,
+ )
Review Comment:
If we don't have a `cursor`, does it mean we got it all? If so, do we need a
`warning` here?
##########
superset/reports/api.py:
##########
@@ -577,14 +577,17 @@ def slack_channels(self, **kwargs: Any) -> Response:
search_string = params.get("search_string")
types = params.get("types", [])
exact_match = params.get("exact_match", False)
- force = params.get("force", False)
- channels = get_channels_with_search(
+ cursor = params.get("cursor")
+ limit = params.get("limit", 100)
Review Comment:
For `limit` specifically, is the idea that you might want to limit how many
options are visible in the dropdown?
##########
superset/reports/schemas.py:
##########
@@ -58,6 +58,9 @@
"items": {"type": "string", "enum": ["public_channel",
"private_channel"]},
},
"exact_match": {"type": "boolean"},
+ "cursor": {"type": ["string", "null"]},
+ "limit": {"type": "integer", "default": 100, "minimum": 1, "maximum":
1000},
+ "force": {"type": "boolean"},
Review Comment:
If we're indeed removing `force` (see previous comment) we should not add it
here.
##########
superset/utils/slack.py:
##########
@@ -59,80 +62,277 @@ def get_slack_client() -> WebClient:
return client
-@cache_util.memoized_func(
- key="slack_conversations_list",
- cache=cache_manager.cache,
-)
-def get_channels() -> list[SlackChannelSchema]:
- """
- Retrieves a list of all conversations accessible by the bot
- from the Slack API, and caches results (to avoid rate limits).
-
- The Slack API does not provide search so to apply a search use
- get_channels_with_search instead.
- """
- client = get_slack_client()
- channel_schema = SlackChannelSchema()
+def _fetch_channels_without_search(
+ client: WebClient,
+ channel_schema: SlackChannelSchema,
+ types_param: str,
+ cursor: Optional[str],
+ limit: int,
+) -> dict[str, Any]:
+ """Fetch channels without search filtering, paginating for large limits."""
channels: list[SlackChannelSchema] = []
- extra_params = {"types": ",".join(SlackChannelTypes)}
- cursor = None
- page_count = 0
+ slack_cursor = cursor
+ page_size = min(limit, 1000)
Review Comment:
The flow seems to be: `get_channels_with_search -> _fetch_from_api ->
_fetch_channels_without_search`. Is this correct? I couldn't find any call to
`_fetch_channels_without_search` without going through
`get_channels_with_search`. Also, `get_channels_with_search` is called from the
API with `limit` set to `100`, which will bubble down to this. Is it expected
that `_fetch_channels_without_search` will only return 100 results by default?
##########
superset/reports/schemas.py:
##########
@@ -58,6 +58,9 @@
"items": {"type": "string", "enum": ["public_channel",
"private_channel"]},
},
"exact_match": {"type": "boolean"},
+ "cursor": {"type": ["string", "null"]},
+ "limit": {"type": "integer", "default": 100, "minimum": 1, "maximum":
1000},
+ "force": {"type": "boolean"},
Review Comment:
We can also update `maximum` here to `999`.
##########
superset/reports/api.py:
##########
@@ -577,14 +577,17 @@ def slack_channels(self, **kwargs: Any) -> Response:
search_string = params.get("search_string")
types = params.get("types", [])
exact_match = params.get("exact_match", False)
- force = params.get("force", False)
- channels = get_channels_with_search(
+ cursor = params.get("cursor")
+ limit = params.get("limit", 100)
Review Comment:
Also, should we preserve `force` in case the user wants to trigger a cache
refresh?
##########
superset/reports/api.py:
##########
@@ -577,14 +577,17 @@ def slack_channels(self, **kwargs: Any) -> Response:
search_string = params.get("search_string")
types = params.get("types", [])
exact_match = params.get("exact_match", False)
- force = params.get("force", False)
- channels = get_channels_with_search(
+ cursor = params.get("cursor")
+ limit = params.get("limit", 100)
Review Comment:
Do we need to expose `cursor` and `limit` to the API? Is there a scenario in
which a user would want to pass these explicitly?
If so, do we also want to set `limit`'s default value to `999` here, as
opposed to `100`?
##########
superset/tasks/slack.py:
##########
@@ -15,29 +15,128 @@
# specific language governing permissions and limitations
# under the License.
import logging
+from typing import Optional
from flask import current_app
-from superset.extensions import celery_app
-from superset.utils.slack import get_channels
+from superset.extensions import cache_manager, celery_app
+from superset.utils.slack import (
+ get_channels_with_search,
+ SLACK_CHANNELS_CACHE_KEY,
+ SLACK_CHANNELS_CONTINUATION_CURSOR_KEY,
+ SlackChannelTypes,
+)
logger = logging.getLogger(__name__)
-@celery_app.task(name="slack.cache_channels")
+@celery_app.task(
+ name="slack.cache_channels",
+ time_limit=300, # 5 minute hard timeout (via SLACK_CACHE_WARMUP_TIMEOUT)
Review Comment:
Did you mean to use `SLACK_CACHE_WARMUP_TIMEOUT` here as opposed to hardcoding
`300`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]