[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5660 ---
Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5660#discussion_r194250975 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java --- @@ -159,6 +160,9 @@ protected void evaluate(ResultTableOperation operation, String binding) { case PREV: gotoPreviousPage(); break; + case FIRST: --- End diff -- From the function's point of view, that's true. I added it just for symmetry. ---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5660#discussion_r188937336 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java --- @@ -159,6 +160,9 @@ protected void evaluate(ResultTableOperation operation, String binding) { case PREV: gotoPreviousPage(); break; + case FIRST: --- End diff -- I think we don't need a `FIRST`. `GOTO` allows for the same functionality. `LAST` is a special feature to stay at the last page in streaming mode. ---
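The trade-off twalthr describes can be illustrated with a small, self-contained sketch (class and method names here are hypothetical, not the actual `CliTableResultView` API): `GOTO 1` already covers what a `FIRST` operation would do, while `LAST` carries extra semantics in streaming mode because the page count keeps growing and the view should follow the tail.

```java
// Hypothetical sketch of the navigation logic discussed above.
// GOTO subsumes FIRST (goto page 1); LAST pins the view to the
// newest page as a streaming result keeps producing pages.
class PageNavigator {
    private int page = 1;          // current page, 1-based
    private int pageCount = 1;     // grows over time in streaming mode
    private boolean pinnedToLast = false;

    void gotoPage(int target) {    // GOTO n; GOTO 1 == "first page"
        if (target < 1 || target > pageCount) {
            throw new IllegalArgumentException("Invalid page: " + target);
        }
        page = target;
        pinnedToLast = false;
    }

    void gotoLastPage() {          // LAST: stay on the newest page
        page = pageCount;
        pinnedToLast = true;
    }

    void pageCountGrewTo(int newCount) { // streaming result grew
        pageCount = newCount;
        if (pinnedToLast) {
            page = pageCount;      // follow the tail automatically
        }
    }

    int currentPage() {
        return page;
    }
}

public class PageNavigatorSketch {
    public static void main(String[] args) {
        PageNavigator nav = new PageNavigator();
        nav.pageCountGrewTo(5);
        nav.gotoLastPage();
        nav.pageCountGrewTo(7);
        System.out.println(nav.currentPage()); // prints 7: LAST follows growth
        nav.gotoPage(1);
        nav.pageCountGrewTo(9);
        System.out.println(nav.currentPage()); // prints 1: GOTO 1 acts as FIRST
    }
}
```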
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5660#discussion_r188939606 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -293,59 +339,75 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw } @Override - public TypedResult snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); - if (result == null) { - throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); - } - if (!result.isMaterialized()) { - throw new SqlExecutionException("Invalid result retrieval mode."); + public TypedResult snapshotResult(SessionContext session, String resultId, int + pageSize) throws SqlExecutionException { + if (!resultStore.isStatic(resultId)) { + final DynamicResult result = resultStore.getDynamicResult(resultId); + if (result == null) { + throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); + } + if (!result.isMaterialized()) { + throw new SqlExecutionException("Invalid result retrieval mode."); + } + return ((MaterializedResult) result).snapshot(pageSize); + } else { + StaticResult staticResult = resultStore.getStaticResult(resultId); + return staticResult.snapshot(pageSize); } - return ((MaterializedResult) result).snapshot(pageSize); } @Override - public List retrieveResultPage(String resultId, int page) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); - if (result == null) { - throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); - } - if (!result.isMaterialized()) { - throw new SqlExecutionException("Invalid result retrieval mode."); + public List retrieveResultPage(String resultId, int page) throws + SqlExecutionException { 
+ if (!resultStore.isStatic(resultId)) { + final DynamicResult result = resultStore.getDynamicResult(resultId); + if (result == null) { + throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); + } + if (!result.isMaterialized()) { + throw new SqlExecutionException("Invalid result retrieval mode."); + } + return ((MaterializedResult) result).retrievePage(page); + } else { + return resultStore.getStaticResult(resultId).retrievePage(page); } - return ((MaterializedResult) result).retrievePage(page); } @Override public void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); - if (result == null) { - throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); - } + if (!resultStore.isStatic(resultId)) { --- End diff -- Do we really need the distinction between dynamic and static result here? The executor should actually not matter. It should just kill whatever Flink job is running. ---
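The simplification twalthr suggests can be sketched as follows. This is a minimal, Flink-free stand-in (all names are assumptions, not the actual `LocalExecutor`/`ResultStore` API): if both result kinds implement one interface that exposes the backing job, `cancelQuery` needs no static/dynamic branch at all.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: a common interface for static and dynamic results
// means cancellation does not care which kind it is dealing with.
interface Result {
    String jobId();   // id of the Flink job backing this result
    void close();     // release retrieval threads/buffers
}

class ResultStore {
    private final Map<String, Result> results = new HashMap<>();

    void store(String resultId, Result result) {
        results.put(resultId, result);
    }

    // Unified cancellation: look up the result, clean it up, and hand
    // back the job id so the caller can cancel it on the cluster.
    String cancelQuery(String resultId) {
        final Result result = results.remove(resultId);
        if (result == null) {
            throw new IllegalArgumentException(
                "Could not find a result with result identifier '" + resultId + "'.");
        }
        result.close();
        return result.jobId();
    }
}

public class CancelSketch {
    public static void main(String[] args) {
        ResultStore store = new ResultStore();
        store.store("r1", new Result() {
            public String jobId() { return "job-42"; }
            public void close() { /* no-op in this sketch */ }
        });
        System.out.println(store.cancelQuery("r1")); // prints job-42
    }
}
```

In the real executor the returned job id would be passed to the cluster client's cancel call; the point of the sketch is only that the lookup and cleanup path is identical for both result types.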
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5660#discussion_r188938891 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java --- @@ -233,56 +235,100 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw // deployment final ClusterClient clusterClient = createDeployment(mergedEnv.getDeployment()); - // initialize result - final DynamicResult result = resultStore.createResult( - mergedEnv, - resultSchema, - context.getExecutionConfig()); + if (mergedEnv.getExecution().isStreamingExecution()) { + // initialize result + final DynamicResult result = resultStore.createDynamicResult( + mergedEnv, + resultSchema, + context.getExecutionConfig()); - // create job graph with jars - final JobGraph jobGraph; - try { - jobGraph = createJobGraph(context, context.getSessionContext().getName() + ": " + query, table, - result.getTableSink(), - clusterClient); - } catch (Throwable t) { - // the result needs to be closed as long as - // it not stored in the result store - result.close(); - throw t; - } + // create job graph with jars + final JobGraph jobGraph; + try { + jobGraph = createJobGraph(context, context.getSessionContext().getName() + ": " + query, table, + result.getTableSink(), + clusterClient); + } catch (Throwable t) { + // the result needs to be closed as long as + // it not stored in the result store + result.close(); + throw t; + } + + // store the result with a unique id (the job id for now) + final String resultId = jobGraph.getJobID().toString(); + resultStore.storeDynamicResult(resultId, result); + + // create execution + final Runnable program = () -> { + // we need to submit the job attached for now + // otherwise it is not possible to retrieve the reason why an execution failed + try { + clusterClient.run(jobGraph, context.getClassLoader()); + } catch (ProgramInvocationException e) { + throw new 
SqlExecutionException("Could not execute table program.", e); + } finally { + try { + clusterClient.shutdown(); + } catch (Exception e) { + // ignore + } + } + }; + + // start result retrieval + result.startRetrieval(program); + + return new ResultDescriptor(resultId, resultSchema, result.isMaterialized()); - // store the result with a unique id (the job id for now) - final String resultId = jobGraph.getJobID().toString(); - resultStore.storeResult(resultId, result); + } else if (mergedEnv.getExecution().isBatchExecution()) { - // create execution - final Runnable program = () -> { - // we need to submit the job attached for now - // otherwise it is not possible to retrieve the reason why an execution failed + if (!mergedEnv.getExecution().isTableMode()) { + throw new SqlExecutionException("Batch queries can only work on table mode"); + } + + BatchResult batchResult = new BatchResult(); + final JobGraph jobGraph = createJobGraph( + context, + context.getSessionContext().getName () + ": " + query, + table, +
GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5660 [FLINK-8861] [table] Add support for batch queries in SQL Client ## What is the purpose of the change This PR adds support for batch queries in SQL Client. ## Brief change log - Added a `StaticResult` and a `BatchResult` for the batch query results. - Added related methods to `ResultStore` for static results and renamed the existing methods with a prefix "dynamic". - Added the logic for retrieving batch query results via `DataSet.collect()`. - Adapted the viewing logic for static results to a "two-phase" table result view. - Added the first-page option to `CliTableResultView.java`. - Replaced some default values with `""` in `Execution.java`. ## Verifying this change This change can be verified by the added test case `testBatchQueryExecution()`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-8861 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5660.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5660 commit 85225d504114fe80b1dc6cd85cb5e3daf1a55d36 Author: Xingcan Cui Date: 2018-03-07T17:12:55Z [FLINK-8861][table]Add support for batch queries in SQL Client ---
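The `StaticResult` paging described in the change log can be sketched in isolation. This is a simplified, Flink-free stand-in (the 1-based `retrievePage` signature mirrors the diff above; the class name and everything else are assumptions): the batch rows are materialized once, as `DataSet.collect()` would do, and then served page by page to the table result view.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a static (batch) result: rows are fully materialized up
// front, so paging is just index arithmetic over an in-memory list.
class StaticPagedResult<R> {
    private final List<R> rows;   // fully materialized batch result
    private final int pageSize;

    StaticPagedResult(List<R> collectedRows, int pageSize) {
        this.rows = new ArrayList<>(collectedRows);
        this.pageSize = pageSize;
    }

    int pageCount() {
        // ceiling division: 5 rows at page size 2 -> 3 pages
        return (rows.size() + pageSize - 1) / pageSize;
    }

    // 1-based page retrieval, mirroring retrievePage(page) in the diff.
    List<R> retrievePage(int page) {
        if (page < 1 || page > pageCount()) {
            throw new IllegalArgumentException("Invalid page: " + page);
        }
        int from = (page - 1) * pageSize;
        int to = Math.min(from + pageSize, rows.size());
        return rows.subList(from, to);
    }
}

public class BatchResultSketch {
    public static void main(String[] args) {
        StaticPagedResult<String> result =
            new StaticPagedResult<>(List.of("a", "b", "c", "d", "e"), 2);
        System.out.println(result.pageCount());     // prints 3
        System.out.println(result.retrievePage(3)); // prints [e]
    }
}
```

Note that collecting the whole result to the client is exactly why this approach suits bounded batch queries but not unbounded streaming ones, where the materialized, changelog-driven result views remain necessary.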