[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-06-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5660


---


[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-06-10 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5660#discussion_r194250975
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
 ---
@@ -159,6 +160,9 @@ protected void evaluate(ResultTableOperation operation, 
String binding) {
case PREV:
gotoPreviousPage();
break;
+   case FIRST:
--- End diff --

From the function's point of view, that's true. I added it just for 
symmetry 😃.


---


[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-05-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5660#discussion_r188937336
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
 ---
@@ -159,6 +160,9 @@ protected void evaluate(ResultTableOperation operation, 
String binding) {
case PREV:
gotoPreviousPage();
break;
+   case FIRST:
--- End diff --

I think we don't need a `FIRST`. `GOTO` allows for the same functionality. 
`LAST` is a special feature to stay at the last page in streaming mode.


---


[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-05-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5660#discussion_r188939606
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -293,59 +339,75 @@ public ResultDescriptor executeQuery(SessionContext 
session, String query) throw
}
 
@Override
-   public TypedResult snapshotResult(SessionContext session, 
String resultId, int pageSize) throws SqlExecutionException {
-   final DynamicResult result = resultStore.getResult(resultId);
-   if (result == null) {
-   throw new SqlExecutionException("Could not find a 
result with result identifier '" + resultId + "'.");
-   }
-   if (!result.isMaterialized()) {
-   throw new SqlExecutionException("Invalid result 
retrieval mode.");
+   public TypedResult snapshotResult(SessionContext session, 
String resultId, int
+   pageSize) throws SqlExecutionException {
+   if (!resultStore.isStatic(resultId)) {
+   final DynamicResult result = 
resultStore.getDynamicResult(resultId);
+   if (result == null) {
+   throw new SqlExecutionException("Could not find 
a result with result identifier '" + resultId + "'.");
+   }
+   if (!result.isMaterialized()) {
+   throw new SqlExecutionException("Invalid result 
retrieval mode.");
+   }
+   return ((MaterializedResult) result).snapshot(pageSize);
+   } else {
+   StaticResult staticResult = 
resultStore.getStaticResult(resultId);
+   return staticResult.snapshot(pageSize);
}
-   return ((MaterializedResult) result).snapshot(pageSize);
}
 
@Override
-   public List retrieveResultPage(String resultId, int page) throws 
SqlExecutionException {
-   final DynamicResult result = resultStore.getResult(resultId);
-   if (result == null) {
-   throw new SqlExecutionException("Could not find a 
result with result identifier '" + resultId + "'.");
-   }
-   if (!result.isMaterialized()) {
-   throw new SqlExecutionException("Invalid result 
retrieval mode.");
+   public List retrieveResultPage(String resultId, int page) throws
+   SqlExecutionException {
+   if (!resultStore.isStatic(resultId)) {
+   final DynamicResult result = 
resultStore.getDynamicResult(resultId);
+   if (result == null) {
+   throw new SqlExecutionException("Could not find 
a result with result identifier '" + resultId + "'.");
+   }
+   if (!result.isMaterialized()) {
+   throw new SqlExecutionException("Invalid result 
retrieval mode.");
+   }
+   return ((MaterializedResult) result).retrievePage(page);
+   } else {
+   return 
resultStore.getStaticResult(resultId).retrievePage(page);
}
-   return ((MaterializedResult) result).retrievePage(page);
}
 
@Override
public void cancelQuery(SessionContext session, String resultId) throws 
SqlExecutionException {
-   final DynamicResult result = resultStore.getResult(resultId);
-   if (result == null) {
-   throw new SqlExecutionException("Could not find a 
result with result identifier '" + resultId + "'.");
-   }
+   if (!resultStore.isStatic(resultId)) {
--- End diff --

Do we really need the distinction between dynamic and state result here? 
The executor should actually not matter. It should just kill whatever Flink job 
is running.


---


[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-05-17 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5660#discussion_r188938891
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -233,56 +235,100 @@ public ResultDescriptor executeQuery(SessionContext 
session, String query) throw
// deployment
final ClusterClient clusterClient = 
createDeployment(mergedEnv.getDeployment());
 
-   // initialize result
-   final DynamicResult result = resultStore.createResult(
-   mergedEnv,
-   resultSchema,
-   context.getExecutionConfig());
+   if (mergedEnv.getExecution().isStreamingExecution()) {
+   // initialize result
+   final DynamicResult result = 
resultStore.createDynamicResult(
+   mergedEnv,
+   resultSchema,
+   context.getExecutionConfig());
 
-   // create job graph with jars
-   final JobGraph jobGraph;
-   try {
-   jobGraph = createJobGraph(context, 
context.getSessionContext().getName() + ": " + query, table,
-   result.getTableSink(),
-   clusterClient);
-   } catch (Throwable t) {
-   // the result needs to be closed as long as
-   // it not stored in the result store
-   result.close();
-   throw t;
-   }
+   // create job graph with jars
+   final JobGraph jobGraph;
+   try {
+   jobGraph = createJobGraph(context, 
context.getSessionContext().getName() + ": " + query, table,
+   result.getTableSink(),
+   clusterClient);
+   } catch (Throwable t) {
+   // the result needs to be closed as long as
+   // it not stored in the result store
+   result.close();
+   throw t;
+   }
+
+   // store the result with a unique id (the job id for 
now)
+   final String resultId = jobGraph.getJobID().toString();
+   resultStore.storeDynamicResult(resultId, result);
+
+   // create execution
+   final Runnable program = () -> {
+   // we need to submit the job attached for now
+   // otherwise it is not possible to retrieve the 
reason why an execution failed
+   try {
+   clusterClient.run(jobGraph, 
context.getClassLoader());
+   } catch (ProgramInvocationException e) {
+   throw new SqlExecutionException("Could 
not execute table program.", e);
+   } finally {
+   try {
+   clusterClient.shutdown();
+   } catch (Exception e) {
+   // ignore
+   }
+   }
+   };
+
+   // start result retrieval
+   result.startRetrieval(program);
+
+   return new ResultDescriptor(resultId, resultSchema, 
result.isMaterialized());
 
-   // store the result with a unique id (the job id for now)
-   final String resultId = jobGraph.getJobID().toString();
-   resultStore.storeResult(resultId, result);
+   } else if (mergedEnv.getExecution().isBatchExecution()) {
 
-   // create execution
-   final Runnable program = () -> {
-   // we need to submit the job attached for now
-   // otherwise it is not possible to retrieve the reason 
why an execution failed
+   if (!mergedEnv.getExecution().isTableMode()) {
+   throw new SqlExecutionException("Batch queries 
can only work on table mode");
+   }
+
+   BatchResult batchResult = new BatchResult();
+   final JobGraph jobGraph = createJobGraph(
+   context,
+   context.getSessionContext().getName () 
+ ": " + query,
+   table,
+

[GitHub] flink pull request #5660: [FLINK-8861] [table] Add support for batch queries...

2018-03-07 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5660

[FLINK-8861] [table] Add support for batch queries in SQL Client

## What is the purpose of the change

This PR added support for batch queries in SQL Client.

## Brief change log

 - Added a `StaticResult` and a `BatchResult` for the batch query results.
 - Added related methods to `ResultStore` for static results and renamed 
the existing methods with a prefix "dynamic".
 - Added the logic for retrieving batch query results consulting to 
`Dataset.collect()`.
 - Adapted the viewing logic for static results to a "two-phase" table 
result view.
 - Added the first-page option to `CliTableResultView.java`.
 - Replaced some default values with `""` in `Execution.java`.

## Verifying this change

This change can be verified by the added test case 
`testBatchQueryExecution()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8861

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5660.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5660


commit 85225d504114fe80b1dc6cd85cb5e3daf1a55d36
Author: Xingcan Cui 
Date:   2018-03-07T17:12:55Z

[FLINK-8861][table]Add support for batch queries in SQL Client




---