timifasubaa closed pull request #5852: [WIP] refactor sqllab run_query
URL: https://github.com/apache/incubator-superset/pull/5852
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/superset/assets/src/SqlLab/actions.js
b/superset/assets/src/SqlLab/actions.js
index a808949c2f..7aada0d2ce 100644
--- a/superset/assets/src/SqlLab/actions.js
+++ b/superset/assets/src/SqlLab/actions.js
@@ -87,8 +87,9 @@ export function startQuery(query) {
return { type: START_QUERY, query };
}
-export function querySuccess(query, results) {
- return { type: QUERY_SUCCESS, query, results };
+export function querySuccess(query, results, csv=false) {
+ console.log("this is the csv case and I am updating the redux store!");
+ return { csv: csv, type: QUERY_SUCCESS, query, results };
}
export function queryFailed(query, msg, link) {
@@ -119,10 +120,12 @@ function getErrorLink(err) {
return link;
}
-export function fetchQueryResults(query) {
+export function fetchQueryResults(query, csv=false) {
+ console.log("I get into fetchquery results");
return function (dispatch) {
dispatch(requestQueryResults(query));
- const sqlJsonUrl = `/superset/results/${query.resultsKey}/`;
+ console.log(query)
+ const sqlJsonUrl = `/superset/results/${query.resultsKey}/`;//
`/superset/csv/${query.client_id}/`;
$.ajax({
type: 'GET',
dataType: 'text',
@@ -130,6 +133,101 @@ export function fetchQueryResults(query) {
success(results) {
const parsedResults = JSONbig.parse(results);
dispatch(querySuccess(query, parsedResults));
+ console.log("I get to the next line")
+ if (csv){
+ console.log("I get inside the zone!")
+ dispatch(fetchCSVResults(query));
+ }
+ },
+ error(err) {
+ let msg = t('Failed at retrieving results from the results backend');
+ if (err.responseJSON && err.responseJSON.error) {
+ msg = err.responseJSON.error;
+ }
+ dispatch(queryFailed(query, msg, getErrorLink(err)));
+ },
+ });
+ };
+}
+
+//rerunQueryforCSVExport(this.props.query);
+export function rerunQueryforCSVExport(query) {
+ console.log('I am in the export csv runQuery function');
+ return function (dispatch) {
+ dispatch(startQuery(query));
+ const sqlJsonRequest = {
+ client_id: query.id,
+ database_id: query.dbId,
+ json: true,
+ runAsync: query.runAsync,
+ schema: query.schema,
+ sql: query.sql,
+ sql_editor_id: query.sqlEditorId,
+ tab: query.tab,
+ tmp_table_name: query.tempTableName,
+ select_as_cta: query.ctas,
+ templateParams: query.templateParams,
+ };
+ const sqlJsonUrl = '/superset/csv_export_rerun/' + window.location.search;
+ $.ajax({
+ type: 'POST',
+ dataType: 'json',
+ url: sqlJsonUrl,
+ data: sqlJsonRequest,
+ success(results) {
+ if (!query.runAsync) {
+ //****
+ dispatch(querySuccess(query, results, true));
+ }
+ },
+ error(err, textStatus, errorThrown) {
+ let msg;
+ try {
+ msg = err.responseJSON.error;
+ } catch (e) {
+ if (err.responseText !== undefined) {
+ msg = err.responseText;
+ }
+ }
+ if (msg === null) {
+ if (errorThrown) {
+ msg = `[${textStatus}] ${errorThrown}`;
+ } else {
+ msg = t('Unknown error');
+ }
+ }
+ if (msg.indexOf('CSRF token') > 0) {
+ msg = COMMON_ERR_MESSAGES.SESSION_TIMED_OUT;
+ }
+ dispatch(queryFailed(query, msg, getErrorLink(err)));
+ },
+ });
+ };
+}
+
+export function fetchCSVResults(query) {
+ return function (dispatch) {
+ dispatch(requestQueryResults(query));
+ console.log(query)
+ console.log('hereis the info at time of calling csv stuff')
+ const csvUrl = `/superset/csv/${query.id}`;
+ $.ajax({
+ type: 'GET',
+ dataType: 'text',
+ url: csvUrl,
+ success(results) {
+ //const parsedResults = JSONbig.parse(results);
+ //dispatch(querySuccess(query, parsedResults, csv=true));
+ var a = document.createElement('a');
+ console.log(results);
+ var binaryData = [];
+ binaryData.push(results);
+ var myblob = new Blob(binaryData, {type: "application/zip"})
+ var url = window.URL.createObjectURL(myblob)
+ a.href = url;
+ a.download = 'mycsv.csv';
+ a.click();
+ window.URL.revokeObjectURL(url);
},
error(err) {
let msg = t('Failed at retrieving results from the results backend');
@@ -142,7 +240,8 @@ export function fetchQueryResults(query) {
};
}
-export function runQuery(query) {
+export function runQuery(query, rerun=false) {
+ console.log('I am in the runQuery function');
return function (dispatch) {
dispatch(startQuery(query));
const sqlJsonRequest = {
@@ -157,8 +256,13 @@ export function runQuery(query) {
tmp_table_name: query.tempTableName,
select_as_cta: query.ctas,
templateParams: query.templateParams,
+ rerun: rerun,
};
const sqlJsonUrl = '/superset/sql_json/' + window.location.search;
+ //if (rerun) {
+ // console.log('here is the right move');
+ // dispatch(querySuccess(query, results))
+ //}
$.ajax({
type: 'POST',
dataType: 'json',
@@ -166,7 +270,8 @@ export function runQuery(query) {
data: sqlJsonRequest,
success(results) {
if (!query.runAsync) {
- dispatch(querySuccess(query, results));
+ //****
+ dispatch(querySuccess(query, results, true));
}
},
error(err, textStatus, errorThrown) {
diff --git a/superset/assets/src/SqlLab/components/ResultSet.jsx
b/superset/assets/src/SqlLab/components/ResultSet.jsx
index 6fa6a277d6..d4ba30a0ff 100644
--- a/superset/assets/src/SqlLab/components/ResultSet.jsx
+++ b/superset/assets/src/SqlLab/components/ResultSet.jsx
@@ -52,6 +52,7 @@ export default class ResultSet extends React.PureComponent {
// when new results comes in, save them locally and clear in store
if (this.props.cache && (!nextProps.query.cached)
&& nextProps.query.results
+ //check inside here if the query with the results contains csv
&& nextProps.query.results.data.length > 0) {
this.setState(
{ data: nextProps.query.results.data },
@@ -83,7 +84,9 @@ export default class ResultSet extends React.PureComponent {
this.setState({ searchText: event.target.value });
}
fetchResults(query) {
- this.props.actions.fetchQueryResults(query);
+ console.log("this is what I ger back from the database. I wish there could
be some info here about whether to render it as a csv.");
+ console.log(query)
+ this.props.actions.fetchQueryResults(query, true);
}
reFetchQueryResults(query) {
this.props.actions.reFetchQueryResults(query);
@@ -94,6 +97,14 @@ export default class ResultSet extends React.PureComponent {
this.props.actions.runQuery(query, true);
}
}
+ exportCsv(){
+ console.log("I call the right thing for now")
+ this.props.actions.rerunQueryforCSVExport(this.props.query);
+ // this.props.actions.fetchCSVResults(this.props.query);
+ // runQuery(this.props.query, true)
+ // action should make an API call to know if we need to rerun and if we
do,
+ // call rerun the right way and then download the csv!
+ }
renderControls() {
if (this.props.search || this.props.visualize || this.props.csv) {
return (
@@ -108,8 +119,8 @@ export default class ResultSet extends React.PureComponent {
actions={this.props.actions}
/>}
{this.props.csv &&
- <Button bsSize="small" href={'/superset/csv/' +
this.props.query.id}>
- <i className="fa fa-file-text-o" /> {t('.CSV')}
+ <Button onClick={this.exportCsv.bind(this)} bsSize="small">
+ <i className="fa fa-file-text-o" /> {t('Export to CSV')}
</Button>}
</ButtonGroup>
</div>
diff --git a/superset/assets/src/SqlLab/reducers.js
b/superset/assets/src/SqlLab/reducers.js
index 7fa60b1a4e..2c826c8984 100644
--- a/superset/assets/src/SqlLab/reducers.js
+++ b/superset/assets/src/SqlLab/reducers.js
@@ -149,6 +149,8 @@ export const sqlLabReducer = function (state = {}, action) {
return alterInObject(state, 'queries', action.query, { state: 'fetching'
});
},
[actions.QUERY_SUCCESS]() {
+ console.log(action);
+ console.log("jere are all the things in action ^^ ");
let rows;
if (action.results.data) {
rows = action.results.data.length;
diff --git a/superset/config.py b/superset/config.py
index a5e4f2988c..96b1f25859 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -276,7 +276,10 @@
MAPBOX_API_KEY = os.environ.get('MAPBOX_API_KEY', '')
# Maximum number of rows returned in the SQL editor
-SQL_MAX_ROW = 1000
+SQL_MAX_ROW = 5
+
+CSV_MAX_ROW = 10
+
# Maximum number of tables/views displayed in the dropdown window in SQL Lab.
MAX_TABLE_NAMES = 3000
diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py
index 176fbed04d..aade7dd90d 100644
--- a/superset/db_engine_specs.py
+++ b/superset/db_engine_specs.py
@@ -374,6 +374,7 @@ def execute(cls, cursor, query, **kwargs):
if cls.arraysize:
cursor.arraysize = cls.arraysize
cursor.execute(query)
+ #print()
@classmethod
def make_label_compatible(cls, label):
diff --git a/superset/sql_lab.py b/superset/sql_lab.py
index a2732d17c5..bf13c8ceb3 100644
--- a/superset/sql_lab.py
+++ b/superset/sql_lab.py
@@ -81,15 +81,16 @@ def session_scope(nullpool):
@celery_app.task(bind=True, soft_time_limit=SQLLAB_TIMEOUT)
def get_sql_results(
- ctask, query_id, rendered_query, return_results=True, store_results=False,
- user_name=None, start_time=None):
+ ctask, query_id, rendered_query, run_async=False,
+ user_name=None, start_time=None, for_csv=False):
"""Executes the sql query returns the results."""
+ print("rendered_query is {}".format(rendered_query))
+ print("for_csv is {}".format(for_csv))
with session_scope(not ctask.request.called_directly) as session:
-
try:
return execute_sql(
- ctask, query_id, rendered_query, return_results,
store_results, user_name,
- session=session, start_time=start_time)
+ ctask, query_id, rendered_query, run_async, user_name,
+ session=session, start_time=start_time, for_csv=for_csv)
except Exception as e:
logging.exception(e)
stats_logger.incr('error_sqllab_unhandled')
@@ -102,14 +103,18 @@ def get_sql_results(
def execute_sql(
- ctask, query_id, rendered_query, return_results=True, store_results=False,
- user_name=None, session=None, start_time=None,
+ ctask, query_id, rendered_query, run_async=False, store_results=False,
+ user_name=None, session=None, start_time=None, for_csv=False,
):
"""Executes the sql query returns the results."""
- if store_results and start_time:
+ if run_async and start_time:
# only asynchronous queries
stats_logger.timing(
'sqllab.query.time_pending', now_as_float() - start_time)
+
+ if run_async and not results_backend:
+ return handle_error("Results backend isn't configured.")
+
query = get_query(query_id, session)
payload = dict(query_id=query_id)
@@ -132,13 +137,12 @@ def handle_error(msg):
payload['link'] = troubleshooting_link
return payload
- if store_results and not results_backend:
- return handle_error("Results backend isn't configured.")
-
# Limit enforced only for retrieving the data, not for the CTA queries.
superset_query = SupersetQuery(rendered_query)
executed_sql = superset_query.stripped()
SQL_MAX_ROWS = app.config.get('SQL_MAX_ROW')
+ CSV_MAX_ROWS = app.config.get('CSV_MAX_ROW')
+
if not superset_query.is_readonly() and not database.allow_dml:
return handle_error(
'Only `SELECT` statements are allowed against this database')
@@ -153,10 +157,14 @@ def handle_error(msg):
query.user_id, start_dttm.strftime('%Y_%m_%d_%H_%M_%S'))
executed_sql = superset_query.as_create_table(query.tmp_table_name)
query.select_as_cta_used = True
- if (superset_query.is_select() and SQL_MAX_ROWS and
+
+ print(for_csv)
+ if ((not for_csv) and superset_query.is_select() and SQL_MAX_ROWS and
(not query.limit or query.limit > SQL_MAX_ROWS)):
+ print("I got into the part where we can change the value.")
query.limit = SQL_MAX_ROWS
executed_sql = database.apply_limit_to_sql(executed_sql, query.limit)
+ print(executed_sql)
# Hook to allow environment-specific mutation (usually comments) to the SQL
SQL_QUERY_MUTATOR = config.get('SQL_QUERY_MUTATOR')
@@ -235,22 +243,23 @@ def handle_error(msg):
'columns': cdf.columns if cdf.columns else [],
'query': query.to_dict(),
})
- if store_results:
- key = '{}'.format(uuid.uuid4())
- logging.info('Storing results in results backend, key: {}'.format(key))
- write_to_results_backend_start = now_as_float()
- json_payload = json.dumps(
- payload, default=json_iso_dttm_ser, ignore_nan=True)
- cache_timeout = database.cache_timeout
- if cache_timeout is None:
- cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT', 0)
- results_backend.set(key, zlib_compress(json_payload), cache_timeout)
- query.results_key = key
- stats_logger.timing(
- 'sqllab.query.results_backend_write',
- now_as_float() - write_to_results_backend_start)
- session.merge(query)
- session.commit()
- if return_results:
+ if not run_async:
return payload
+
+ key = '{}'.format(uuid.uuid4())
+ logging.info('Storing results in results backend, key: {}'.format(key))
+ write_to_results_backend_start = now_as_float()
+ json_payload = json.dumps(
+ payload, default=json_iso_dttm_ser, ignore_nan=True)
+ cache_timeout = database.cache_timeout
+ if cache_timeout is None:
+ cache_timeout = config.get('CACHE_DEFAULT_TIMEOUT', 0)
+ results_backend.set(key, zlib_compress(json_payload), cache_timeout)
+ query.results_key = key
+ stats_logger.timing(
+ 'sqllab.query.results_backend_write',
+ now_as_float() - write_to_results_backend_start)
+
+ session.merge(query)
+ session.commit()
diff --git a/superset/views/core.py b/superset/views/core.py
index e021709434..529f2feea3 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -2376,13 +2376,17 @@ def stop_query(self):
@expose('/sql_json/', methods=['POST', 'GET'])
@log_this
def sql_json(self):
+ # make this function potentially just take update the state to
pending. If rerun is set to 1.
"""Runs arbitrary sql and returns and json"""
async_ = request.form.get('runAsync') == 'true'
sql = request.form.get('sql')
database_id = request.form.get('database_id')
schema = request.form.get('schema') or None
+ rerun = request.form.get('rerun')
template_params = json.loads(
request.form.get('templateParams') or '{}')
+ print("here are all the items in the request form")
+ print(request.form)
session = db.session()
mydb = session.query(models.Database).filter_by(id=database_id).first()
@@ -2408,28 +2412,42 @@ def sql_json(self):
)
client_id = request.form.get('client_id') or utils.shortid()
+ SQL_MAX_ROW = config.get('SQL_MAX_ROW')
- query = Query(
- database_id=int(database_id),
- limit=mydb.db_engine_spec.get_limit_from_sql(sql),
- sql=sql,
- schema=schema,
- select_as_cta=request.form.get('select_as_cta') == 'true',
- start_time=utils.now_as_float(),
- tab_name=request.form.get('tab'),
- status=QueryStatus.PENDING if async_ else QueryStatus.RUNNING,
- sql_editor_id=request.form.get('sql_editor_id'),
- tmp_table_name=tmp_table_name,
- user_id=g.user.get_id() if g.user else None,
- client_id=client_id,
- )
- session.add(query)
- session.flush()
- query_id = query.id
- session.commit() # shouldn't be necessary
- if not query_id:
- raise Exception(_('Query record was not created as expected.'))
- logging.info('Triggering query_id: {}'.format(query_id))
+ print("rerun is {}".format(rerun))
+ print(type(rerun))
+ if rerun == 'true':
+ client_id = request.form.get('client_id')
+ print("client_id is {}".format(client_id))
+ query = (
+ db.session.query(Query)
+ .filter_by(client_id=client_id)
+ .one())
+ #get the query by id. """
+ print("It is a rerun case!")
+ else:
+ # make a new one
+ query = Query(
+ database_id=int(database_id),
+ limit=mydb.db_engine_spec.get_limit_from_sql(sql),
+ sql=sql,
+ schema=schema,
+ select_as_cta=request.form.get('select_as_cta') == 'true',
+ start_time=utils.now_as_float(),
+ tab_name=request.form.get('tab'),
+ status=QueryStatus.PENDING if async_ else QueryStatus.RUNNING,
+ sql_editor_id=request.form.get('sql_editor_id'),
+ tmp_table_name=tmp_table_name,
+ user_id=g.user.get_id() if g.user else None,
+ client_id=client_id,
+ )
+ session.add(query)
+ session.flush()
+ query_id = query.id
+ session.commit() # shouldn't be necessary
+ if not query_id:
+ raise Exception(_('Query record was not created as expected.'))
+ logging.info('Triggering query_id: {}'.format(query_id))
try:
template_processor = get_template_processor(
@@ -2449,8 +2467,7 @@ def sql_json(self):
sql_lab.get_sql_results.delay(
query_id,
rendered_query,
- return_results=False,
- store_results=not query.select_as_cta,
+ run_async=True,
user_name=g.user.username,
start_time=utils.now_as_float())
except Exception as e:
@@ -2483,7 +2500,7 @@ def sql_json(self):
data = sql_lab.get_sql_results(
query_id,
rendered_query,
- return_results=True)
+ run_async=False)
payload = json.dumps(
data,
default=utils.pessimistic_json_iso_dttm_ser,
@@ -2497,18 +2514,87 @@ def sql_json(self):
return json_error_response(payload=data)
return json_success(payload)
+
+ @has_access_api
+ @expose('/csv_export_rerun/', methods=['POST', 'GET'])
+ @log_this
+ def csv_export_rerun(self):
+ # This endpoint will rerun a new query with the higher limit if
necessary. SHould be followed by polling for the query in question.
+ """Runs arbitrary sql and returns and json"""
+
+ #first make sure the query already exists.
+ #if sync, return the csv result like that, otherwise, return the
response with the query information.
+ client_id = request.form.get('client_id')
+
+ query = (
+ db.session.query(Query)
+ .filter_by(client_id=client_id)
+ .one()
+ )
+
+ SQL_MAX_ROW = config.get('SQL_MAX_ROW')
+ CSV_MAX_ROW = app.config.get('CSV_MAX_ROW')
+ # get the query limit specified in the query.
+ mydb =
db.session.query(models.Database).filter_by(id=query.database.id).first()
+ user_specified_limit =
mydb.db_engine_spec.get_limit_from_sql(query.sql)
+ #set the limit_used to the
+ print("vvvv")
+ print(user_specified_limit)
+ print(query.limit)
+ print(SQL_MAX_ROW)
+ print("^^^^")
+ if user_specified_limit > SQL_MAX_ROW:
+ #some change is definitely happening
+ query.limit = min(user_specified_limit, CSV_MAX_ROW)
+ print('rerunning the query!!!~~~')
+ query.executed_sql = query.database.apply_limit_to_sql(query.sql,
query.limit)
+ print(query.executed_sql)
+ sql_lab.get_sql_results.delay(query.id,
+ query.executed_sql,
+ run_async=True,
+ user_name=g.user.username,
+ start_time=utils.now_as_float(),
+ for_csv=True)
+ db.session.commit()
+ # return a json payload that tells it to start polling till it's
done.
+ # it should also contani a variable that tells it to call /csv
instead of results once it is done.
+ #return
+
+ query.csv=True
+
+
+
+ #rejected_tables = security_manager.rejected_datasources(sql, mydb,
schema)
+ #if rejected_tables:
+ # return json_error_response(
+ # security_manager.get_table_access_error_msg(rejected_tables),
+ # link=security_manager.get_table_access_link(rejected_tables),
+ # status=403)
+ #session.commit()
+
+
+ resp = json_success(json.dumps(
+ {'query': query.to_dict()}, default=utils.json_int_dttm_ser,
+ ignore_nan=True), status=202)
+ db.session.commit()
+ return resp
+
+
@has_access
@expose('/csv/<client_id>')
@log_this
def csv(self, client_id):
"""Download the query results as csv."""
logging.info('Exporting CSV file [{}]'.format(client_id))
+ # DO something where you are able to chheck the limit of the query if
it goes above the specified and adjust as necessary.
query = (
db.session.query(Query)
.filter_by(client_id=client_id)
.one()
)
+
+
rejected_tables = security_manager.rejected_datasources(
query.sql, query.database, query.schema)
if rejected_tables:
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]