[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508083#comment-16508083 ] ASF GitHub Bot commented on FLINK-8861: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5660 Thanks for the improvements, @twalthr. > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Fix For: 1.6.0 > > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508072#comment-16508072 ] ASF GitHub Bot commented on FLINK-8861: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5660 > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > Fix For: 1.6.0 > > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508064#comment-16508064 ] ASF GitHub Bot commented on FLINK-8861: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5660 Thank you @xccui. I will test your code and merge this... > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507318#comment-16507318 ] ASF GitHub Bot commented on FLINK-8861: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5660 Hi @twalthr, the PR has been reworked. Take a look when you are convenient. Thanks! > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507317#comment-16507317 ] ASF GitHub Bot commented on FLINK-8861: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5660#discussion_r194250975 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java --- @@ -159,6 +160,9 @@ protected void evaluate(ResultTableOperation operation, String binding) { case PREV: gotoPreviousPage(); break; + case FIRST: --- End diff -- From the function's point of view, that's true. I added it just for symmetry . > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16479022#comment-16479022 ] ASF GitHub Bot commented on FLINK-8861: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5660 Sure @twalthr, I'll rebase the PR soon. > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478992#comment-16478992 ] ASF GitHub Bot commented on FLINK-8861: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5660#discussion_r188939606 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -293,59 +339,75 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw } @Override - public TypedResult snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); - if (result == null) { - throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); - } - if (!result.isMaterialized()) { - throw new SqlExecutionException("Invalid result retrieval mode."); + public TypedResult snapshotResult(SessionContext session, String resultId, int + pageSize) throws SqlExecutionException { + if (!resultStore.isStatic(resultId)) { + final DynamicResult result = resultStore.getDynamicResult(resultId); + if (result == null) { + throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); + } + if (!result.isMaterialized()) { + throw new SqlExecutionException("Invalid result retrieval mode."); + } + return ((MaterializedResult) result).snapshot(pageSize); + } else { + StaticResult staticResult = resultStore.getStaticResult(resultId); + return staticResult.snapshot(pageSize); } - return ((MaterializedResult) result).snapshot(pageSize); } @Override - public List retrieveResultPage(String resultId, int page) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); - if (result == null) { - throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); - } - if (!result.isMaterialized()) { - throw new SqlExecutionException("Invalid result retrieval mode."); + public List retrieveResultPage(String resultId, int page) throws + SqlExecutionException { + if (!resultStore.isStatic(resultId)) { + final DynamicResult result = resultStore.getDynamicResult(resultId); + if (result == null) { + throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); + } + if (!result.isMaterialized()) { + throw new SqlExecutionException("Invalid result retrieval mode."); + } + return ((MaterializedResult) result).retrievePage(page); + } else { + return resultStore.getStaticResult(resultId).retrievePage(page); } - return ((MaterializedResult) result).retrievePage(page); } @Override public void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); - if (result == null) { - throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); - } + if (!resultStore.isStatic(resultId)) { --- End diff -- Do we really need the distinction between dynamic and state result here? The executor should actually not matter. It should just kill whatever Flink job is running. > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478991#comment-16478991 ] ASF GitHub Bot commented on FLINK-8861: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5660#discussion_r188938891 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -233,56 +235,100 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw // deployment final ClusterClient clusterClient = createDeployment(mergedEnv.getDeployment()); - // initialize result - final DynamicResult result = resultStore.createResult( - mergedEnv, - resultSchema, - context.getExecutionConfig()); + if (mergedEnv.getExecution().isStreamingExecution()) { + // initialize result + final DynamicResult result = resultStore.createDynamicResult( + mergedEnv, + resultSchema, + context.getExecutionConfig()); - // create job graph with jars - final JobGraph jobGraph; - try { - jobGraph = createJobGraph(context, context.getSessionContext().getName() + ": " + query, table, - result.getTableSink(), - clusterClient); - } catch (Throwable t) { - // the result needs to be closed as long as - // it not stored in the result store - result.close(); - throw t; - } + // create job graph with jars + final JobGraph jobGraph; + try { + jobGraph = createJobGraph(context, context.getSessionContext().getName() + ": " + query, table, + result.getTableSink(), + clusterClient); + } catch (Throwable t) { + // the result needs to be closed as long as + // it not stored in the result store + result.close(); + throw t; + } + + // store the result with a unique id (the job id for now) + final String resultId = jobGraph.getJobID().toString(); + resultStore.storeDynamicResult(resultId, result); + + // create execution + final Runnable program = () -> { + // we need to submit the job attached for now + // otherwise it is not possible to retrieve the reason why an execution failed + try { + clusterClient.run(jobGraph, context.getClassLoader()); + } catch (ProgramInvocationException e) { + throw new SqlExecutionException("Could not execute table program.", e); + } finally { + try { + clusterClient.shutdown(); + } catch (Exception e) { + // ignore + } + } + }; + + // start result retrieval + result.startRetrieval(program); + + return new ResultDescriptor(resultId, resultSchema, result.isMaterialized()); - // store the result with a unique id (the job id for now) - final String resultId = jobGraph.getJobID().toString(); - resultStore.storeResult(resultId, result); + } else if (mergedEnv.getExecution().isBatchExecution()) { - // create execution - final Runnable program = () -> { - // we need to submit the job attached for now - // otherwise it is not possible to retrieve the reason why an execution failed + if (!mergedEnv.getExecution().isTableMode()) { + throw new SqlExecutionException("Batch queries can only work on table mode"); + } + + BatchResult batchResult = new BatchResult(); + final
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478993#comment-16478993 ] ASF GitHub Bot commented on FLINK-8861: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5660#discussion_r188937336 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java --- @@ -159,6 +160,9 @@ protected void evaluate(ResultTableOperation operation, String binding) { case PREV: gotoPreviousPage(); break; + case FIRST: --- End diff -- I think we don't need a `FIRST`. `GOTO` allows for the same functionality. `LAST` is a special feature to stay at the last page in streaming mode. > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400030#comment-16400030 ] ASF GitHub Bot commented on FLINK-8861: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5660 I see, @twalthr. Sorry for my impatience. > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1630#comment-1630 ] ASF GitHub Bot commented on FLINK-8861: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5660 Hi @xccui, I will review this as soon as possible. Right now the priority is the Flink 1.5 release with FLIP-6 support. > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398966#comment-16398966 ] ASF GitHub Bot commented on FLINK-8861: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5660 Hi @twalthr and @fhueske, the PR has been rebased. I wonder if you could help review it when you are convenient. Thanks~ > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8861) Add support for batch queries in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16389906#comment-16389906 ] ASF GitHub Bot commented on FLINK-8861: --- GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5660 [FLINK-8861] [table] Add support for batch queries in SQL Client ## What is the purpose of the change This PR added support for batch queries in SQL Client. ## Brief change log - Added a `StaticResult` and a `BatchResult` for the batch query results. - Added related methods to `ResultStore` for static results and renamed the existing methods with a prefix "dynamic". - Added the logic for retrieving batch query results consulting to `Dataset.collect()`. - Adapted the viewing logic for static results to a "two-phase" table result view. - Added the first-page option to `CliTableResultView.java`. - Replaced some default values with `""` in `Execution.java`. ## Verifying this change This change can be verified by the added test case `testBatchQueryExecution()`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-8861 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5660.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5660 commit 85225d504114fe80b1dc6cd85cb5e3daf1a55d36 Author: Xingcan CuiDate: 2018-03-07T17:12:55Z [FLINK-8861][table]Add support for batch queries in SQL Client > Add support for batch queries in SQL Client > --- > > Key: FLINK-8861 > URL: https://issues.apache.org/jira/browse/FLINK-8861 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > This issue is a subtask of part two "Full Embedded SQL Client" of the > implementation plan mentioned in > [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client]. > Similar to streaming queries, it should be possible to execute batch queries > in the SQL Client and collect the results using {{DataSet.collect()}} for > debugging purposes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)