GitHub user kch8306 edited a discussion: Preventing Duplicate Queries During
Asynchronous Chart Data Processing
Hello!
We are using Superset 3.1 with Redis as our cache database. To handle
long-running queries, we have enabled GLOBAL_ASYNC_QUERIES. However, we’ve
noticed an issue: if multiple users open the same dashboard simultaneously
while its charts are not yet cached, each request issues the same queries to
the database. This duplicated work is inefficient and increases the load.
To address this, I’m considering modifying the "load_chart_data_into_cache"
function in "/var/local/idk2/heartset/superset/tasks/async_queries.py"
(proposed changes below). I’d like to hear feedback from the community before
proceeding.
If you have a specific code diff or further context you want included, let me
know and I can update the body!
```
@celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
def load_chart_data_into_cache(
    job_metadata: dict[str, Any],
    form_data: dict[str, Any],
) -> None:
    """Execute a chart-data query asynchronously and warm the cache.

    A ``CacheKey`` row in the metadata DB acts as a cross-worker lock so the
    same uncached query runs only once: the first worker ("leader") inserts
    the row, runs the query, and deletes the row; concurrent workers
    ("followers") poll until the row disappears and then run the (now cached)
    command themselves.

    :param job_metadata: async-query job descriptor used to report status
    :param form_data: chart form data from which the query context is built
    :raises SoftTimeLimitExceeded: when Celery's soft time limit fires
    :raises Exception: re-raised after marking the async job as errored
    """
    # pylint: disable=import-outside-toplevel
    from superset.commands.chart.data.get_data_command import ChartDataCommand

    with override_user(_load_user_from_job_metadata(job_metadata), force=False):
        try:
            set_form_data(form_data)
            query_context = _create_query_context_from_form(form_data)
            # NOTE(review): reaching into the private ``_processor`` is
            # fragile — consider exposing a public cache-key accessor.
            lock_key = query_context._processor.cache_key().removeprefix("qc-")

            is_leader = _acquire_cache_lock(lock_key)
            if not is_leader:
                # Another worker is already running this query; wait for it
                # to finish so our command.run() below hits the warm cache.
                _wait_for_cache_lock(lock_key)
            try:
                command = ChartDataCommand(query_context)
                result = command.run(cache=True)
                result_url = f"/api/v1/chart/data/{result['cache_key']}"
            finally:
                # Release the lock even when the query fails; otherwise every
                # follower would poll until its own soft time limit expires.
                if is_leader:
                    _release_cache_lock(lock_key)

            async_query_manager.update_job(
                job_metadata,
                async_query_manager.STATUS_DONE,
                result_url=result_url,
            )
        except SoftTimeLimitExceeded as ex:
            logger.warning(
                "A timeout occurred while loading chart data, error: %s", ex
            )
            raise
        except Exception as ex:
            # TODO: QueryContext should support SIP-40 style errors
            error = str(ex.message if hasattr(ex, "message") else ex)
            errors = [{"message": error}]
            async_query_manager.update_job(
                job_metadata, async_query_manager.STATUS_ERROR, errors=errors
            )
            raise


def _acquire_cache_lock(lock_key: str) -> bool:
    """Try to become the leader for *lock_key*.

    Returns True when this worker inserted the lock row, False when another
    worker already holds (or just won the race for) the lock.

    NOTE(review): check-then-insert is only race-free if ``cache_key`` has a
    UNIQUE constraint, so the loser's commit fails — confirm the schema.
    """
    session = db.create_scoped_session()
    try:
        existing = session.query(CacheKey).filter_by(cache_key=lock_key).first()
        if existing is not None:
            return False
        # TODO(review): datasource_uid=1 is a hard-coded placeholder; derive
        # the real datasource uid from form_data / query_context instead.
        session.add(CacheKey(cache_key=lock_key, datasource_uid=1))
        session.commit()
        return True
    except Exception:
        # A concurrent worker most likely inserted the row between our check
        # and commit; treat the lock as held elsewhere and fall back to waiting.
        session.rollback()
        logger.info("Lost cache-lock race for key %s; will wait", lock_key)
        return False
    finally:
        session.close()


def _wait_for_cache_lock(
    lock_key: str,
    poll_interval: float = 5.0,
    timeout: float = 300.0,
) -> None:
    """Poll until the lock row for *lock_key* disappears or *timeout* elapses.

    On timeout we proceed anyway (the query simply runs again) rather than
    raising — the lock is an optimization, not a correctness requirement.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        session = db.create_scoped_session()
        try:
            held = (
                session.query(CacheKey).filter_by(cache_key=lock_key).count() > 0
            )
        finally:
            session.close()
        if not held:
            return
        logger.warning("=================wait=================:%s", lock_key)
        time.sleep(poll_interval)
    logger.warning(
        "Timed out waiting for cache lock %s after %ss; running query anyway",
        lock_key,
        timeout,
    )


def _release_cache_lock(lock_key: str) -> None:
    """Delete the lock row for *lock_key*, if it still exists."""
    session = db.create_scoped_session()
    try:
        entry = session.query(CacheKey).filter_by(cache_key=lock_key).first()
        if entry is not None:
            session.delete(entry)
            session.commit()
    except Exception as ex:
        session.rollback()
        logger.error("Error in cache entry deletion: %s", str(ex))
        raise
    finally:
        session.close()
```
GitHub link: https://github.com/apache/superset/discussions/34316
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]