michael-s-molina commented on code in PR #36368: URL: https://github.com/apache/superset/pull/36368#discussion_r2672655508
########## docs/developer_portal/async-tasks.md ##########

> ### Task Lifecycle

Review Comment:
   There's no built-in support for automatic retries on transient failures. Celery has retry support, but GATF doesn't expose or integrate with it.
########## docs/developer_portal/async-tasks.md ##########

> @async_task()

Review Comment:
   Timeouts depend on Celery's `soft_time_limit`. The framework could provide a cleaner interface:

```python
@async_task(timeout=300)
def my_task(): ...
```
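One way such a decorator could carry a timeout through to Celery's `soft_time_limit` — a sketch only; this `async_task` is a stand-in for illustration, not the real GATF decorator:

```python
import functools
from typing import Any, Callable, Optional

def async_task(timeout: Optional[int] = None) -> Callable:
    """Hypothetical: capture a timeout for the registry to forward to Celery."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return fn(*args, **kwargs)
        # The real framework would register the Celery task with
        # celery_app.task(soft_time_limit=timeout) when a timeout is set.
        wrapper.celery_options = {"soft_time_limit": timeout} if timeout else {}
        return wrapper
    return decorator

@async_task(timeout=300)
def my_task() -> str:
    return "done"
```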
########## docs/developer_portal/async-tasks.md ##########

> task1 = my_task.schedule(arg=1, options=TaskOptions(idempotency_key="key"))

Review Comment:
   What happens if the same `idempotency_key` is used but with different arguments? Should it return the existing task or raise an error? This edge case needs clarification.
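Either policy could look roughly like this in-memory sketch (all names hypothetical, not GATF's API); raising on mismatched arguments makes the misuse visible instead of silently returning a task built from different inputs:

```python
from typing import Any

class IdempotencyConflict(Exception):
    """Same idempotency key reused with different arguments."""

_active: dict[str, dict[str, Any]] = {}

def schedule(key: str, args: tuple, *, strict: bool = True) -> dict[str, Any]:
    """Toy dedup registry showing the two possible policies."""
    existing = _active.get(key)
    if existing is not None and existing["status"] in ("pending", "in_progress"):
        if strict and existing["args"] != args:
            raise IdempotencyConflict(f"key {key!r} reused with args {args!r}")
        return existing  # deduplicated: hand back the active task
    task = {"args": args, "status": "pending"}
    _active[key] = task
    return task
```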
########## docs/developer_portal/async-tasks.md ##########

> **When to use each mode:**

Review Comment:
   The current design exposes execution mode in the calling code (`task()` vs `task.schedule()`), which means tests must use different code paths than production. A better approach would make `task.schedule()` the only interface, with execution mode controlled by configuration (e.g., `CELERY_TASK_ALWAYS_EAGER` or similar), ensuring tests exercise the same code path as production.
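If `.schedule()` became the single interface, Celery's own settings already cover the inline case. A config fragment, assuming a standard Celery app rather than anything GATF-specific:

```python
from celery import Celery

app = Celery("superset")

# In tests, scheduled tasks run inline on the same code path as production:
app.conf.update(
    task_always_eager=True,     # apply_async()/delay() execute synchronously
    task_eager_propagate=True,  # task exceptions re-raise in the caller
)
```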
########## docs/developer_portal/async-tasks.md ##########

> ### Cleanup Handlers

Review Comment:
   There's no mechanism to clean up old completed/failed tasks from the database. The `async_tasks` table could grow indefinitely.
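A retention policy could be as simple as a scheduled pruning pass over terminal tasks. A sketch against a hypothetical row shape (`status`, `finished_at`), not the real `async_tasks` model:

```python
from datetime import datetime, timedelta
from typing import Any

TERMINAL_STATES = {"success", "failure", "cancelled"}

def prune_tasks(
    rows: list[dict[str, Any]], *, now: datetime, max_age: timedelta
) -> list[dict[str, Any]]:
    """Keep active tasks and recent terminal ones; drop old terminal rows."""
    cutoff = now - max_age
    return [
        r for r in rows
        if r["status"] not in TERMINAL_STATES or r["finished_at"] >= cutoff
    ]
```

In practice this would be a periodic Celery beat job issuing a single `DELETE ... WHERE status IN (...) AND finished_at < cutoff`.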
########## docs/developer_portal/async-tasks.md ##########

> if ctx.is_cancelled():

Review Comment:
   The current cancellation mechanism relies on polling, where tasks check `ctx.is_cancelled()` only after completing each step. This creates a delay problem: if a step involves a long-running operation (like a database query), the task must wait for that operation to finish before detecting the cancellation signal.

   Since some operations provide their own cancellation APIs (e.g., `cursor.cancel()` for database queries), an event-driven approach would be more effective. By registering callbacks that trigger immediately upon cancellation, tasks could interrupt in-progress operations rather than waiting for them to complete naturally. Implementing event-based cancellation would require either a background polling thread (checking the database or Redis flags periodically), Redis Pub/Sub (for real-time push notifications across workers), or Celery's broadcast control mechanism (to send cancellation signals directly to worker processes).
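The callback-registration half of that design is transport-agnostic. A sketch using a plain lock in place of a real Redis Pub/Sub or broadcast listener (class and method names are hypothetical):

```python
import threading
from typing import Callable

class CancellationHub:
    """Fire registered callbacks the moment cancellation is signalled."""

    def __init__(self) -> None:
        self._cancelled = False
        self._callbacks: list[Callable[[], None]] = []
        self._lock = threading.Lock()

    def on_cancel(self, cb: Callable[[], None]) -> None:
        """Register e.g. cursor.cancel; fires immediately if already cancelled."""
        with self._lock:
            if self._cancelled:
                fire = True
            else:
                self._callbacks.append(cb)
                fire = False
        if fire:
            cb()

    def cancel(self) -> None:
        """Called by the listener thread when a cancel message arrives."""
        with self._lock:
            self._cancelled = True
            callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:  # interrupt in-flight work instead of polling
            cb()
```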
########## docs/developer_portal/async-tasks.md ##########

> def my_task(business_arg: int) -> None:

Review Comment:
   It would be interesting to support:
   - Task prioritization (prioritize urgent tasks over background work)
   - Task dependencies (run task B after task A completes)
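Both already have Celery building blocks GATF could surface — `apply_async(priority=...)` for prioritization and `chain` for dependencies. A dependency-free toy sketch of the semantics (all names hypothetical):

```python
import heapq
from typing import Any, Callable

class ToyScheduler:
    """Toy sketch: urgent tasks run before background work."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, Callable[[], Any]]] = []
        self._seq = 0  # tie-breaker keeps FIFO order within a priority

    def submit(self, fn: Callable[[], Any], *, priority: int = 0) -> None:
        heapq.heappush(self._heap, (-priority, self._seq, fn))
        self._seq += 1

    def submit_after(self, parent: Callable[[], Any],
                     child: Callable[[Any], Any], *, priority: int = 0) -> None:
        """Dependency: run child on parent's result, only after parent."""
        self.submit(lambda: child(parent()), priority=priority)

    def run_all(self) -> list[Any]:
        out = []
        while self._heap:
            _, _, fn = heapq.heappop(self._heap)
            out.append(fn())
        return out
```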
########## docs/developer_portal/async-tasks.md ##########
@@ -0,0 +1,460 @@

---
title: Async Task Framework
sidebar_label: Async Tasks
sidebar_position: 5
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Global Async Task Framework (GATF)

The Global Async Task Framework provides a unified way to manage asynchronous tasks in Apache Superset. It handles task registration, execution, status tracking, cancellation, and deduplication.

## Overview

GATF uses the **ambient context pattern**, where tasks access their execution context via `get_context()` instead of receiving it as a parameter. This results in clean, business-focused function signatures without framework boilerplate.

### Key Features

- **Clean Signatures**: Task functions contain only business args
- **Ambient Context**: Access context via `get_context()` - no parameter passing
- **Dual Execution**: Synchronous (for testing) and asynchronous (via Celery)
- **Optional Deduplication**: Use idempotency keys to prevent duplicate execution
- **Progressive Updates**: Update payload and check cancellation during execution
- **Type Safety**: Full type hints with ParamSpec support

## Quick Start

### Define a Task

```python
import requests
from superset_core.api.types import async_task, get_context

@async_task()
def fetch_data(api_url: str) -> None:
    """
    Example task that fetches data from an external API.

    Features:
    - Automatic cancellation check before execution
    - Simple cleanup handler
    - Cancellation checking during execution
    """
    ctx = get_context()

    # Cleanup runs automatically on success, failure, or cancellation
    @ctx.on_cleanup
    def cleanup():
        logger.info("Data fetch completed")

    # No initial check needed - framework checks before execution!
    # Fetch data with timeout (prevents hanging)
    response = requests.get(api_url, timeout=60)
    data = response.json()

    # Check before next operation
    if ctx.is_cancelled():
        return

    # Process and cache the data
    process_and_cache(data)
```

### Execute Tasks Asynchronously or Synchronously

The `@async_task` decorator enables flexible execution modes:

```python
# Asynchronous execution via Celery (for production workloads)
task = long_running_task.schedule()
# Task runs in background worker, returns immediately
print(task.status)  # "pending"

# Synchronous execution (for testing or when blocking is acceptable)
task = long_running_task()
# Task executes inline, blocks until complete
print(task.status)  # "success"
```

**When to use each mode:**

- **Async (`.schedule()`)**: Production workloads, long-running operations, non-blocking execution
- **Sync (direct call)**: Unit testing, development, or lightweight operations

## Core Concepts

### Ambient Context

Tasks access execution context via `get_context()`:

```python
@async_task()
def my_task(business_arg: int) -> None:
    ctx = get_context()  # Ambient context access
    task = ctx.task      # Task entity
    user = ctx.user      # User who dispatched task

    task.set_payload({"arg": business_arg})
    ctx.update_task(task)
```

### Task Lifecycle

1. **PENDING**: Task created, awaiting execution
2. **IN_PROGRESS**: Currently executing
3. **SUCCESS**: Completed successfully
4. **FAILURE**: Failed with error
5. **CANCELLED**: Cancelled before/during execution

### Deduplication

By default, each task gets a random UUID. Use `idempotency_key` for explicit deduplication:

```python
# Without deduplication - creates new task each time
task1 = my_task.schedule(arg=1)
task2 = my_task.schedule(arg=1)  # Separate task

# With deduplication - returns existing if active
task1 = my_task.schedule(arg=1, options=TaskOptions(idempotency_key="key"))
task2 = my_task.schedule(arg=1, options=TaskOptions(idempotency_key="key"))
# task2 is the same as task1 if task1 is PENDING or IN_PROGRESS
```

## Cancellation Support

The framework provides built-in cancellation support with minimal boilerplate.

### Cleanup Handlers

Register cleanup functions that run automatically when a task ends (success, failure, or cancellation):

```python
@async_task()
def my_task() -> None:
    ctx = get_context()

    @ctx.on_cleanup
    def cleanup():
        """Runs automatically when task ends"""
        logger.info("Task completed")
```

**Multiple cleanup handlers** (execute in LIFO order):

```python
@ctx.on_cleanup
def cleanup_cache():
    cache.clear()

@ctx.on_cleanup
def cleanup_log():
    logger.info("Done")
```

### Automatic Pre-Execution Check

**The framework automatically checks if a task was cancelled before execution starts.** You don't need an initial `if ctx.is_cancelled()` check - just start working!

### Checking During Execution

Check for cancellation at key points during execution:

```python
@async_task()
def process_items(items: list[int]) -> None:
    ctx = get_context()

    @ctx.on_cleanup
    def cleanup():
        logger.info("Processing ended")

    # No initial check needed - framework handles it!

    for i, item in enumerate(items):
        # Check every 10 items
        if i % 10 == 0 and ctx.is_cancelled():
            return

        process_single_item(item)
```

### Using Helper Methods

**`ctx.run()` - Pre-check wrapper (optional):**

```python
@async_task()
def fetch_and_process(api_url: str) -> None:
    ctx = get_context()

    @ctx.on_cleanup
    def cleanup():
        logger.info("Fetch completed")

    # Helper checks cancellation before executing
    response = ctx.run(lambda: requests.get(api_url, timeout=60))
    if response is None:
        return  # Cancelled

    data = ctx.run(lambda: response.json())
    if data is None:
        return

    cache.set("data", data)
```

**`ctx.update_progress()` - Progress tracking with cancellation check:**

```python
@async_task()
def process_batch(item_ids: list[int]) -> None:
    ctx = get_context()

    for i, item_id in enumerate(item_ids):
        # Combined progress update + cancellation check
        if not ctx.update_progress(i + 1, len(item_ids)):
            return  # Cancelled

        process_single_item(item_id)
```

## Advanced Usage

### Complete Example: API Fetch with Cleanup

```python
@async_task()
def fetch_and_cache(api_url: str, chart_id: int) -> None:
    """Fetch from external API and cache results."""
    ctx = get_context()
    cache_key = f"chart_{chart_id}_data"

    @ctx.on_cleanup
    def cleanup():
        if ctx.is_cancelled():
            # Clear partial cache on cancellation
            cache.delete(cache_key)
            logger.info(f"Fetch cancelled, cleared cache: {cache_key}")
        else:
            logger.info(f"Fetch completed: {cache_key}")

    # Fetch with timeout (prevents hanging)
    response = requests.get(api_url, timeout=60)
    data = response.json()

    # Check before expensive processing
    if ctx.is_cancelled():
        return

    processed = process_data(data)
    cache.set(cache_key, processed)
```

### Progressive Updates

Update progress and check cancellation simultaneously:

```python
@async_task()
def multi_step_task(item_ids: list[int]) -> None:
    ctx = get_context()
```
+ @ctx.on_cleanup + def cleanup(): + logger.info(f"Processed {ctx.task.get_payload().get('count', 0)} items") + + for i, item_id in enumerate(item_ids): + # Update progress and check cancellation + if not ctx.update_progress( + i + 1, + len(item_ids), + current_item=item_id + ): + return # Cancelled + + process_item(item_id) + + ctx.task.set_payload({"count": len(item_ids)}) Review Comment: In GATF, task functions can return values, but they're completely ignored: ```python @async_task() def calculate_metrics(dataset_id: int) -> dict: # Perform expensive calculations... return {"total_revenue": 1000000, "avg_order": 250.50} # This return value is lost! ``` When you schedule this task and later check its status, there's no way to retrieve the result: ```python task = calculate_metrics.schedule(dataset_id=123) # ... wait for completion ... # task.status = "SUCCESS" # But how do I get the calculated metrics? ``` ### Manual Workaround: Using Payload Currently, developers must manually store results in the payload: ```python @async_task() def calculate_metrics(dataset_id: int) -> None: # Must return None ctx = get_context() # Do calculations result = {"total_revenue": 1000000, "avg_order": 250.50} # Manually store result task = ctx.task task.set_payload({"result": result}) ctx.update_task(task) # No return statement ``` Then retrieve it: ```python task = calculate_metrics.schedule(dataset_id=123) # ... wait ... result = task.get_payload()["result"] # Manual retrieval ``` ### Problems with This Approach 1. **Inconsistent** - Each developer implements result storage differently 2. **Verbose** - Requires boilerplate in every task 3. **No type safety** - Payload is just `dict[str, Any]` 4. 
**Confusion** - Function signature suggests it returns a value, but it doesn't ### What Other Frameworks Do **Celery's AsyncResult**: ```python @celery_app.task def calculate_metrics(dataset_id): return {"total_revenue": 1000000} # Automatically stored result = calculate_metrics.delay(123) value = result.get() # Retrieves the return value ``` **Django Q**: ```python task_id = async_task('calculate_metrics', 123) task = Task.objects.get(id=task_id) result = task.result # Stored automatically ``` ### What GATF Should Have ```python @async_task() def calculate_metrics(dataset_id: int) -> dict: return {"total_revenue": 1000000} # Framework stores this task = calculate_metrics.schedule(dataset_id=123) # ... wait ... result = task.get_result() # Framework-provided method # or result = task.result # Property ``` The framework should: - Automatically capture return values - Store them in the database (maybe a new `result` column or in `payload`) - Provide a clean API to retrieve them - Handle serialization/deserialization -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
