codeant-ai-for-open-source[bot] commented on code in PR #38018:
URL: https://github.com/apache/superset/pull/38018#discussion_r2813720218
##########
docker/pythonpath_dev/superset_config.py:
##########
@@ -136,3 +136,66 @@ class CeleryConfig:
)
except ImportError:
logger.info("Using default Docker config...")
+
+
+# MonexpertAPI Scheduler Integration
+# MonexpertAPI Scheduler Integration
+from superset.initialization import SupersetAppInitializer
+
+class CustomAppInitializer(SupersetAppInitializer):
+    """Custom initializer to add API Scheduler after AppBuilder menu is ready"""
+
+ def init_views(self):
+ """Called after AppBuilder builds its menu tree"""
+ super().init_views()
+
+ logger.info("Registering API Scheduler Blueprint...")
+
+ try:
+ from superset.views.api_scheduler.views import api_scheduler_bp
+ self.superset_app.register_blueprint(api_scheduler_bp)
+ logger.info("✓ API Scheduler Blueprint registered")
+
+ from superset import appbuilder
+ appbuilder.add_link(
+ name="API Scheduler",
+ href="/api-scheduler/dashboard",
+ icon="fa-clock",
+ category="Settings"
+ )
+ logger.info("✓ API Scheduler menu link added")
+
+ except Exception as e:
+ logger.error(f"Failed to register API Scheduler: {str(e)}")
+ raise
+
+APP_INITIALIZER = CustomAppInitializer
+
+# Enable template processing for Handlebars charts
+ENABLE_TEMPLATE_PROCESSING = True
+# DISABLE HTML sanitization (required for CSS/JS)
+HTML_SANITIZATION = False
+# Extended schema (for security - use this if you want to keep sanitization)
+HTML_SANITIZATION_SCHEMA_EXTENSIONS = {
+    "tags": ["div", "span", "p", "br", "style", "script", "h1", "h2", "h3", "img", "a"],
+ "attributes": {
+ "*": ["class", "style", "id", "data-*", "onclick"],
+ "a": ["href", "target"],
+ "img": ["src", "alt", "width", "height"],
+ }
+}
+# Content Security Policy - allow inline CSS/JS
+TALISMAN_CONFIG = {
+ 'default_src': ("'self'", "'unsafe-inline'"),
+ 'style_src': ("'self'", "'unsafe-inline'"),
+ 'script_src': ("'self'", "'unsafe-inline'"),
+ 'img_src': ("'self'", "data:", "https:"),
+ 'font_src': ("'self'",),
+}
+# Flask-Limiter disabled (development)
+RATELIMIT_ENABLED = False
+
+FEATURE_FLAGS = {
+ "DASHBOARD_NATIVE_FILTERS": True,
Review Comment:
**Suggestion:** The second assignment to the feature flags dictionary
overwrites the earlier configuration (e.g. removing `ALERT_REPORTS`), which can
silently disable previously enabled features; instead, you should update/extend
the existing dictionary so prior flags are preserved. [logic error]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Alerts and reports feature disabled in dev environment.
- ⚠️ Behavior diverges from intended `ALERT_REPORTS` configuration.
- ⚠️ Future added flags may also be unintentionally overwritten.
```
</details>
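A minimal sketch of the suggested fix, assuming the earlier assignment enables `ALERT_REPORTS` (the actual earlier flags are not visible in this hunk): extend the existing dict rather than rebinding the name.

```python
# Hypothetical stand-in for the earlier FEATURE_FLAGS assignment in
# superset_config.py; the real file may enable different flags.
FEATURE_FLAGS = {
    "ALERT_REPORTS": True,
}

# Later in the file: update the existing dict instead of reassigning it,
# so previously enabled flags are preserved.
FEATURE_FLAGS.update({
    "DASHBOARD_NATIVE_FILTERS": True,
})
```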
##########
docker/pythonpath_dev/superset_config.py:
##########
@@ -136,3 +136,66 @@ class CeleryConfig:
)
except ImportError:
logger.info("Using default Docker config...")
+
+
+# MonexpertAPI Scheduler Integration
+# MonexpertAPI Scheduler Integration
+from superset.initialization import SupersetAppInitializer
+
+class CustomAppInitializer(SupersetAppInitializer):
+    """Custom initializer to add API Scheduler after AppBuilder menu is ready"""
+
+ def init_views(self):
+ """Called after AppBuilder builds its menu tree"""
+ super().init_views()
+
+ logger.info("Registering API Scheduler Blueprint...")
+
+ try:
+ from superset.views.api_scheduler.views import api_scheduler_bp
+ self.superset_app.register_blueprint(api_scheduler_bp)
+ logger.info("✓ API Scheduler Blueprint registered")
+
+ from superset import appbuilder
+ appbuilder.add_link(
+ name="API Scheduler",
+ href="/api-scheduler/dashboard",
+ icon="fa-clock",
+ category="Settings"
+ )
+ logger.info("✓ API Scheduler menu link added")
+
+ except Exception as e:
+ logger.error(f"Failed to register API Scheduler: {str(e)}")
+ raise
+
+APP_INITIALIZER = CustomAppInitializer
+
+# Enable template processing for Handlebars charts
+ENABLE_TEMPLATE_PROCESSING = True
+# DISABLE HTML sanitization (required for CSS/JS)
+HTML_SANITIZATION = False
+# Extended schema (for security - use this if you want to keep sanitization)
+HTML_SANITIZATION_SCHEMA_EXTENSIONS = {
+    "tags": ["div", "span", "p", "br", "style", "script", "h1", "h2", "h3", "img", "a"],
+ "attributes": {
+ "*": ["class", "style", "id", "data-*", "onclick"],
+ "a": ["href", "target"],
+ "img": ["src", "alt", "width", "height"],
+ }
+}
+# Content Security Policy - allow inline CSS/JS
+TALISMAN_CONFIG = {
+ 'default_src': ("'self'", "'unsafe-inline'"),
+ 'style_src': ("'self'", "'unsafe-inline'"),
+ 'script_src': ("'self'", "'unsafe-inline'"),
+ 'img_src': ("'self'", "data:", "https:"),
Review Comment:
**Suggestion:** Allowing `'unsafe-inline'` in the Content Security Policy
for `script_src` permits any inline JavaScript to run, which largely defeats
CSP protections and makes XSS exploitation much easier; you should remove this
directive for scripts while keeping stricter sources. [security]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ CSP no longer mitigates inline JavaScript injection attacks.
- ⚠️ Any XSS bug is easier to exploit and weaponize.
- ⚠️ Security tooling may overestimate protection due to CSP header.
```
</details>
```suggestion
'script_src': ("'self'",),
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Start Superset with the dev Docker configuration so that
`docker/pythonpath_dev/superset_config.py` is loaded and `TALISMAN_CONFIG`
at lines
189-195 is applied by Flask-Talisman to set the Content-Security-Policy
header.
2. Open the Superset web UI in a browser and inspect the response headers
for any
authenticated page; observe `Content-Security-Policy` includes `script-src
'self'
'unsafe-inline'`.
3. Introduce any inline JavaScript into the rendered HTML (for example, via
the XSS path
in step 3 of suggestion 1, or another template/HTML injection bug) so that
the payload is
rendered as `<script>alert('XSS')</script>` or as an inline `onclick`
handler.
4. Reload the affected page; because `script_src` explicitly allows
`'unsafe-inline'`, the
browser executes the inline JavaScript despite CSP, whereas a stricter
`script_src` would
block the inline script.
```
</details>
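Applied to the `TALISMAN_CONFIG` in this hunk, the stricter policy could look like the sketch below (the surrounding keys are copied from the diff; only `script_src` changes, per the suggestion):

```python
# Sketch: same TALISMAN_CONFIG shape as in the diff, with 'unsafe-inline'
# dropped from script_src so the browser refuses inline <script> blocks.
TALISMAN_CONFIG = {
    'default_src': ("'self'", "'unsafe-inline'"),
    'style_src': ("'self'", "'unsafe-inline'"),
    # script-src overrides default-src for scripts, so dropping
    # 'unsafe-inline' here alone is enough to block inline JavaScript:
    'script_src': ("'self'",),
    'img_src': ("'self'", "data:", "https:"),
    'font_src': ("'self'",),
}
```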
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** docker/pythonpath_dev/superset_config.py
**Line:** 192:192
**Comment:**
*Security: Allowing `'unsafe-inline'` in the Content Security Policy
for `script_src` permits any inline JavaScript to run, which largely defeats
CSP protections and makes XSS exploitation much easier; you should remove this
directive for scripts while keeping stricter sources.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
<a
href='https://app.codeant.ai/feedback?pr_url=https%3A%2F%2Fgithub.com%2Fapache%2Fsuperset%2Fpull%2F38018&comment_hash=7367e9605e5f0c783802ab5f93bfe73b4f304f560b9a0bf4e450cd885d11a994&reaction=like'>👍</a>
| <a
href='https://app.codeant.ai/feedback?pr_url=https%3A%2F%2Fgithub.com%2Fapache%2Fsuperset%2Fpull%2F38018&comment_hash=7367e9605e5f0c783802ab5f93bfe73b4f304f560b9a0bf4e450cd885d11a994&reaction=dislike'>👎</a>
##########
superset/templates/api_scheduler/base.html:
##########
@@ -0,0 +1,92 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="UTF-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <title>{% block title %}API Scheduler{% endblock %} - Monexpert</title>
+
+ <!-- Bootstrap 3 CSS -->
+    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/css/bootstrap.min.css">
+ <!-- Font Awesome -->
+    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/font-awesome/4.7.0/css/font-awesome.min.css">
+
+ <style>
+ body {
+ padding-top: 70px;
+ background-color: #f5f5f5;
+ }
+ .navbar-brand {
+ font-weight: bold;
+ }
+ .container-fluid {
Review Comment:
**Suggestion:** Styling the generic `.container-fluid` class changes the
appearance of both the navbar's inner container and the main content container,
causing the fixed top navbar to inherit padding, margins, and box-shadow that
can break its layout; the selector should be narrowed to target only the main
content container. [logic error]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Navbar layout altered by extra margin and shadow.
- ⚠️ Fixed-top header appears misaligned relative to page body.
```
</details>
```suggestion
.navbar + .container-fluid {
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Render the template `superset/templates/api_scheduler/base.html` in a
browser; this is
the base layout introduced in this PR.
2. Inspect the DOM: the navbar at line 43 contains a `div` with class
`container-fluid` at
line 44, and the main content wrapper is another `div.container-fluid` at
line 65.
3. Note that the CSS rule at lines 21–27 (`.container-fluid { … margin-top:
20px; }`)
targets all elements with class `container-fluid`, including the navbar's
inner container
and the main content container.
4. In the rendered page, observe that the fixed-top navbar's inner container
now has a
20px top margin, white background, border radius, and box shadow, visually
shifting navbar
content downward and altering the expected Bootstrap navbar appearance.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/templates/api_scheduler/base.html
**Line:** 21:21
**Comment:**
*Logic Error: Styling the generic `.container-fluid` class changes the
appearance of both the navbar's inner container and the main content container,
causing the fixed top navbar to inherit padding, margins, and box-shadow that
can break its layout; the selector should be narrowed to target only the main
content container.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
<a
href='https://app.codeant.ai/feedback?pr_url=https%3A%2F%2Fgithub.com%2Fapache%2Fsuperset%2Fpull%2F38018&comment_hash=09ed55879c20105ec0c9a06187bb083447670f9b1751e4be2171bb3461160f66&reaction=like'>👍</a>
| <a
href='https://app.codeant.ai/feedback?pr_url=https%3A%2F%2Fgithub.com%2Fapache%2Fsuperset%2Fpull%2F38018&comment_hash=09ed55879c20105ec0c9a06187bb083447670f9b1751e4be2171bb3461160f66&reaction=dislike'>👎</a>
##########
superset/templates/api_scheduler/mappings.html:
##########
@@ -0,0 +1,210 @@
+{% extends "api_scheduler/base.html" %}
+
+{% block title %}Field Mappings - {{ config.name }}{% endblock %}
+
+{% block content %}
+<!-- CSRF Token for JavaScript -->
+<meta name="csrf-token" content="{{ csrf_token() }}">
+
+<div class="container-fluid">
+ <div class="row">
+ <div class="col-md-12">
+ <h2>Field Mappings: {{ config.name }}</h2>
+ <p class="text-muted">API URL: {{ config.api_url }}</p>
+
+ <div class="panel panel-default">
+ <div class="panel-heading">
+ <div class="row">
+ <div class="col-md-6">
+                            <h3 class="panel-title">Configure Field Mappings</h3>
+ </div>
+ <div class="col-md-6 text-right">
+                            <button type="button" class="btn btn-sm btn-info" onclick="testAPI()">
+ <i class="fa fa-flask"></i> Test API
+ </button>
+ </div>
+ </div>
+ </div>
+ <div class="panel-body">
+                        <div id="api-test-result" class="alert" style="display:none;"></div>
+
+                    <form method="POST" action="{{ url_for('api_scheduler.edit_mappings', config_id=config.id) }}" id="mappings-form">
+                        <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
+ <table class="table table-bordered">
+ <thead>
+ <tr>
+ <th width="35%">API Field Path</th>
+ <th width="30%">Database Column Name</th>
+ <th width="25%">Data Type</th>
+ <th width="10%">Action</th>
+ </tr>
+ </thead>
+ <tbody id="mappings-tbody">
+ {% if config.field_mappings %}
+ {% for mapping in config.field_mappings %}
+ <tr>
+ <td>
+                                        <input type="text" class="form-control"
+                                               name="api_field_path[]"
+                                               value="{{ mapping.api_field_path }}"
+                                               placeholder="e.g., data.user.name or items.0.id" required>
+ </td>
+ <td>
+                                        <input type="text" class="form-control"
+                                               name="db_column_name[]"
+                                               value="{{ mapping.db_column_name }}"
+                                               pattern="[a-z_][a-z0-9_]*"
+                                               placeholder="e.g., user_name" required>
+ </td>
+ <td>
+                                        <select class="form-control" name="db_column_type[]" required>
+                                            <option value="VARCHAR(255)" {{ 'selected' if mapping.db_column_type == 'VARCHAR(255)' else '' }}>VARCHAR(255)</option>
+                                            <option value="TEXT" {{ 'selected' if mapping.db_column_type == 'TEXT' else '' }}>TEXT</option>
+                                            <option value="INTEGER" {{ 'selected' if mapping.db_column_type == 'INTEGER' else '' }}>INTEGER</option>
+                                            <option value="DECIMAL(10,2)" {{ 'selected' if mapping.db_column_type == 'DECIMAL(10,2)' else '' }}>DECIMAL(10,2)</option>
+                                            <option value="BOOLEAN" {{ 'selected' if mapping.db_column_type == 'BOOLEAN' else '' }}>BOOLEAN</option>
+                                            <option value="TIMESTAMP" {{ 'selected' if mapping.db_column_type == 'TIMESTAMP' else '' }}>TIMESTAMP</option>
+                                            <option value="JSONB" {{ 'selected' if mapping.db_column_type == 'JSONB' else '' }}>JSONB</option>
+                                        </select>
+ </td>
+ <td>
+                                        <button type="button" class="btn btn-sm btn-danger" onclick="removeRow(this)">
+ <i class="fa fa-trash"></i>
+ </button>
+ </td>
+ </tr>
+ {% endfor %}
+ {% else %}
+ <tr>
+ <td>
+                                        <input type="text" class="form-control" name="api_field_path[]"
+                                               placeholder="e.g., id or data.user.email" required>
+ </td>
+ <td>
+                                        <input type="text" class="form-control" name="db_column_name[]"
+                                               pattern="[a-z_][a-z0-9_]*" placeholder="e.g., user_id" required>
+ </td>
+ <td>
+                                        <select class="form-control" name="db_column_type[]" required>
+                                            <option value="VARCHAR(255)">VARCHAR(255)</option>
+                                            <option value="TEXT">TEXT</option>
+                                            <option value="INTEGER" selected>INTEGER</option>
+                                            <option value="DECIMAL(10,2)">DECIMAL(10,2)</option>
+                                            <option value="BOOLEAN">BOOLEAN</option>
+                                            <option value="TIMESTAMP">TIMESTAMP</option>
+                                            <option value="JSONB">JSONB</option>
+ </select>
+ </td>
+ <td>
+                                        <button type="button" class="btn btn-sm btn-danger" onclick="removeRow(this)">
+ <i class="fa fa-trash"></i>
+ </button>
+ </td>
+ </tr>
+ {% endif %}
+ </tbody>
+ </table>
+
+                    <button type="button" class="btn btn-success" onclick="addRow()">
+ <i class="fa fa-plus"></i> Add Field Mapping
+ </button>
+
+ <hr>
+
+ <div class="form-group">
+ <button type="submit" class="btn btn-primary">
+                            <i class="fa fa-arrow-right"></i> Continue to SQL Preview
+ </button>
+                        <a href="{{ url_for('api_scheduler.dashboard') }}" class="btn btn-default">
+ <i class="fa fa-times"></i> Cancel
+ </a>
+ </div>
+ </form>
+ </div>
+ </div>
+ </div>
+ </div>
+</div>
+
+<script>
+function addRow() {
+ const tbody = document.getElementById('mappings-tbody');
+ const newRow = `
+ <tr>
+ <td>
+            <input type="text" class="form-control" name="api_field_path[]"
+                placeholder="e.g., data.items.0.value" required>
+ </td>
+ <td>
+            <input type="text" class="form-control" name="db_column_name[]"
+                pattern="[a-z_][a-z0-9_]*" placeholder="e.g., item_value" required>
+ </td>
+ <td>
+ <select class="form-control" name="db_column_type[]" required>
+ <option value="VARCHAR(255)">VARCHAR(255)</option>
+ <option value="TEXT">TEXT</option>
+ <option value="INTEGER">INTEGER</option>
+ <option value="DECIMAL(10,2)">DECIMAL(10,2)</option>
+ <option value="BOOLEAN">BOOLEAN</option>
+ <option value="TIMESTAMP">TIMESTAMP</option>
+ <option value="JSONB">JSONB</option>
+ </select>
+ </td>
+ <td>
+            <button type="button" class="btn btn-sm btn-danger" onclick="removeRow(this)">
+ <i class="fa fa-trash"></i>
+ </button>
+ </td>
+ </tr>
+ `;
+ tbody.insertAdjacentHTML('beforeend', newRow);
+}
+
+function removeRow(btn) {
+ const row = btn.closest('tr');
+ if (document.querySelectorAll('#mappings-tbody tr').length > 1) {
+ row.remove();
+ } else {
+ alert('At least one field mapping is required');
+ }
+}
+
+function testAPI() {
+ const resultDiv = document.getElementById('api-test-result');
+ resultDiv.style.display = 'block';
+ resultDiv.className = 'alert alert-info';
+    resultDiv.innerHTML = '<i class="fa fa-spinner fa-spin"></i> Testing API...';
+
+    const csrfToken = document.querySelector('meta[name="csrf-token"]').getAttribute('content');
+
+ fetch('{{ url_for("api_scheduler.test_api", config_id=config.id) }}', {
+ method: 'POST',
+ headers: {'Content-Type': 'application/json',
+ 'X-CSRFToken': csrfToken
+ }
+ })
+ .then(response => response.json())
+ .then(data => {
+ if (data.success) {
+ resultDiv.className = 'alert alert-success';
+ resultDiv.innerHTML = `
+ <strong>API Test Successful!</strong><br>
+ Status Code: ${data.status_code}<br>
+ <details>
+ <summary>View Response Data</summary>
+ <pre>${JSON.stringify(data.data, null, 2)}</pre>
+ </details>
+ `;
+ } else {
+ resultDiv.className = 'alert alert-danger';
+            resultDiv.innerHTML = `<strong>API Test Failed:</strong> ${data.error}`;
+ }
+ })
+ .catch(error => {
+ resultDiv.className = 'alert alert-danger';
+ resultDiv.innerHTML = `<strong>Error:</strong> ${error.message}`;
Review Comment:
**Suggestion:** The test API function injects raw response fields
(`data.data` and `data.error`) directly into `innerHTML`, so if the remote API
or backend returns strings containing HTML (e.g., script tags), they will be
interpreted and executed by the browser, leading to a DOM-based XSS
vulnerability; escape or safely render these values as text before inserting
them into the DOM. [security]
<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>
```mdx
- ❌ XSS on Field Mappings Test API result panel.
- ❌ Malicious API response can execute scripts in admin browser.
- ⚠️ Compromised admin session risks Superset configuration integrity.
```
</details>
```suggestion
function escapeHtml(str) {
if (str === null || str === undefined) {
return '';
}
return String(str)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;');
}
fetch('{{ url_for("api_scheduler.test_api", config_id=config.id) }}', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': csrfToken
}
})
.then(response => response.json())
.then(data => {
if (data.success) {
resultDiv.className = 'alert alert-success';
resultDiv.innerHTML = `
<strong>API Test Successful!</strong><br>
Status Code: ${data.status_code}<br>
<details>
<summary>View Response Data</summary>
                    <pre>${escapeHtml(JSON.stringify(data.data, null, 2))}</pre>
</details>
`;
} else {
resultDiv.className = 'alert alert-danger';
            resultDiv.innerHTML = `<strong>API Test Failed:</strong> ${escapeHtml(data.error)}`;
}
})
.catch(error => {
resultDiv.className = 'alert alert-danger';
        resultDiv.innerHTML = `<strong>Error:</strong> ${escapeHtml(error.message)}`;
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Start the Superset instance with this PR deployed and navigate to the API
scheduler
mappings page that renders `superset/templates/api_scheduler/mappings.html`
(view using
browser dev tools to confirm the `testAPI()` function is present around line
172 in the
compiled HTML/JS).
2. Ensure the backend route `api_scheduler.test_api` (referenced in the
template at
`superset/templates/api_scheduler/mappings.html:180`) returns JSON with a
`data` field
containing a string that includes HTML, for example: `{"success": true,
"status_code":
200, "data": "<img src=x onerror=alert('XSS')>"}`; this is a realistic
scenario when the
endpoint proxies or surfaces raw responses from a third‑party API.
3. On the mappings page, click the "Test API" button in the panel header
(button bound via
`onclick="testAPI()"` at
`superset/templates/api_scheduler/mappings.html:22`), which
executes `testAPI()` and performs the `fetch()` to `api_scheduler.test_api`.
4. Observe that in the success branch of `testAPI()`
(`superset/templates/api_scheduler/mappings.html:186-196`),
`resultDiv.innerHTML` is set
with a template literal that injects the unescaped
`JSON.stringify(data.data, null, 2)`
into a `<pre>` tag; the `<img src=x onerror=alert('XSS')>` inside that
string is
interpreted as HTML by the browser and its `onerror` handler executes,
demonstrating
DOM‑based XSS.
```
</details>
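Escaping can also be done server-side as defense in depth, so the values are inert before they ever reach the browser. A sketch using only the Python stdlib, with `safe_test_result` as a hypothetical helper the `test_api` view could call before `jsonify`:

```python
import html
import json

def safe_test_result(success, status_code=None, data=None, error=None):
    """Build the test-API payload with HTML metacharacters escaped, so the
    strings stay inert even if the client injects them via innerHTML.
    (Hypothetical helper, not part of the PR.)"""
    if success:
        return {
            'success': True,
            'status_code': status_code,
            'data': html.escape(json.dumps(data, indent=2)),
        }
    return {'success': False, 'error': html.escape(str(error))}

payload = safe_test_result(True, 200, {"name": "<img src=x onerror=alert(1)>"})
```

This complements, rather than replaces, the client-side fix: escaping at both ends keeps the panel safe even if one layer regresses.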
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/templates/api_scheduler/mappings.html
**Line:** 180:205
**Comment:**
*Security: The test API function injects raw response fields
(`data.data` and `data.error`) directly into `innerHTML`, so if the remote API
or backend returns strings containing HTML (e.g., script tags), they will be
interpreted and executed by the browser, leading to a DOM-based XSS
vulnerability; escape or safely render these values as text before inserting
them into the DOM.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
<a
href='https://app.codeant.ai/feedback?pr_url=https%3A%2F%2Fgithub.com%2Fapache%2Fsuperset%2Fpull%2F38018&comment_hash=46223b9acbdfd3c32d3a7b7dde89862202cf16281fbfc7c66abb49e1d98578af&reaction=like'>👍</a>
| <a
href='https://app.codeant.ai/feedback?pr_url=https%3A%2F%2Fgithub.com%2Fapache%2Fsuperset%2Fpull%2F38018&comment_hash=46223b9acbdfd3c32d3a7b7dde89862202cf16281fbfc7c66abb49e1d98578af&reaction=dislike'>👎</a>
##########
superset/templates/api_scheduler/sql_preview.html:
##########
@@ -0,0 +1,103 @@
+{% extends "api_scheduler/base.html" %}
+
+{% block title %}SQL Preview - {{ config.name }}{% endblock %}
+
+{% block content %}
+<div class="container-fluid">
+ <div class="row">
+ <div class="col-md-12">
+ <h2>SQL Preview: {{ config.name }}</h2>
+            <p class="text-muted">Target Table: <code>{{ config.target_table }}</code></p>
+
+ <div class="panel panel-default">
+ <div class="panel-heading">
+ <h3 class="panel-title">
+ <i class="fa fa-database"></i> CREATE TABLE Statement
+ </h3>
+ </div>
+ <div class="panel-body">
+ <div class="alert alert-info">
+                            <strong><i class="fa fa-info-circle"></i> Note:</strong>
+                            You can edit the SQL below to add constraints (UNIQUE, NOT NULL, DEFAULT, CHECK).
+                            Security validation will ensure only safe CREATE TABLE statements are executed.
+ </div>
+
+                        <form method="POST" action="{{ url_for('api_scheduler.execute_sql', config_id=config.id) }}" id="sql-form">
+                            <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
+
+ <div class="form-group">
+ <label for="sql">SQL Statement:</label>
+                            <textarea class="form-control" id="custom_sql" name="custom_sql" rows="15" required>{{ sql }}</textarea>
Review Comment:
**Suggestion:** The textarea's id is "custom_sql" while both the label's
"for" attribute and the JavaScript use "sql", so the label won't focus the
textarea and `document.getElementById('sql')` will return null, causing a
runtime error when adding the event listener. Align the textarea id with the
label and script to prevent the null reference and make the UI behave
correctly. [null pointer]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Client-side SQL textarea validation never attaches on this page.
- ⚠️ Clicking "SQL Statement" label does not focus textarea.
- ⚠️ JavaScript error logged in console on every page load.
```
</details>
```suggestion
                            <textarea class="form-control" id="sql" name="custom_sql" rows="15" required>{{ sql }}</textarea>
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Start the Superset web app with this PR code deployed so that the template
`superset/templates/api_scheduler/sql_preview.html` is used.
2. In the UI, navigate to the API Scheduler SQL Preview page for any config,
which renders
`sql_preview.html` (confirmed by the form action using
`url_for('api_scheduler.execute_sql', config_id=config.id)` at line 25).
3. Open browser developer tools, inspect the form markup, and observe that
the label at
line 29 is `<label for="sql">SQL Statement:</label>` while the textarea at
line 30 is
`<textarea ... id="custom_sql" ...>`, so no element with id `sql` exists in
the DOM.
4. Still on that page, watch the browser console: when `DOMContentLoaded`
fires, the
script in the same template at lines 82–92 runs `const textarea =
document.getElementById('sql');` followed by
`textarea.addEventListener('input', ...)`;
since `textarea` is `null`, the browser throws a `TypeError` (cannot read
properties of
null / addEventListener), the real-time SQL validation never attaches, and
clicking the
label does not focus the textarea.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/templates/api_scheduler/sql_preview.html
**Line:** 30:30
**Comment:**
*Null Pointer: The textarea's id is "custom_sql" while both the label's
"for" attribute and the JavaScript use "sql", so the label won't focus the
textarea and `document.getElementById('sql')` will return null, causing a
runtime error when adding the event listener. Align the textarea id with the
label and script to prevent the null reference and make the UI behave correctly.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
<a
href='https://app.codeant.ai/feedback?pr_url=https%3A%2F%2Fgithub.com%2Fapache%2Fsuperset%2Fpull%2F38018&comment_hash=cc34e48d9e42699ae734523a9dd3c6b90dc56a08b6126a22a5863d1a3fb606d4&reaction=like'>👍</a>
| <a
href='https://app.codeant.ai/feedback?pr_url=https%3A%2F%2Fgithub.com%2Fapache%2Fsuperset%2Fpull%2F38018&comment_hash=cc34e48d9e42699ae734523a9dd3c6b90dc56a08b6126a22a5863d1a3fb606d4&reaction=dislike'>👎</a>
##########
superset/views/api_scheduler/views.py:
##########
@@ -0,0 +1,645 @@
+# superset/views/api_scheduler/views.py
+"""
+API Scheduler Views for Apache Superset
+Provides routes for scheduled API data fetching management
+"""
+
+from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, abort
+from superset import db
+from .models import APIConfiguration, FieldMapping, ExecutionLog, ALLOWED_COLUMN_TYPES
+import requests
+import json
+import re
+from datetime import datetime
+from sqlalchemy import text
+import ipaddress
+
+# Create the blueprint (public routes, no login required)
+api_scheduler_bp = Blueprint(
+ 'api_scheduler',
+ __name__,
+ url_prefix='/api-scheduler',
+ template_folder='../../templates/api_scheduler',
+ static_folder='../../static/assets/api_scheduler'
+)
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+import pytz
+import logging
+logger = logging.getLogger(__name__)
+# Global scheduler instance
+scheduler = BackgroundScheduler(timezone=pytz.UTC)
+scheduler_started = False
+_app_instance = None  # ADD: global app instance
+@api_scheduler_bp.record_once
+def on_load(state):
+    """Store the app instance when the blueprint is loaded"""
+ global _app_instance
+ _app_instance = state.app
+ logger.info("API Scheduler blueprint loaded, app instance captured")
+
+# ============== DASHBOARD ==============
+@api_scheduler_bp.route('/')
+@api_scheduler_bp.route('/dashboard')
+def dashboard():
+    """Main dashboard - list all configurations"""
+ configs = db.session.query(APIConfiguration).all()
+
+    # Get the last execution and next-run info for each config
+ for config in configs:
+ # Last execution
+ last_log = db.session.query(ExecutionLog)\
+ .filter_by(config_id=config.id)\
+ .order_by(ExecutionLog.executed_at.desc())\
+ .first()
+ config.last_execution = last_log
+
+ # Next run time from scheduler
+ config.next_run_time = None
+ if config.is_active:
+ job = scheduler.get_job(f'api_fetch_{config.id}')
+ if job:
+ config.next_run_time = job.next_run_time
+
+ return render_template('api_scheduler/dashboard.html', configs=configs)
+
+
+# ============== CONFIG CRUD ==============
+@api_scheduler_bp.route('/config/new', methods=['GET', 'POST'])
+def new_config():
+    """Create a new API configuration"""
+ if request.method == 'POST':
+ try:
+ # Validate SSRF
+ api_url = request.form.get('api_url', '').strip()
+ if not validate_url_safe(api_url):
+ flash('Invalid URL or private IP address detected', 'danger')
+ return render_template('api_scheduler/config_form.html')
+
+ # Parse headers
+ headers_str = request.form.get('api_headers', '').strip()
+ if headers_str and headers_str != 'None':
+ try:
+ json.loads(headers_str)
+ except json.JSONDecodeError:
+ flash('Invalid JSON format for headers', 'danger')
+ return render_template('api_scheduler/config_form.html')
+ else:
+ headers_str = None
+
+ config = APIConfiguration(
+ name=request.form['name'],
+ target_table='api_scheduler_' + request.form['target_table'],
+ api_url=api_url,
+ api_method=request.form.get('api_method', 'GET'),
+ api_headers=headers_str,
+ api_key=request.form.get('api_key'),
+                schedule_interval=int(request.form.get('schedule_interval', 300)),
+ is_active=False
+ )
+
+ db.session.add(config)
+ db.session.commit()
+
+            flash(f'Configuration "{config.name}" created successfully!', 'success')
+            return redirect(url_for('api_scheduler.edit_mappings', config_id=config.id))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error creating configuration: {str(e)}', 'danger')
+
+ return render_template('api_scheduler/config_form.html',config=None)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/edit', methods=['GET', 'POST'])
+def edit_config(config_id):
+    """Edit a config"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ if request.method == 'POST':
+ try:
+ api_url = request.form.get('api_url', '').strip()
+ if not validate_url_safe(api_url):
+ flash('Invalid URL or private IP address detected', 'danger')
+                return render_template('api_scheduler/config_form.html', config=config)
+
+ config.name = request.form['name']
+            config.target_table = 'api_scheduler_' + request.form['target_table'].replace('api_scheduler_', '')
+ config.api_url = api_url
+ config.api_method = request.form.get('api_method', 'GET')
+ config.api_headers = request.form.get('api_headers')
+ config.api_key = request.form.get('api_key')
+            config.schedule_interval = int(request.form.get('schedule_interval', 300))
+
+ db.session.commit()
+ flash('Configuration updated successfully!', 'success')
+ return redirect(url_for('api_scheduler.dashboard'))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error updating configuration: {str(e)}', 'danger')
+
+ return render_template('api_scheduler/config_form.html', config=config)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/delete', methods=['POST'])
+def delete_config(config_id):
+    """Delete a config"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ try:
+ # Drop table if exists
+ try:
+            db.session.execute(text(f'DROP TABLE IF EXISTS {config.target_table}'))
+ db.session.commit()
+ except:
+ pass
+
+ db.session.delete(config)
+ db.session.commit()
+        flash(f'Configuration "{config.name}" deleted successfully!', 'success')
+ schedule_jobs()
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error deleting configuration: {str(e)}', 'danger')
+
+ return redirect(url_for('api_scheduler.dashboard'))
+
+
+# ============== FIELD MAPPINGS ==============
+@api_scheduler_bp.route('/config/<int:config_id>/mappings', methods=['GET', 'POST'])
+def edit_mappings(config_id):
+    """Edit the field mappings"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ if request.method == 'POST':
+ try:
+            # Delete the previous mappings
+            old_mappings = db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+ for mapping in old_mappings:
+ db.session.delete(mapping)
+            # Get the arrays
+ api_field_paths = request.form.getlist('api_field_path[]')
+ db_column_names = request.form.getlist('db_column_name[]')
+ db_column_types = request.form.getlist('db_column_type[]')
+
+            # Save each mapping
+ for i in range(len(api_field_paths)):
+ mapping = FieldMapping(
+ config_id=config_id,
+ api_field_path=api_field_paths[i],
+ db_column_name=db_column_names[i].lower(),
+ db_column_type=db_column_types[i]
+ )
+ db.session.add(mapping)
+
+ db.session.commit()
+ flash('Field mappings saved!', 'success')
+            return redirect(url_for('api_scheduler.sql_preview', config_id=config_id))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error adding mapping: {str(e)}', 'danger')
+
+    mappings = db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+ return render_template('api_scheduler/mappings.html',
+ config=config,
+ mappings=mappings,
+ column_types=ALLOWED_COLUMN_TYPES.keys())
+
+
+@api_scheduler_bp.route('/mapping/<int:mapping_id>/delete', methods=['POST'])
+def delete_mapping(mapping_id):
+    """Delete a field mapping"""
+ mapping = db.session.query(FieldMapping).get(mapping_id)
+ if not mapping:
+ abort(404)
+
+ config_id = mapping.config_id
+ db.session.delete(mapping)
+ db.session.commit()
+
+ flash('Field mapping deleted!', 'success')
+    return redirect(url_for('api_scheduler.edit_mappings', config_id=config_id))
+
+
+# ============== API TEST ==============
+@api_scheduler_bp.route('/config/<int:config_id>/test', methods=['POST'])
+def test_api(config_id):
+    """Test the API and return the response"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+        return jsonify({'success': False, 'error': 'Configuration not found'}), 404
+
+ try:
+ headers = {}
+ if config.api_headers:
+ try:
+ headers = json.loads(config.api_headers)
+ except:
+ pass
+
+ if config.api_key:
+ headers['Authorization'] = f'Bearer {config.api_key}'
+
+ response = requests.request(
+ method=config.api_method,
+ url=config.api_url,
+ headers=headers,
+ timeout=30
+ )
+
+ response.raise_for_status()
+ data = response.json()
+
+ return jsonify({
+ 'success': True,
+ 'data': data,
+ 'status_code': response.status_code
+ })
+
+ except requests.exceptions.JSONDecodeError as e:
+ return jsonify({
+ 'success': False,
+ 'error': f'Invalid JSON response: {str(e)}',
+            'raw_response': response.text[:500] if 'response' in locals() else 'No response'
+ }), 400
+ except Exception as e:
+ return jsonify({'success': False, 'error': str(e)}), 400
+
+
+# ============== SQL PREVIEW & EXECUTE ==============
+@api_scheduler_bp.route('/config/<int:config_id>/sql-preview')
+def sql_preview(config_id):
+    """Preview the CREATE TABLE SQL"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+    mappings = db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+
+ if not mappings:
+ flash('Please add at least one field mapping first', 'warning')
+        return redirect(url_for('api_scheduler.edit_mappings', config_id=config_id))
+
+ sql = generate_create_table_sql(config, mappings)
+    return render_template('api_scheduler/sql_preview.html', config=config, sql=sql)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/execute-sql', methods=['POST'])
+def execute_sql(config_id):
+    """Execute the SQL and create the table"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ custom_sql = request.form.get('custom_sql', '').strip()
+
+ # Validate SQL
+ if not validate_custom_sql(custom_sql, config.target_table):
+ flash('Invalid or dangerous SQL detected!', 'danger')
+        return redirect(url_for('api_scheduler.sql_preview', config_id=config_id))
+
+ try:
+ # Drop if exists
+ db.session.execute(text(f'DROP TABLE IF EXISTS {config.target_table}'))
+
+ # Execute CREATE TABLE
+ db.session.execute(text(custom_sql))
+ db.session.commit()
+
+        flash(f'Table "{config.target_table}" created successfully!', 'success')
+ return redirect(url_for('api_scheduler.dashboard'))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error creating table: {str(e)}', 'danger')
+        return redirect(url_for('api_scheduler.sql_preview', config_id=config_id))
+
+
+# ============== TOGGLE ACTIVE ==============
+@api_scheduler_bp.route('/config/<int:config_id>/toggle', methods=['POST'])
+def toggle_active(config_id):
+    """Toggle the configuration active/inactive"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ return jsonify({'success': False, 'error': 'Not found'}), 404
+
+ config.is_active = not config.is_active
+ db.session.commit()
+ schedule_jobs()
+ return jsonify({'success': True, 'is_active': config.is_active})
+
+
+# ============== LOGS ==============
+@api_scheduler_bp.route('/config/<int:config_id>/logs')
+def view_logs(config_id):
+    """Show execution logs"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ logs = db.session.query(ExecutionLog)\
+ .filter_by(config_id=config_id)\
+ .order_by(ExecutionLog.executed_at.desc())\
+ .limit(50)\
+ .all()
+
+ return render_template('api_scheduler/logs.html', config=config, logs=logs)
+
+
+# ============== HELPER FUNCTIONS ==============
+def validate_url_safe(url):
+    """SSRF protection - block private IPs"""
+ try:
+ from urllib.parse import urlparse
+ parsed = urlparse(url)
+ hostname = parsed.hostname
+
+ if not hostname:
+ return False
+
+        # Resolve the hostname to an IP address
+ import socket
+ ip = socket.gethostbyname(hostname)
+ ip_obj = ipaddress.ip_address(ip)
+
+        # Block private and loopback IPs
+ if ip_obj.is_private or ip_obj.is_loopback:
+ return False
+
+ return True
+ except:
+ return False
+
+
+def generate_create_table_sql(config, mappings):
+    """Build the CREATE TABLE SQL"""
+    columns = ['id SERIAL PRIMARY KEY', 'fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP']
+
+ for mapping in mappings:
+ col_def = f"{mapping.db_column_name} {mapping.db_column_type}"
+ columns.append(col_def)
+
+ sql = f"CREATE TABLE {config.target_table} (\n"
+ sql += ",\n".join(f" {col}" for col in columns)
+ sql += "\n);"
+
+ return sql
+
+
+def validate_custom_sql(sql, expected_table):
+    """Validate the SQL for safety"""
+ sql_upper = sql.upper()
+
+    # Allow only CREATE TABLE
+ if not sql_upper.startswith('CREATE TABLE'):
+ return False
+
+    # Block dangerous keywords
+    dangerous = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'GRANT', 'REVOKE', ';--', '/*', '*/']
+ for keyword in dangerous:
+ if keyword in sql_upper.replace('CREATE TABLE', ''):
+ return False
+
+    # Check the table name
+ if expected_table.upper() not in sql_upper:
+ return False
+
+    # ID and FETCHED_AT columns are mandatory
+ if not re.search(r'\bID\b.*\bSERIAL\b', sql_upper):
+ return False
+ if not re.search(r'\bFETCHED_AT\b.*\bTIMESTAMP\b', sql_upper):
+ return False
+
+ return True
+
+
+# ============== LOG MANAGEMENT ==============
+@api_scheduler_bp.route('/logs/<int:log_id>/delete', methods=['POST'])
+def delete_log(log_id):
+ """Delete a single log entry"""
+ try:
+ log = db.session.get(ExecutionLog, log_id)
+ if not log:
+ return jsonify({'success': False, 'error': 'Log not found'}), 404
+
+ db.session.delete(log)
+ db.session.commit()
+
+ return jsonify({'success': True})
+ except Exception as e:
+ db.session.rollback()
+ return jsonify({'success': False, 'error': str(e)}), 400
+
+
+@api_scheduler_bp.route('/logs/clear-all', methods=['POST'])
+def clear_all_logs():
+ """Delete all execution logs"""
+ try:
+ logs_to_delete = db.session.query(ExecutionLog).all()
+ deleted_count = len(logs_to_delete)
+ for log in logs_to_delete:
+ db.session.delete(log)
+
+ db.session.commit()
+
+ return jsonify({'success': True, 'deleted': deleted_count})
+ except Exception as e:
+ db.session.rollback()
+ return jsonify({'success': False, 'error': str(e)}), 400
+
+# ==================== SCHEDULER SETUP ====================
+
+
+def fetch_api_data(config_id):
+ """Fetch data from API and insert into table (with app context)"""
+ global _app_instance
+
+ if not _app_instance:
+ logger.error("App instance not available!")
+ return
+
+ with _app_instance.app_context():
+ config = db.session.get(APIConfiguration, config_id)
+ if not config or not config.is_active:
+            logger.info(f"Config {config_id} not active or not found, skipping")
+ return
+
+ try:
+ logger.info(f"Starting scheduled fetch for config: {config.name}")
+
+ # Prepare headers
+ headers = {}
+ if config.api_headers:
+ try:
+ headers = json.loads(config.api_headers)
+                except (json.JSONDecodeError, TypeError):
+ pass
+
+ if config.api_key:
+ headers['Authorization'] = f'Bearer {config.api_key}'
+
+ # Fetch API data
+ response = requests.request(
+ method=config.api_method,
+ url=config.api_url,
+ headers=headers,
+ timeout=30
+ )
+ response.raise_for_status()
+ data = response.json()
+
+ # Get field mappings
+            mappings = db.session.query(FieldMapping).filter_by(config_id=config.id).all()
+ if not mappings:
+ raise Exception("No field mappings configured")
+            # Handle list (array) responses
+ if isinstance(data, list):
+                # If it's a list, process each element
+ records_inserted = 0
+ for item in data:
+ row_data = {}
+ for mapping in mappings:
+ value = item
+ for key in mapping.api_field_path.split('.'):
+                            value = value.get(key) if isinstance(value, dict) else None
+ if value is None:
+ break
+
+ # Convert value to target type
+ if value is not None:
+ if mapping.db_column_type == 'JSONB':
+                                row_data[mapping.db_column_name] = json.dumps(value) if not isinstance(value, str) else value
+ elif mapping.db_column_type == 'INTEGER':
+ row_data[mapping.db_column_name] = int(value)
+ elif mapping.db_column_type == 'FLOAT':
+ row_data[mapping.db_column_name] = float(value)
+ elif mapping.db_column_type == 'BOOLEAN':
+ row_data[mapping.db_column_name] = bool(value)
+ else:
+ row_data[mapping.db_column_name] = str(value)
+
+ # Insert into table
+ if row_data:
+                        # columns = ', '.join(row_data.keys()) + ', fetched_at'
+                        # placeholders = ', '.join(['%s'] * len(row_data)) + ', NOW()'
+                        # sql = f"INSERT INTO {config.target_table} ({columns}) VALUES ({placeholders})"
+
+                        # db.session.execute(text(sql), list(row_data.values()))
+                        columns = ', '.join(row_data.keys()) + ', fetched_at'
+                        placeholders = ', '.join([f":{key}" for key in row_data.keys()]) + ', NOW()'
+                        sql = f"INSERT INTO {config.target_table} ({columns}) VALUES ({placeholders})"
+                        db.session.execute(text(sql), row_data)
+
+ records_inserted += 1
+
+ db.session.commit()
+
+ # Log success
+ log = ExecutionLog(
+ config_id=config.id,
+ status='success',
+                message=f'Successfully fetched and inserted {records_inserted} records',
+ records_inserted=records_inserted
+ )
+ db.session.add(log)
+ db.session.commit()
+
+            logger.info(f"✓ Scheduled job executed successfully for config: {config.name} ({records_inserted} records)")
+
+ # Extract values from API response
+ row_data = {}
+ for mapping in mappings:
+ value = data
+ for key in mapping.api_field_path.split('.'):
+ value = value.get(key) if isinstance(value, dict) else None
+ if value is None:
+ break
+
+ # Convert value to target type
+ if value is not None:
+ if mapping.db_column_type == 'JSONB':
+                        row_data[mapping.db_column_name] = json.dumps(value) if not isinstance(value, str) else value
+ elif mapping.db_column_type == 'INTEGER':
+ row_data[mapping.db_column_name] = int(value)
+ elif mapping.db_column_type == 'FLOAT':
+ row_data[mapping.db_column_name] = float(value)
+ elif mapping.db_column_type == 'BOOLEAN':
+ row_data[mapping.db_column_name] = bool(value)
+ else:
+ row_data[mapping.db_column_name] = str(value)
+
+ # Insert into table
+ if row_data:
+ columns = ', '.join(row_data.keys()) + ', fetched_at'
+ placeholders = ', '.join(['%s'] * len(row_data)) + ', NOW()'
+                sql = f"INSERT INTO {config.target_table} ({columns}) VALUES ({placeholders})"
+
+ db.session.execute(text(sql), list(row_data.values()))
Review Comment:
**Suggestion:** For non-list API responses, the INSERT statement uses `%s`
placeholders with `sqlalchemy.text`, which doesn't support this parameter
style, so the execution will fail at runtime instead of inserting the row.
[logic error]
<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>
```mdx
- ❌ Scheduled jobs with object responses fail to insert data.
- ⚠️ Execution logs record misleading successes for zero inserted fields.
- ⚠️ Downstream dashboards see empty or stale data.
```
</details>
```suggestion
            placeholders = ', '.join([f":{key}" for key in row_data.keys()]) + ', NOW()'
            sql = f"INSERT INTO {config.target_table} ({columns}) VALUES ({placeholders})"
            db.session.execute(text(sql), row_data)
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Configure an API in the scheduler UI so that `response.json()` in
`fetch_api_data()`
(lines 488–495) returns a single JSON object (e.g. `{"id":1,"value":10}`)
instead of a
list.
2. Ensure field mappings exist for the configuration so that `mappings =
db.session.query(FieldMapping)...` (lines 498–501) succeeds, and the `if
isinstance(data,
list):` branch at lines 503–554 is skipped because `data` is a dict.
3. At lines 556–575, `row_data` is built from the dict and is non-empty; the
non-list
insert path at lines 577–584 is entered.
4. `db.session.execute(text(sql), list(row_data.values()))` is called with
`sql`
containing literal `%s` placeholders instead of SQLAlchemy bind parameters
(`:name`), so
SQLAlchemy treats `%s` as literal text, and the database driver (e.g.
Postgres) raises a
SQL error (e.g. "column \"%s\" does not exist"), preventing any row from
being inserted.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/views/api_scheduler/views.py
**Line:** 580:583
**Comment:**
*Logic Error: For non-list API responses, the INSERT statement uses
`%s` placeholders with `sqlalchemy.text`, which doesn't support this parameter
style, so the execution will fail at runtime instead of inserting the row.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
##########
superset/views/api_scheduler/tasks.py:
##########
@@ -0,0 +1,41 @@
+from celery import task
+from superset import db
+from .models import APIConfiguration, ExecutionLog
+import requests
+
+@task(bind=True)
+def fetch_api_data(self, config_id):
+    """Celery task - in place of APScheduler"""
+ config = APIConfiguration.query.get(config_id)
+
+ try:
+ response = requests.get(config.api_url, timeout=30)
+ data = response.json()
+
+        # Save the data to the database
+ # ... insert logic ...
+
+ log = ExecutionLog(
+ config_id=config_id,
+ status='success',
+ message=f'Fetched {len(data)} records'
+ )
+ db.session.add(log)
+ db.session.commit()
+
+ except Exception as e:
+ log = ExecutionLog(
Review Comment:
**Suggestion:** If any database operation inside the try block (including
the commented insert logic or the success `commit`) raises an exception, the
SQLAlchemy session enters a failed state; the except block then calls `add` and
`commit` on the same session without a `rollback`, which will raise a
`PendingRollbackError` and prevent the error log from being persisted. Call
`db.session.rollback()` at the start of the except block to reset the session
before adding and committing the error log entry. [logic error]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Periodic task `fetch_api_data` error logs never persisted.
- ⚠️ Subsequent DB operations in task crash with PendingRollbackError.
- ⚠️ Scheduler monitoring via `ExecutionLog` becomes unreliable on DB errors.
```
</details>
```suggestion
db.session.rollback()
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Start Superset with Celery worker and beat enabled so that the periodic
task defined in
`superset/views/api_scheduler/tasks.py:35-40`
(`CELERYBEAT_SCHEDULE['api-scheduler-fetch']`) runs
`superset.views.api_scheduler.tasks.fetch_api_data`.
2. Ensure there is an `APIConfiguration` row in the database with id
`<config_id>` so that
`APIConfiguration.query.get(config_id)` in `fetch_api_data` at
`superset/views/api_scheduler/tasks.py:8-9` returns a valid object.
3. Trigger a database error inside the `try` block of `fetch_api_data` at
`superset/views/api_scheduler/tasks.py:11-24`, e.g. by implementing the
commented-out
insert logic to write invalid data that violates a constraint, or by causing
`db.session.commit()` at line 23-24 to fail (for instance, by dropping the
`execution_log`
table while the worker is running).
4. When the database error is raised during the `db.session.commit()` in the
success path,
SQLAlchemy marks `db.session` as failed; control flows into the `except
Exception as e:`
block at `superset/views/api_scheduler/tasks.py:26`, and the subsequent
`db.session.add(log)` and `db.session.commit()` at lines 32-33 attempt to
use the same
failed session, triggering a `sqlalchemy.exc.PendingRollbackError` instead
of persisting
the `ExecutionLog` error entry.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/views/api_scheduler/tasks.py
**Line:** 27:27
**Comment:**
*Logic Error: If any database operation inside the try block (including
the commented insert logic or the success `commit`) raises an exception, the
SQLAlchemy session enters a failed state; the except block then calls `add` and
`commit` on the same session without a `rollback`, which will raise a
`PendingRollbackError` and prevent the error log from being persisted. Call
`db.session.rollback()` at the start of the except block to reset the session
before adding and committing the error log entry.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
##########
docker/pythonpath_dev/superset_config.py:
##########
@@ -136,3 +136,66 @@ class CeleryConfig:
)
except ImportError:
logger.info("Using default Docker config...")
+
+
+# MonexpertAPI Scheduler Integration
+from superset.initialization import SupersetAppInitializer
+
+class CustomAppInitializer(SupersetAppInitializer):
+    """Custom initializer to add API Scheduler after AppBuilder menu is ready"""
+
+ def init_views(self):
+ """Called after AppBuilder builds its menu tree"""
+ super().init_views()
+
+ logger.info("Registering API Scheduler Blueprint...")
+
+ try:
+ from superset.views.api_scheduler.views import api_scheduler_bp
+ self.superset_app.register_blueprint(api_scheduler_bp)
+ logger.info("✓ API Scheduler Blueprint registered")
+
+ from superset import appbuilder
+ appbuilder.add_link(
+ name="API Scheduler",
+ href="/api-scheduler/dashboard",
+ icon="fa-clock",
+ category="Settings"
+ )
+ logger.info("✓ API Scheduler menu link added")
+
+ except Exception as e:
+ logger.error(f"Failed to register API Scheduler: {str(e)}")
+ raise
+
+APP_INITIALIZER = CustomAppInitializer
+
+# Enable template processing for Handlebars charts
+ENABLE_TEMPLATE_PROCESSING = True
+# DISABLE HTML sanitization (required for CSS/JS)
+HTML_SANITIZATION = False
+# Extended schema (for security - if you keep sanitization enabled)
+HTML_SANITIZATION_SCHEMA_EXTENSIONS = {
+    "tags": ["div", "span", "p", "br", "style", "script", "h1", "h2", "h3", "img", "a"],
+ "attributes": {
+ "*": ["class", "style", "id", "data-*", "onclick"],
Review Comment:
**Suggestion:** Disabling HTML sanitization and explicitly allowing `script`
tags and `onclick` attributes means any user-controlled HTML rendered in charts
or dashboards can execute arbitrary JavaScript in the browser (stored XSS). To
avoid this, keep sanitization enabled and do not whitelist scripting-related
tags/attributes while still allowing the extra structural/formatting tags you
need. [security]
<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>
```mdx
- ❌ Superset dashboards execute attacker-supplied JavaScript in browsers.
- ❌ Any user viewing compromised dashboards can have session stolen.
- ⚠️ All HTML-capable visualizations inherit this unsafe configuration.
```
</details>
```suggestion
HTML_SANITIZATION = True
# Extended schema (for security - if you keep sanitization enabled)
HTML_SANITIZATION_SCHEMA_EXTENSIONS = {
    "tags": ["div", "span", "p", "br", "style", "h1", "h2", "h3", "img", "a"],
"attributes": {
"*": ["class", "style", "id", "data-*"],
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Start Superset using the dev Docker image that loads
`docker/pythonpath_dev/superset_config.py` (this file is the active config
in the
container).
2. As any authenticated user allowed to edit content, create or edit a
dashboard and add a
Markdown/HTML component that supports raw HTML rendering (standard Superset
feature).
3. In the Markdown/HTML component, input user-controlled HTML such as
`<script>window.xss_hit = true; alert('XSS')</script>` or `<a
onclick="window.xss_hit =
true">click</a>` and save the dashboard.
4. Re-open/view the dashboard in a browser; because `HTML_SANITIZATION =
False` and
`HTML_SANITIZATION_SCHEMA_EXTENSIONS` at
`docker/pythonpath_dev/superset_config.py:177-186` explicitly allow `script`
tags and
`onclick` attributes, the malicious JavaScript executes in the viewer's
browser (stored
XSS).
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** docker/pythonpath_dev/superset_config.py
**Line:** 177:182
**Comment:**
*Security: Disabling HTML sanitization and explicitly allowing `script`
tags and `onclick` attributes means any user-controlled HTML rendered in charts
or dashboards can execute arbitrary JavaScript in the browser (stored XSS). To
avoid this, keep sanitization enabled and do not whitelist scripting-related
tags/attributes while still allowing the extra structural/formatting tags you
need.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
##########
superset/views/api_scheduler/views.py:
##########
@@ -0,0 +1,645 @@
+# superset/views/api_scheduler/views.py
+"""
+API Scheduler Views for Apache Superset
+Provides routes for scheduled API data fetching management
+"""
+
+from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, abort
+from superset import db
+from .models import APIConfiguration, FieldMapping, ExecutionLog, ALLOWED_COLUMN_TYPES
+import requests
+import json
+import re
+from datetime import datetime
+from sqlalchemy import text
+import ipaddress
+
+# Create the blueprint (public routes, no login required)
+api_scheduler_bp = Blueprint(
+ 'api_scheduler',
+ __name__,
+ url_prefix='/api-scheduler',
+ template_folder='../../templates/api_scheduler',
+ static_folder='../../static/assets/api_scheduler'
+)
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+import pytz
+import logging
+logger = logging.getLogger(__name__)
+# Global scheduler instance
+scheduler = BackgroundScheduler(timezone=pytz.UTC)
+scheduler_started = False
+_app_instance = None  # Global app instance
+@api_scheduler_bp.record_once
+def on_load(state):
+    """Store the app instance when the blueprint loads"""
+ global _app_instance
+ _app_instance = state.app
+ logger.info("API Scheduler blueprint loaded, app instance captured")
+
+# ============== DASHBOARD ==============
+@api_scheduler_bp.route('/')
+@api_scheduler_bp.route('/dashboard')
+def dashboard():
+    """Main dashboard - list all configurations"""
+ configs = db.session.query(APIConfiguration).all()
+
+    # Get the last execution and next run time for each config
+ for config in configs:
+ # Last execution
+ last_log = db.session.query(ExecutionLog)\
+ .filter_by(config_id=config.id)\
+ .order_by(ExecutionLog.executed_at.desc())\
+ .first()
+ config.last_execution = last_log
+
+ # Next run time from scheduler
+ config.next_run_time = None
+ if config.is_active:
+ job = scheduler.get_job(f'api_fetch_{config.id}')
+ if job:
+ config.next_run_time = job.next_run_time
+
+ return render_template('api_scheduler/dashboard.html', configs=configs)
+
+
+# ============== CONFIG CRUD ==============
+@api_scheduler_bp.route('/config/new', methods=['GET', 'POST'])
+def new_config():
+    """Create a new API configuration"""
+ if request.method == 'POST':
+ try:
+ # Validate SSRF
+ api_url = request.form.get('api_url', '').strip()
+ if not validate_url_safe(api_url):
+ flash('Invalid URL or private IP address detected', 'danger')
+ return render_template('api_scheduler/config_form.html')
+
+ # Parse headers
+ headers_str = request.form.get('api_headers', '').strip()
+ if headers_str and headers_str != 'None':
+ try:
+ json.loads(headers_str)
+ except json.JSONDecodeError:
+ flash('Invalid JSON format for headers', 'danger')
+ return render_template('api_scheduler/config_form.html')
+ else:
+ headers_str = None
+
+ config = APIConfiguration(
+ name=request.form['name'],
+ target_table='api_scheduler_' + request.form['target_table'],
Review Comment:
**Suggestion:** The `target_table` name is taken directly from user input
and concatenated into SQL statements, which allows malicious characters (like
quotes or semicolons) to break out of the identifier and inject arbitrary SQL
when creating or dropping tables. [security]
<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>
```mdx
- ❌ DROP and INSERT statements accept attacker-controlled table identifier.
- ❌ Attacker can execute arbitrary SQL as Superset DB user.
- ⚠️ Compromises integrity of application metadata and scheduler tables.
```
</details>
```suggestion
raw_table = request.form['target_table']
safe_table = re.sub(r'[^a-zA-Z0-9_]', '', raw_table)
if not safe_table:
raise ValueError('Invalid target table name')
config = APIConfiguration(
name=request.form['name'],
target_table='api_scheduler_' + safe_table,
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Start Superset with this PR code so that the `api_scheduler` blueprint in
`superset/views/api_scheduler/views.py` is registered (lines 17–24).
2. In the UI, submit the form that POSTs to `/api-scheduler/config/new`
handled by
`new_config()` (lines 68–112) with `target_table` set to a value containing
SQL, for
example: `mytable"; DROP TABLE public.slices;--`.
3. `new_config()` at lines 90–99 stores `config.target_table` as
`api_scheduler_` plus the
raw `target_table` value, without sanitization or validation against
dangerous characters.
4. Later, trigger code paths that interpolate `config.target_table` directly
into SQL,
such as:
- `delete_config()` at lines 147–170, which executes `DROP TABLE IF EXISTS
{config.target_table}` via `db.session.execute(text(f'DROP TABLE IF EXISTS
{config.target_table}'))` (lines 157–158), or
- `fetch_api_data()` at lines 459–595, which builds `INSERT INTO
{config.target_table}
(...) VALUES (...)` (lines 577–583).
In both cases, the malicious characters in `config.target_table` break
out of the
identifier context and allow execution of attacker-controlled SQL.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/views/api_scheduler/views.py
**Line:** 90:92
**Comment:**
*Security: The `target_table` name is taken directly from user input
and concatenated into SQL statements, which allows malicious characters (like
quotes or semicolons) to break out of the identifier and inject arbitrary SQL
when creating or dropping tables.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
##########
superset/views/api_scheduler/views.py:
##########
@@ -0,0 +1,645 @@
+# superset/views/api_scheduler/views.py
+"""
+API Scheduler Views for Apache Superset
+Provides routes for scheduled API data fetching management
+"""
+
+from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, abort
+from superset import db
+from .models import APIConfiguration, FieldMapping, ExecutionLog, ALLOWED_COLUMN_TYPES
+import requests
+import json
+import re
+from datetime import datetime
+from sqlalchemy import text
+import ipaddress
+
+# Create the blueprint (public routes, no login required)
+api_scheduler_bp = Blueprint(
+ 'api_scheduler',
+ __name__,
+ url_prefix='/api-scheduler',
+ template_folder='../../templates/api_scheduler',
+ static_folder='../../static/assets/api_scheduler'
+)
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+import pytz
+import logging
+logger = logging.getLogger(__name__)
+# Global scheduler instance
+scheduler = BackgroundScheduler(timezone=pytz.UTC)
+scheduler_started = False
+_app_instance = None  # Global app instance
+@api_scheduler_bp.record_once
+def on_load(state):
+    """Store the app instance when the blueprint loads"""
+ global _app_instance
+ _app_instance = state.app
+ logger.info("API Scheduler blueprint loaded, app instance captured")
+
+# ============== DASHBOARD ==============
+@api_scheduler_bp.route('/')
+@api_scheduler_bp.route('/dashboard')
+def dashboard():
+    """Main dashboard - list all configurations"""
+ configs = db.session.query(APIConfiguration).all()
+
+    # Get the last execution and next run time for each config
+ for config in configs:
+ # Last execution
+ last_log = db.session.query(ExecutionLog)\
+ .filter_by(config_id=config.id)\
+ .order_by(ExecutionLog.executed_at.desc())\
+ .first()
+ config.last_execution = last_log
+
+ # Next run time from scheduler
+ config.next_run_time = None
+ if config.is_active:
+ job = scheduler.get_job(f'api_fetch_{config.id}')
+ if job:
+ config.next_run_time = job.next_run_time
+
+ return render_template('api_scheduler/dashboard.html', configs=configs)
+
+
+# ============== CONFIG CRUD ==============
+@api_scheduler_bp.route('/config/new', methods=['GET', 'POST'])
+def new_config():
+    """Create a new API configuration"""
+ if request.method == 'POST':
+ try:
+ # Validate SSRF
+ api_url = request.form.get('api_url', '').strip()
+ if not validate_url_safe(api_url):
+ flash('Invalid URL or private IP address detected', 'danger')
+ return render_template('api_scheduler/config_form.html')
+
+ # Parse headers
+ headers_str = request.form.get('api_headers', '').strip()
+ if headers_str and headers_str != 'None':
+ try:
+ json.loads(headers_str)
+ except json.JSONDecodeError:
+ flash('Invalid JSON format for headers', 'danger')
+ return render_template('api_scheduler/config_form.html')
+ else:
+ headers_str = None
+
+ config = APIConfiguration(
+ name=request.form['name'],
+ target_table='api_scheduler_' + request.form['target_table'],
+ api_url=api_url,
+ api_method=request.form.get('api_method', 'GET'),
+ api_headers=headers_str,
+ api_key=request.form.get('api_key'),
+                schedule_interval=int(request.form.get('schedule_interval', 300)),
+ is_active=False
+ )
+
+ db.session.add(config)
+ db.session.commit()
+
+            flash(f'Configuration "{config.name}" created successfully!', 'success')
+            return redirect(url_for('api_scheduler.edit_mappings', config_id=config.id))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error creating configuration: {str(e)}', 'danger')
+
+    return render_template('api_scheduler/config_form.html', config=None)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/edit', methods=['GET', 'POST'])
+def edit_config(config_id):
+    """Edit a configuration"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ if request.method == 'POST':
+ try:
+ api_url = request.form.get('api_url', '').strip()
+ if not validate_url_safe(api_url):
+ flash('Invalid URL or private IP address detected', 'danger')
+                return render_template('api_scheduler/config_form.html', config=config)
+
+ config.name = request.form['name']
+            config.target_table = 'api_scheduler_' + request.form['target_table'].replace('api_scheduler_', '')
+ config.api_url = api_url
+ config.api_method = request.form.get('api_method', 'GET')
+ config.api_headers = request.form.get('api_headers')
+ config.api_key = request.form.get('api_key')
+            config.schedule_interval = int(request.form.get('schedule_interval', 300))
+
+ db.session.commit()
+ flash('Configuration updated successfully!', 'success')
+ return redirect(url_for('api_scheduler.dashboard'))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error updating configuration: {str(e)}', 'danger')
+
+ return render_template('api_scheduler/config_form.html', config=config)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/delete', methods=['POST'])
+def delete_config(config_id):
+    """Delete a configuration"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ try:
+ # Drop table if exists
+ try:
+            db.session.execute(text(f'DROP TABLE IF EXISTS {config.target_table}'))
+ db.session.commit()
+ except:
+ pass
+
+ db.session.delete(config)
+ db.session.commit()
+        flash(f'Configuration "{config.name}" deleted successfully!', 'success')
+ schedule_jobs()
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error deleting configuration: {str(e)}', 'danger')
+
+ return redirect(url_for('api_scheduler.dashboard'))
+
+
+# ============== FIELD MAPPINGS ==============
+@api_scheduler_bp.route('/config/<int:config_id>/mappings', methods=['GET', 'POST'])
+def edit_mappings(config_id):
+    """Edit field mappings"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ if request.method == 'POST':
+ try:
+            # Delete the previous mappings
+            old_mappings = db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+ for mapping in old_mappings:
+ db.session.delete(mapping)
+            # Read the submitted arrays
+ api_field_paths = request.form.getlist('api_field_path[]')
+ db_column_names = request.form.getlist('db_column_name[]')
+ db_column_types = request.form.getlist('db_column_type[]')
+
+            # Save each mapping
+ for i in range(len(api_field_paths)):
+ mapping = FieldMapping(
+ config_id=config_id,
+ api_field_path=api_field_paths[i],
+ db_column_name=db_column_names[i].lower(),
+ db_column_type=db_column_types[i]
+ )
+ db.session.add(mapping)
+
+ db.session.commit()
+ flash('Field mappings saved!', 'success')
+            return redirect(url_for('api_scheduler.sql_preview', config_id=config_id))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error adding mapping: {str(e)}', 'danger')
+
+    mappings = db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+ return render_template('api_scheduler/mappings.html',
+ config=config,
+ mappings=mappings,
+ column_types=ALLOWED_COLUMN_TYPES.keys())
+
+
+@api_scheduler_bp.route('/mapping/<int:mapping_id>/delete', methods=['POST'])
+def delete_mapping(mapping_id):
+    """Delete a field mapping"""
+ mapping = db.session.query(FieldMapping).get(mapping_id)
+ if not mapping:
+ abort(404)
+
+ config_id = mapping.config_id
+ db.session.delete(mapping)
+ db.session.commit()
+
+ flash('Field mapping deleted!', 'success')
+    return redirect(url_for('api_scheduler.edit_mappings', config_id=config_id))
+
+
+# ============== API TEST ==============
+@api_scheduler_bp.route('/config/<int:config_id>/test', methods=['POST'])
+def test_api(config_id):
+    """Test the API and return its response"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+        return jsonify({'success': False, 'error': 'Configuration not found'}), 404
+
+ try:
+ headers = {}
+ if config.api_headers:
+ try:
+ headers = json.loads(config.api_headers)
+ except:
+ pass
+
+ if config.api_key:
+ headers['Authorization'] = f'Bearer {config.api_key}'
+
+ response = requests.request(
+ method=config.api_method,
+ url=config.api_url,
+ headers=headers,
+ timeout=30
+ )
+
+ response.raise_for_status()
+ data = response.json()
+
+ return jsonify({
+ 'success': True,
+ 'data': data,
+ 'status_code': response.status_code
+ })
+
+ except requests.exceptions.JSONDecodeError as e:
+ return jsonify({
+ 'success': False,
+ 'error': f'Invalid JSON response: {str(e)}',
+            'raw_response': response.text[:500] if 'response' in locals() else 'No response'
+ }), 400
+ except Exception as e:
+ return jsonify({'success': False, 'error': str(e)}), 400
+
+
+# ============== SQL PREVIEW & EXECUTE ==============
+@api_scheduler_bp.route('/config/<int:config_id>/sql-preview')
+def sql_preview(config_id):
+    """Preview the CREATE TABLE SQL"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+    mappings = db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+
+ if not mappings:
+ flash('Please add at least one field mapping first', 'warning')
+        return redirect(url_for('api_scheduler.edit_mappings', config_id=config_id))
+
+ sql = generate_create_table_sql(config, mappings)
+    return render_template('api_scheduler/sql_preview.html', config=config, sql=sql)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/execute-sql', methods=['POST'])
+def execute_sql(config_id):
+    """Run the SQL and create the table"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ custom_sql = request.form.get('custom_sql', '').strip()
+
+ # Validate SQL
+ if not validate_custom_sql(custom_sql, config.target_table):
+ flash('Invalid or dangerous SQL detected!', 'danger')
+        return redirect(url_for('api_scheduler.sql_preview', config_id=config_id))
+
+ try:
+ # Drop if exists
+ db.session.execute(text(f'DROP TABLE IF EXISTS {config.target_table}'))
+
+ # Execute CREATE TABLE
+ db.session.execute(text(custom_sql))
+ db.session.commit()
+
+        flash(f'Table "{config.target_table}" created successfully!', 'success')
+ return redirect(url_for('api_scheduler.dashboard'))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error creating table: {str(e)}', 'danger')
+        return redirect(url_for('api_scheduler.sql_preview', config_id=config_id))
+
+
+# ============== TOGGLE ACTIVE ==============
+@api_scheduler_bp.route('/config/<int:config_id>/toggle', methods=['POST'])
+def toggle_active(config_id):
+    """Toggle a config between active and inactive"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ return jsonify({'success': False, 'error': 'Not found'}), 404
+
+ config.is_active = not config.is_active
+ db.session.commit()
+ schedule_jobs()
+ return jsonify({'success': True, 'is_active': config.is_active})
+
+
+# ============== LOGS ==============
+@api_scheduler_bp.route('/config/<int:config_id>/logs')
+def view_logs(config_id):
+    """Show execution logs"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ logs = db.session.query(ExecutionLog)\
+ .filter_by(config_id=config_id)\
+ .order_by(ExecutionLog.executed_at.desc())\
+ .limit(50)\
+ .all()
+
+ return render_template('api_scheduler/logs.html', config=config, logs=logs)
+
+
+# ============== HELPER FUNCTIONS ==============
+def validate_url_safe(url):
+    """SSRF protection - block private IPs"""
+ try:
+ from urllib.parse import urlparse
+ parsed = urlparse(url)
+ hostname = parsed.hostname
+
+ if not hostname:
+ return False
+
+        # Resolve the hostname to an IP address
+ import socket
+ ip = socket.gethostbyname(hostname)
+ ip_obj = ipaddress.ip_address(ip)
+
+        # Block private and loopback IPs
+ if ip_obj.is_private or ip_obj.is_loopback:
+ return False
+
+ return True
+ except:
+ return False
+
+
+def generate_create_table_sql(config, mappings):
+    """Build the CREATE TABLE SQL"""
+    columns = ['id SERIAL PRIMARY KEY', 'fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP']
+
+ for mapping in mappings:
+ col_def = f"{mapping.db_column_name} {mapping.db_column_type}"
+ columns.append(col_def)
+
+ sql = f"CREATE TABLE {config.target_table} (\n"
+ sql += ",\n".join(f" {col}" for col in columns)
+ sql += "\n);"
+
+ return sql
+
+
+def validate_custom_sql(sql, expected_table):
+    """SQL safety validation"""
+ sql_upper = sql.upper()
+
+    # Only allow CREATE TABLE statements
+ if not sql_upper.startswith('CREATE TABLE'):
+ return False
+
+    # Block dangerous keywords
+    dangerous = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'GRANT', 'REVOKE', ';--', '/*', '*/']
+ for keyword in dangerous:
+ if keyword in sql_upper.replace('CREATE TABLE', ''):
+ return False
+
+    # Check that the expected table name is present
+ if expected_table.upper() not in sql_upper:
+ return False
+
+    # The id and fetched_at columns are mandatory
+ if not re.search(r'\bID\b.*\bSERIAL\b', sql_upper):
+ return False
+ if not re.search(r'\bFETCHED_AT\b.*\bTIMESTAMP\b', sql_upper):
+ return False
+
+ return True
+
+
+# ============== LOG MANAGEMENT ==============
+@api_scheduler_bp.route('/logs/<int:log_id>/delete', methods=['POST'])
+def delete_log(log_id):
+ """Delete a single log entry"""
+ try:
+ log = db.session.get(ExecutionLog, log_id)
+ if not log:
+ return jsonify({'success': False, 'error': 'Log not found'}), 404
+
+ db.session.delete(log)
+ db.session.commit()
+
+ return jsonify({'success': True})
+ except Exception as e:
+ db.session.rollback()
+ return jsonify({'success': False, 'error': str(e)}), 400
+@api_scheduler_bp.route('/logs/clear-all', methods=['POST'])
+def clear_all_logs():
+ """Delete all execution logs"""
+ try:
+ logs_to_delete = db.session.query(ExecutionLog).all()
+ deleted_count = len(logs_to_delete)
+ for log in logs_to_delete:
+ db.session.delete(log)
+
+ db.session.commit()
+
+ return jsonify({'success': True, 'deleted': deleted_count})
+ except Exception as e:
+ db.session.rollback()
+ return jsonify({'success': False, 'error': str(e)}), 400
+
+# ==================== SCHEDULER SETUP ====================
+
+
+def fetch_api_data(config_id):
+ """Fetch data from API and insert into table (with app context)"""
+ global _app_instance
+
+ if not _app_instance:
+ logger.error("App instance not available!")
+ return
+
+ with _app_instance.app_context():
+ config = db.session.get(APIConfiguration, config_id)
+ if not config or not config.is_active:
+            logger.info(f"Config {config_id} not active or not found, skipping")
+ return
+
+ try:
+ logger.info(f"Starting scheduled fetch for config: {config.name}")
+
+ # Prepare headers
+ headers = {}
+ if config.api_headers:
+ try:
+ headers = json.loads(config.api_headers)
+ except:
+ pass
+
+ if config.api_key:
+ headers['Authorization'] = f'Bearer {config.api_key}'
+
+ # Fetch API data
+ response = requests.request(
+ method=config.api_method,
+ url=config.api_url,
+ headers=headers,
+ timeout=30
+ )
+ response.raise_for_status()
+ data = response.json()
+
+ # Get field mappings
+            mappings = db.session.query(FieldMapping).filter_by(config_id=config.id).all()
+ if not mappings:
+ raise Exception("No field mappings configured")
+            # Handle array responses as well
+            if isinstance(data, list):
+                # If the response is an array, process each element
+ records_inserted = 0
+ for item in data:
+ row_data = {}
+ for mapping in mappings:
+ value = item
+ for key in mapping.api_field_path.split('.'):
+                            value = value.get(key) if isinstance(value, dict) else None
+ if value is None:
+ break
+
+ # Convert value to target type
+ if value is not None:
+ if mapping.db_column_type == 'JSONB':
+                                row_data[mapping.db_column_name] = json.dumps(value) if not isinstance(value, str) else value
+ elif mapping.db_column_type == 'INTEGER':
+ row_data[mapping.db_column_name] = int(value)
+ elif mapping.db_column_type == 'FLOAT':
+ row_data[mapping.db_column_name] = float(value)
+ elif mapping.db_column_type == 'BOOLEAN':
+ row_data[mapping.db_column_name] = bool(value)
+ else:
+ row_data[mapping.db_column_name] = str(value)
+
+ # Insert into table
+ if row_data:
+                        columns = ', '.join(row_data.keys()) + ', fetched_at'
+                        placeholders = ', '.join([f":{key}" for key in row_data.keys()]) + ', NOW()'
+                        sql = f"INSERT INTO {config.target_table} ({columns}) VALUES ({placeholders})"
+                        db.session.execute(text(sql), row_data)
+
+ records_inserted += 1
+
+ db.session.commit()
+
+ # Log success
+ log = ExecutionLog(
+ config_id=config.id,
+ status='success',
+                    message=f'Successfully fetched and inserted {records_inserted} records',
+ records_inserted=records_inserted
+ )
+ db.session.add(log)
+ db.session.commit()
+
+            logger.info(f"✓ Scheduled job executed successfully for config: {config.name} ({records_inserted} records)")
+
+ # Extract values from API response
Review Comment:
**Suggestion:** In `fetch_api_data`, when the API response is a list, the
code correctly processes all items but then falls through to the single-object
handling logic, where it tries to call `.get` on the list, causing an
AttributeError and generating a spurious error log after a successful insert.
[logic error]
<details>
<summary><b>Severity Level:</b> Major ⚠️</summary>
```mdx
- ⚠️ Scheduled jobs with list responses always end in exceptions.
- ⚠️ Error logs conflict with earlier success logs per run.
- ⚠️ Monitoring and alerting receive misleading failure signals.
```
</details>
```suggestion
return
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Configure an API in the UI so that `APIConfiguration.api_url` returns a JSON array
   (e.g. `[{"id":1,...}, {"id":2,...}]`), and activate it via
   `/api-scheduler/config/<id>/toggle`, which calls `toggle_active()` (lines 327–337)
   and `schedule_jobs()` (lines 610–639).
2. `schedule_jobs()` schedules `fetch_api_data` (lines 459–595) to run periodically
   with `args=[config.id]` (lines 620–628); when the job fires, the API is called and
   `data = response.json()` (lines 493–495) is a list.
3. The list branch at lines 498–554 runs successfully: it loops over the items, builds
   `row_data`, inserts rows with `db.session.execute(text(sql), row_data)` (lines
   535–538), commits, and logs a success `ExecutionLog` entry and info message.
4. After the `if isinstance(data, list):` block finishes, execution falls through to
   the non-list logic at lines 556–575, where `value = data` and `value = value.get(key)`
   are executed; `data` is still a list, so `value.get` raises
   `AttributeError: 'list' object has no attribute 'get'`, causing the outer `except`
   (lines 598–608) to log a spurious error despite the earlier successful insert.
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/views/api_scheduler/views.py
**Line:** 555:555
**Comment:**
*Logic Error: In `fetch_api_data`, when the API response is a list, the
code correctly processes all items but then falls through to the single-object
handling logic, where it tries to call `.get` on the list, causing an
AttributeError and generating a spurious error log after a successful insert.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
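Beyond the one-line `return`, the fall-through can also be ruled out structurally by normalizing both response shapes into a single list before mapping, so there is only one code path. This is a minimal sketch of that pattern; the `extract_rows` helper and the `(path, column)` tuple mappings are illustrative assumptions, not code from this PR:

```python
def extract_rows(data, mappings):
    """Map an API response (single object or list) to rows for insertion."""
    # Normalize: treat a single object as a one-element list so both
    # response shapes share one code path and nothing falls through.
    items = data if isinstance(data, list) else [data]
    rows = []
    for item in items:
        row = {}
        for path, column in mappings:  # e.g. ("a.b", "col")
            value = item
            for key in path.split('.'):
                value = value.get(key) if isinstance(value, dict) else None
                if value is None:
                    break
            if value is not None:
                row[column] = value
        if row:
            rows.append(row)
    return rows
```

With this shape, the list and single-object cases are exercised by the same loop, so a stray second pass over `data` cannot occur.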
##########
superset/views/api_scheduler/views.py:
##########
@@ -0,0 +1,645 @@
+# superset/views/api_scheduler/views.py
+"""
+API Scheduler Views for Apache Superset
+Provides routes for scheduled API data fetching management
+"""
+
+from flask import Blueprint, render_template, request, redirect, url_for,
flash, jsonify, abort
+from superset import db
+from .models import APIConfiguration, FieldMapping, ExecutionLog,
ALLOWED_COLUMN_TYPES
+import requests
+import json
+import re
+from datetime import datetime
+from sqlalchemy import text
+import ipaddress
+
+@api_scheduler_bp.route('/config/<int:config_id>/edit', methods=['GET', 'POST'])
+def edit_config(config_id):
+    """Edit a configuration"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ if request.method == 'POST':
+ try:
+ api_url = request.form.get('api_url', '').strip()
+ if not validate_url_safe(api_url):
+ flash('Invalid URL or private IP address detected', 'danger')
+                return render_template('api_scheduler/config_form.html', config=config)
+
+ config.name = request.form['name']
+            config.target_table = 'api_scheduler_' + request.form['target_table'].replace('api_scheduler_', '')
Review Comment:
**Suggestion:** When editing a configuration, the updated `target_table` is
again taken directly from user input and concatenated into SQL, so a malicious
table name can still inject arbitrary SQL into future DROP/INSERT statements.
[security]
<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>
```mdx
- ❌ Malicious edits persist dangerous SQL in configuration.
- ❌ Subsequent scheduled runs execute attacker-controlled SQL.
- ⚠️ Database schema and data may be corrupted.
```
</details>
```suggestion
            raw_table = request.form['target_table']
            safe_table = re.sub(r'[^a-zA-Z0-9_]', '', raw_table.replace('api_scheduler_', ''))
            if not safe_table:
                raise ValueError('Invalid target table name')
            config.name = request.form['name']
            config.target_table = 'api_scheduler_' + safe_table
```
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Start Superset with this PR code so the `api_scheduler` blueprint is active
   (lines 17–24).
2. Create any initial configuration, then navigate to
   `/api-scheduler/config/<config_id>/edit`, which is handled by `edit_config()`
   (lines 114–144).
3. Submit a POST to `/api-scheduler/config/<config_id>/edit` with `target_table`
   containing malicious SQL, such as `safe_name"; DROP TABLE public.slices;--`.
4. At lines 128–130, `edit_config()` writes `config.target_table = 'api_scheduler_' +
   request.form['target_table'].replace('api_scheduler_', '')` without sanitization,
   persisting the unsafe value; later, when `delete_config()` (lines 147–170) or
   `fetch_api_data()` (lines 459–595) constructs `DROP TABLE IF EXISTS
   {config.target_table}` or `INSERT INTO {config.target_table} ...`, the embedded
   SQL in the table name is executed.
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/views/api_scheduler/views.py
**Line:** 128:129
**Comment:**
*Security: When editing a configuration, the updated `target_table` is
again taken directly from user input and concatenated into SQL, so a malicious
table name can still inject arbitrary SQL into future DROP/INSERT statements.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>
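The suggested fix can be factored into a single helper shared by both `new_config()` and `edit_config()`, so the create and edit paths cannot drift apart. A hedged sketch of that sanitizer; the `safe_table_name` helper and `TABLE_PREFIX` constant are assumptions for illustration, not part of the PR:

```python
import re

TABLE_PREFIX = "api_scheduler_"

def safe_table_name(raw: str) -> str:
    """Reduce user input to a safe SQL identifier with the scheduler prefix."""
    # Strip any user-supplied prefix, then keep only [A-Za-z0-9_] so the
    # name can never smuggle SQL into later DROP/INSERT statements.
    candidate = re.sub(r"[^a-zA-Z0-9_]", "", raw.replace(TABLE_PREFIX, ""))
    if not candidate:
        raise ValueError("Invalid target table name")
    return TABLE_PREFIX + candidate.lower()
```

Because the table name is interpolated into raw SQL strings elsewhere in this module, whitelisting identifier characters (rather than escaping) is the simpler guarantee here.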
##########
superset/views/api_scheduler/views.py:
##########
@@ -0,0 +1,645 @@
+# superset/views/api_scheduler/views.py
+"""
+API Scheduler Views for Apache Superset
+Provides routes for scheduled API data fetching management
+"""
+
+from flask import Blueprint, render_template, request, redirect, url_for,
flash, jsonify, abort
+from superset import db
+from .models import APIConfiguration, FieldMapping, ExecutionLog,
ALLOWED_COLUMN_TYPES
+import requests
+import json
+import re
+from datetime import datetime
+from sqlalchemy import text
+import ipaddress
+
+# Blueprint oluştur (login gerektirmeyen public routes)
+api_scheduler_bp = Blueprint(
+ 'api_scheduler',
+ __name__,
+ url_prefix='/api-scheduler',
+ template_folder='../../templates/api_scheduler',
+ static_folder='../../static/assets/api_scheduler'
+)
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+import pytz
+import logging
+logger = logging.getLogger(__name__)
+# Global scheduler instance
+scheduler = BackgroundScheduler(timezone=pytz.UTC)
+scheduler_started = False
+_app_instance = None # EKLE: Global app instance
+@api_scheduler_bp.record_once
+def on_load(state):
+ """Blueprint yüklendiğinde app instance'ını sakla"""
+ global _app_instance
+ _app_instance = state.app
+ logger.info("API Scheduler blueprint loaded, app instance captured")
+
+# ============== DASHBOARD ==============
+@api_scheduler_bp.route('/')
+@api_scheduler_bp.route('/dashboard')
+def dashboard():
+ """Ana dashboard - Tüm konfigürasyonları listele"""
+ configs = db.session.query(APIConfiguration).all()
+
+ # Her config için son execution ve next run bilgisini al
+ for config in configs:
+ # Last execution
+ last_log = db.session.query(ExecutionLog)\
+ .filter_by(config_id=config.id)\
+ .order_by(ExecutionLog.executed_at.desc())\
+ .first()
+ config.last_execution = last_log
+
+ # Next run time from scheduler
+ config.next_run_time = None
+ if config.is_active:
+ job = scheduler.get_job(f'api_fetch_{config.id}')
+ if job:
+ config.next_run_time = job.next_run_time
+
+ return render_template('api_scheduler/dashboard.html', configs=configs)
+
+
+# ============== CONFIG CRUD ==============
+@api_scheduler_bp.route('/config/new', methods=['GET', 'POST'])
+def new_config():
+ """Yeni API configuration oluştur"""
+ if request.method == 'POST':
+ try:
+ # Validate SSRF
+ api_url = request.form.get('api_url', '').strip()
+ if not validate_url_safe(api_url):
+ flash('Invalid URL or private IP address detected', 'danger')
+ return render_template('api_scheduler/config_form.html')
+
+ # Parse headers
+ headers_str = request.form.get('api_headers', '').strip()
+ if headers_str and headers_str != 'None':
+ try:
+ json.loads(headers_str)
+ except json.JSONDecodeError:
+ flash('Invalid JSON format for headers', 'danger')
+ return render_template('api_scheduler/config_form.html')
+ else:
+ headers_str = None
+
+ config = APIConfiguration(
+ name=request.form['name'],
+ target_table='api_scheduler_' + request.form['target_table'],
+ api_url=api_url,
+ api_method=request.form.get('api_method', 'GET'),
+ api_headers=headers_str,
+ api_key=request.form.get('api_key'),
+ schedule_interval=int(request.form.get('schedule_interval',
300)),
+ is_active=False
+ )
+
+ db.session.add(config)
+ db.session.commit()
+
+ flash(f'Configuration "{config.name}" created successfully!',
'success')
+ return redirect(url_for('api_scheduler.edit_mappings',
config_id=config.id))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error creating configuration: {str(e)}', 'danger')
+
+ return render_template('api_scheduler/config_form.html',config=None)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/edit', methods=['GET',
'POST'])
+def edit_config(config_id):
+ """Config düzenle"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ if request.method == 'POST':
+ try:
+ api_url = request.form.get('api_url', '').strip()
+ if not validate_url_safe(api_url):
+ flash('Invalid URL or private IP address detected', 'danger')
+ return render_template('api_scheduler/config_form.html',
config=config)
+
+ config.name = request.form['name']
+ config.target_table = 'api_scheduler_' +
request.form['target_table'].replace('api_scheduler_', '')
+ config.api_url = api_url
+ config.api_method = request.form.get('api_method', 'GET')
+ config.api_headers = request.form.get('api_headers')
+ config.api_key = request.form.get('api_key')
+ config.schedule_interval =
int(request.form.get('schedule_interval', 300))
+
+ db.session.commit()
+ flash('Configuration updated successfully!', 'success')
+ return redirect(url_for('api_scheduler.dashboard'))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error updating configuration: {str(e)}', 'danger')
+
+ return render_template('api_scheduler/config_form.html', config=config)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/delete', methods=['POST'])
+def delete_config(config_id):
+ """Config sil"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ try:
+ # Drop table if exists
+ try:
+ db.session.execute(text(f'DROP TABLE IF EXISTS
{config.target_table}'))
+ db.session.commit()
+ except:
+ pass
+
+ db.session.delete(config)
+ db.session.commit()
+ flash(f'Configuration "{config.name}" deleted successfully!',
'success')
+ schedule_jobs()
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error deleting configuration: {str(e)}', 'danger')
+
+ return redirect(url_for('api_scheduler.dashboard'))
+
+
+# ============== FIELD MAPPINGS ==============
+@api_scheduler_bp.route('/config/<int:config_id>/mappings', methods=['GET',
'POST'])
+def edit_mappings(config_id):
+ """Field mapping'leri düzenle"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ if request.method == 'POST':
+ try:
+ # Önceki mapping'leri sil
+ old_mappings =
db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+ for mapping in old_mappings:
+ db.session.delete(mapping)
+ # Array'leri al
+ api_field_paths = request.form.getlist('api_field_path[]')
+ db_column_names = request.form.getlist('db_column_name[]')
+ db_column_types = request.form.getlist('db_column_type[]')
+
+ # Her mapping için kaydet
+ for i in range(len(api_field_paths)):
+ mapping = FieldMapping(
+ config_id=config_id,
+ api_field_path=api_field_paths[i],
+ db_column_name=db_column_names[i].lower(),
+ db_column_type=db_column_types[i]
+ )
+ db.session.add(mapping)
+
+ db.session.commit()
+ flash('Field mappings saved!', 'success')
+ return redirect(url_for('api_scheduler.sql_preview',
config_id=config_id))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error adding mapping: {str(e)}', 'danger')
+
+ mappings =
db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+ return render_template('api_scheduler/mappings.html',
+ config=config,
+ mappings=mappings,
+ column_types=ALLOWED_COLUMN_TYPES.keys())
+
+
+@api_scheduler_bp.route('/mapping/<int:mapping_id>/delete', methods=['POST'])
+def delete_mapping(mapping_id):
+ """Mapping sil"""
+ mapping = db.session.query(FieldMapping).get(mapping_id)
+ if not mapping:
+ abort(404)
+
+ config_id = mapping.config_id
+ db.session.delete(mapping)
+ db.session.commit()
+
+ flash('Field mapping deleted!', 'success')
+ return redirect(url_for('api_scheduler.edit_mappings',
config_id=config_id))
+
+
+# ============== API TEST ==============
+@api_scheduler_bp.route('/config/<int:config_id>/test', methods=['POST'])
+def test_api(config_id):
+ """API'yi test et ve response döndür"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+        return jsonify({'success': False, 'error': 'Configuration not found'}), 404
+
+ try:
+ headers = {}
+ if config.api_headers:
+ try:
+ headers = json.loads(config.api_headers)
+            except (json.JSONDecodeError, TypeError):
+                pass  # fall back to empty headers on malformed JSON
+
+ if config.api_key:
+ headers['Authorization'] = f'Bearer {config.api_key}'
+
+ response = requests.request(
+ method=config.api_method,
+ url=config.api_url,
+ headers=headers,
+ timeout=30
+ )
+
+ response.raise_for_status()
+ data = response.json()
+
+ return jsonify({
+ 'success': True,
+ 'data': data,
+ 'status_code': response.status_code
+ })
+
+ except requests.exceptions.JSONDecodeError as e:
+ return jsonify({
+ 'success': False,
+ 'error': f'Invalid JSON response: {str(e)}',
+            'raw_response': response.text[:500] if 'response' in locals() else 'No response'
+ }), 400
+ except Exception as e:
+ return jsonify({'success': False, 'error': str(e)}), 400
+
+
+# ============== SQL PREVIEW & EXECUTE ==============
+@api_scheduler_bp.route('/config/<int:config_id>/sql-preview')
+def sql_preview(config_id):
+ """CREATE TABLE SQL önizleme"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+    mappings = db.session.query(FieldMapping).filter_by(config_id=config_id).all()
+
+ if not mappings:
+ flash('Please add at least one field mapping first', 'warning')
+        return redirect(url_for('api_scheduler.edit_mappings', config_id=config_id))
+
+ sql = generate_create_table_sql(config, mappings)
+    return render_template('api_scheduler/sql_preview.html', config=config, sql=sql)
+
+
+@api_scheduler_bp.route('/config/<int:config_id>/execute-sql', methods=['POST'])
+def execute_sql(config_id):
+ """SQL'i çalıştır ve tabloyu oluştur"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ custom_sql = request.form.get('custom_sql', '').strip()
+
+ # Validate SQL
+ if not validate_custom_sql(custom_sql, config.target_table):
+ flash('Invalid or dangerous SQL detected!', 'danger')
+        return redirect(url_for('api_scheduler.sql_preview', config_id=config_id))
+
+ try:
+ # Drop if exists
+ db.session.execute(text(f'DROP TABLE IF EXISTS {config.target_table}'))
+
+ # Execute CREATE TABLE
+ db.session.execute(text(custom_sql))
+ db.session.commit()
+
+ flash(f'Table "{config.target_table}" created successfully!',
'success')
+ return redirect(url_for('api_scheduler.dashboard'))
+
+ except Exception as e:
+ db.session.rollback()
+ flash(f'Error creating table: {str(e)}', 'danger')
+        return redirect(url_for('api_scheduler.sql_preview', config_id=config_id))
+
+
+# ============== TOGGLE ACTIVE ==============
+@api_scheduler_bp.route('/config/<int:config_id>/toggle', methods=['POST'])
+def toggle_active(config_id):
+ """Config'i aktif/pasif yap"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ return jsonify({'success': False, 'error': 'Not found'}), 404
+
+ config.is_active = not config.is_active
+ db.session.commit()
+ schedule_jobs()
+ return jsonify({'success': True, 'is_active': config.is_active})
+
+
+# ============== LOGS ==============
+@api_scheduler_bp.route('/config/<int:config_id>/logs')
+def view_logs(config_id):
+ """Execution loglarını göster"""
+ config = db.session.query(APIConfiguration).get(config_id)
+ if not config:
+ abort(404)
+
+ logs = db.session.query(ExecutionLog)\
+ .filter_by(config_id=config_id)\
+ .order_by(ExecutionLog.executed_at.desc())\
+ .limit(50)\
+ .all()
+
+ return render_template('api_scheduler/logs.html', config=config, logs=logs)
+
+
+# ============== HELPER FUNCTIONS ==============
+def validate_url_safe(url):
+ """SSRF koruması - private IP'leri engelle"""
+ try:
+ from urllib.parse import urlparse
+ parsed = urlparse(url)
+ hostname = parsed.hostname
+
+ if not hostname:
+ return False
+
+        # Resolve the hostname to an IP address
+ import socket
+ ip = socket.gethostbyname(hostname)
+ ip_obj = ipaddress.ip_address(ip)
+
+        # Reject private and loopback addresses
+ if ip_obj.is_private or ip_obj.is_loopback:
+ return False
+
+ return True
+    except Exception:
+        # Treat any parse or DNS failure as unsafe
+        return False
+
+
+def generate_create_table_sql(config, mappings):
+ """CREATE TABLE SQL oluştur"""
+    columns = ['id SERIAL PRIMARY KEY', 'fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP']
+
+ for mapping in mappings:
+ col_def = f"{mapping.db_column_name} {mapping.db_column_type}"
+ columns.append(col_def)
+
+ sql = f"CREATE TABLE {config.target_table} (\n"
+ sql += ",\n".join(f" {col}" for col in columns)
+ sql += "\n);"
+
+ return sql
+
+
+def validate_custom_sql(sql, expected_table):
+ """SQL güvenlik validasyonu"""
+ sql_upper = sql.upper()
+
+    # Allow only CREATE TABLE statements
+ if not sql_upper.startswith('CREATE TABLE'):
+ return False
+
+    # Block dangerous keywords
+    dangerous = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'GRANT', 'REVOKE', ';--', '/*', '*/']
+ for keyword in dangerous:
+ if keyword in sql_upper.replace('CREATE TABLE', ''):
+ return False
+
+    # Ensure the expected table name is referenced
+ if expected_table.upper() not in sql_upper:
+ return False
+
+    # The id and fetched_at columns are mandatory
+ if not re.search(r'\bID\b.*\bSERIAL\b', sql_upper):
+ return False
+ if not re.search(r'\bFETCHED_AT\b.*\bTIMESTAMP\b', sql_upper):
+ return False
+
+ return True
+
+
+# ============== LOG MANAGEMENT ==============
+@api_scheduler_bp.route('/logs/<int:log_id>/delete', methods=['POST'])
+def delete_log(log_id):
+ """Delete a single log entry"""
+ try:
+ log = db.session.get(ExecutionLog, log_id)
+ if not log:
+ return jsonify({'success': False, 'error': 'Log not found'}), 404
+
+ db.session.delete(log)
+ db.session.commit()
+
+ return jsonify({'success': True})
+ except Exception as e:
+ db.session.rollback()
+ return jsonify({'success': False, 'error': str(e)}), 400
+
+
+@api_scheduler_bp.route('/logs/clear-all', methods=['POST'])
+def clear_all_logs():
+ """Delete all execution logs"""
+ try:
+        # Bulk delete avoids loading every row into the session
+        deleted_count = db.session.query(ExecutionLog).delete()
+
+ db.session.commit()
+
+ return jsonify({'success': True, 'deleted': deleted_count})
+ except Exception as e:
+ db.session.rollback()
+ return jsonify({'success': False, 'error': str(e)}), 400
+
+# ==================== SCHEDULER SETUP ====================
+
+
+def fetch_api_data(config_id):
+ """Fetch data from API and insert into table (with app context)"""
+ global _app_instance
+
+ if not _app_instance:
+ logger.error("App instance not available!")
+ return
+
+ with _app_instance.app_context():
+ config = db.session.get(APIConfiguration, config_id)
+ if not config or not config.is_active:
+ logger.info(f"Config {config_id} not active or not found,
skipping")
+ return
+
+ try:
+ logger.info(f"Starting scheduled fetch for config: {config.name}")
+
+ # Prepare headers
+ headers = {}
+ if config.api_headers:
+ try:
+ headers = json.loads(config.api_headers)
+                except (json.JSONDecodeError, TypeError):
+                    pass  # fall back to empty headers on malformed JSON
+
+ if config.api_key:
+ headers['Authorization'] = f'Bearer {config.api_key}'
+
+ # Fetch API data
+ response = requests.request(
+ method=config.api_method,
+ url=config.api_url,
+ headers=headers,
+ timeout=30
+ )
+ response.raise_for_status()
+ data = response.json()
+
+ # Get field mappings
+            mappings = db.session.query(FieldMapping).filter_by(config_id=config.id).all()
+ if not mappings:
+ raise Exception("No field mappings configured")
+            # Handle list responses
+            if isinstance(data, list):
+                # Process each element of the array
+ records_inserted = 0
+ for item in data:
+ row_data = {}
+ for mapping in mappings:
+ value = item
+ for key in mapping.api_field_path.split('.'):
+                            value = value.get(key) if isinstance(value, dict) else None
+ if value is None:
+ break
+
+ # Convert value to target type
+ if value is not None:
+ if mapping.db_column_type == 'JSONB':
+                                    row_data[mapping.db_column_name] = json.dumps(value) if not isinstance(value, str) else value
+ elif mapping.db_column_type == 'INTEGER':
+ row_data[mapping.db_column_name] = int(value)
+ elif mapping.db_column_type == 'FLOAT':
+ row_data[mapping.db_column_name] = float(value)
+ elif mapping.db_column_type == 'BOOLEAN':
+ row_data[mapping.db_column_name] = bool(value)
+ else:
+ row_data[mapping.db_column_name] = str(value)
+
+ # Insert into table
+ if row_data:
+                        columns = ', '.join(row_data.keys()) + ', fetched_at'
+                        placeholders = ', '.join([f":{key}" for key in row_data.keys()]) + ', NOW()'
+                        sql = f"INSERT INTO {config.target_table} ({columns}) VALUES ({placeholders})"
+                        db.session.execute(text(sql), row_data)
+
+ records_inserted += 1
+
+ db.session.commit()
+
+ # Log success
+ log = ExecutionLog(
+ config_id=config.id,
+ status='success',
+ message=f'Successfully fetched and inserted
{records_inserted} records',
+ records_inserted=records_inserted
+ )
+ db.session.add(log)
+ db.session.commit()
+
+ logger.info(f"✓ Scheduled job executed successfully for
config: {config.name} ({records_inserted} records)")
+
+            # Single-object response: extract values from the top-level dict
+            row_data = {}
+            for mapping in mappings:
+                value = data
+                for key in mapping.api_field_path.split('.'):
+                    value = value.get(key) if isinstance(value, dict) else None
+                    if value is None:
+                        break
+
+                # Convert value to target type
+                if value is not None:
+                    if mapping.db_column_type == 'JSONB':
+                        row_data[mapping.db_column_name] = json.dumps(value) if not isinstance(value, str) else value
+                    elif mapping.db_column_type == 'INTEGER':
+                        row_data[mapping.db_column_name] = int(value)
+                    elif mapping.db_column_type == 'FLOAT':
+                        row_data[mapping.db_column_name] = float(value)
+                    elif mapping.db_column_type == 'BOOLEAN':
+                        row_data[mapping.db_column_name] = bool(value)
+                    else:
+                        row_data[mapping.db_column_name] = str(value)
+
+            # Insert into table
+            if row_data:
+                columns = ', '.join(row_data.keys()) + ', fetched_at'
+                # text() requires named bind parameters; '%s' placeholders would fail
+                placeholders = ', '.join([f":{key}" for key in row_data.keys()]) + ', NOW()'
+                sql = f"INSERT INTO {config.target_table} ({columns}) VALUES ({placeholders})"
+
+                db.session.execute(text(sql), row_data)
+                db.session.commit()
+
+                # Log success
+                log = ExecutionLog(
+                    config_id=config.id,
+                    status='success',
+                    message=f'Successfully fetched and inserted {len(row_data)} fields',
+                    records_inserted=1
+                )
+                db.session.add(log)
+                db.session.commit()
+
+                logger.info(f"✓ Scheduled job executed successfully for config: {config.name}")
+
+ except Exception as e:
+ # Log error
+ error_msg = str(e)
+ log = ExecutionLog(
Review Comment:
**Suggestion:** In the `fetch_api_data` error handler, any database error
leaves the session in a failed transaction state, and because no rollback is
performed before logging the error, the attempt to insert an `ExecutionLog`
entry can itself fail and block subsequent scheduler runs. [logic error]
<details>
<summary><b>Severity Level:</b> Critical 🚨</summary>
```mdx
- ❌ Database errors leave scheduler session in broken state.
- ❌ Error `ExecutionLog` entries fail to persist after failures.
- ⚠️ Subsequent jobs may repeatedly error without clear diagnostics.
```
</details>
```suggestion
db.session.rollback()
```
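To make the failure mode concrete, here is a minimal, hypothetical sketch in plain Python. `FakeSession` and this `PendingRollbackError` are invented stand-ins for SQLAlchemy's session and `sqlalchemy.exc.PendingRollbackError`, not the real API; they only model the failed-transaction behavior described above:

```python
class PendingRollbackError(Exception):
    """Stand-in for sqlalchemy.exc.PendingRollbackError."""


class FakeSession:
    """Toy model of a SQLAlchemy session's failed-transaction state."""

    def __init__(self):
        self.failed = False
        self.committed = []

    def execute(self, sql):
        if "bad" in sql:  # simulate a database error mid-job
            self.failed = True
            raise RuntimeError("database error")

    def commit(self):
        if self.failed:  # committing a failed session raises
            raise PendingRollbackError("transaction must be rolled back")
        self.committed.append("error log")

    def rollback(self):
        self.failed = False  # rollback restores the session


# Without rollback: the attempt to persist the error log itself fails.
session = FakeSession()
try:
    session.execute("bad insert")
except RuntimeError:
    try:
        session.commit()  # ExecutionLog commit on the broken session
    except PendingRollbackError:
        pass  # the error record is silently lost

assert session.committed == []

# With rollback first: the error log commits cleanly.
session = FakeSession()
try:
    session.execute("bad insert")
except RuntimeError:
    session.rollback()  # the suggested fix
    session.commit()

assert session.committed == ["error log"]
```

With the rollback in place, the session returns to a usable state, so the error record persists and subsequent scheduler runs are not blocked.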
<details>
<summary><b>Steps of Reproduction ✅ </b></summary>
```mdx
1. Configure an API and mappings such that `fetch_api_data()` (lines
459–595) constructs
invalid SQL or otherwise triggers a database error during an insert, for
example due to a
type mismatch or bad column name in `generate_create_table_sql()`/custom SQL
used earlier.
2. When the scheduled job executes via `schedule_jobs()` (lines 610–639),
the invalid
insert causes `db.session.execute(...)` or `db.session.commit()` in
`fetch_api_data()` to
raise an exception, leaving the SQLAlchemy session in a "failed transaction"
state.
3. Control flows into the `except Exception as e:` block at lines 598–608,
where
`db.session.add(log)` and `db.session.commit()` are called without an
intervening
`db.session.rollback()`.
4. SQLAlchemy raises a `PendingRollbackError` (or similar) when commit is
attempted on the
failed session, so the `ExecutionLog` error record is not stored, and the
scoped session
associated with the Flask app remains unusable for subsequent scheduler runs
until an
explicit rollback is performed elsewhere (which this blueprint never does).
```
</details>
<details>
<summary><b>Prompt for AI Agent 🤖 </b></summary>
```mdx
This is a comment left during a code review.
**Path:** superset/views/api_scheduler/views.py
**Line:** 601:601
**Comment:**
*Logic Error: In the `fetch_api_data` error handler, any database error
leaves the session in a failed transaction state, and because no rollback is
performed before logging the error, the attempt to insert an `ExecutionLog`
entry can itself fail and block subsequent scheduler runs.
Validate the correctness of the flagged issue. If correct, How can I resolve
this? If you propose a fix, implement it and please make it concise.
```
</details>