[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6332


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202536081
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

I added some unit tests.


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-14 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202520618
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -329,14 +341,46 @@ public void stop(SessionContext session) {
}
}
 
-   private  ResultDescriptor executeQueryInternal(ExecutionContext 
context, String query) {
+   private  ProgramTargetDescriptor 
executeUpdateInternal(ExecutionContext context, String statement) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   // apply update statement
--- End diff --

But there is no additional value of
```
// apply update statement
```
over
```
applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), 
statement);
```
which is completely self-explanatory. This is just a form of small code 
duplication. The same applies to most of those comments.


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-14 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202520454
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -354,6 +398,23 @@ private void callSelect(SqlCommandCall cmdCall) {
}
}
 
+   private boolean callInsertInto(SqlCommandCall cmdCall) {
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+   terminal.flush();
+
+   try {
+   final ProgramTargetDescriptor programTarget = 
executor.executeUpdate(context, cmdCall.operands[0]);
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
--- End diff --

hmmm, what about
```
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
terminal.writer().println(programTarget.toString())
```
?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-14 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202520415
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

Hmmm, that's bad :( Is it hard to start a local Flink mini cluster and 
execute the `cli` process from a `JUnit` test? (Maybe as a follow-up after 
the feature freeze)


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507962
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -329,14 +341,46 @@ public void stop(SessionContext session) {
}
}
 
-   private  ResultDescriptor executeQueryInternal(ExecutionContext 
context, String query) {
+   private  ProgramTargetDescriptor 
executeUpdateInternal(ExecutionContext context, String statement) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   // apply update statement
--- End diff --

On the one hand yes, but on the other hand it allows one to read the comments 
from top to bottom and know what the method is doing without having to look at 
the actual code.


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507930
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -354,6 +398,23 @@ private void callSelect(SqlCommandCall cmdCall) {
}
}
 
+   private boolean callInsertInto(SqlCommandCall cmdCall) {
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+   terminal.flush();
+
+   try {
+   final ProgramTargetDescriptor programTarget = 
executor.executeUpdate(context, cmdCall.operands[0]);
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
--- End diff --

Initially I had it similar to your proposal, but this would mix a data 
model class and visualization. `ProgramTargetDescriptor` should not be 
responsible for how it is represented in the CLI.
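
The separation argued for here can be illustrated with a minimal sketch. It 
is not the code from this PR: `TargetInfo`, `printTarget`, and `render` are 
hypothetical names, and the real `ProgramTargetDescriptor` may carry 
different fields. The data holder stays free of any terminal logic, and the 
CLI side decides how it is printed.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

class TargetPrintingSketch {

    /** Plain data holder (hypothetical); it knows nothing about terminals. */
    static final class TargetInfo {
        final String clusterId;
        final String jobId;
        final String webInterfaceUrl;

        TargetInfo(String clusterId, String jobId, String webInterfaceUrl) {
            this.clusterId = clusterId;
            this.jobId = jobId;
            this.webInterfaceUrl = webInterfaceUrl;
        }
    }

    /** Formatting lives on the CLI side, so the model stays presentation-free. */
    static void printTarget(PrintWriter out, TargetInfo target) {
        out.println("Cluster ID: " + target.clusterId + " / Job ID: " + target.jobId);
        out.println("For the current job status visit: " + target.webInterfaceUrl);
        out.flush();
    }

    /** Convenience helper that renders into a string instead of a terminal. */
    static String render(TargetInfo target) {
        StringWriter buffer = new StringWriter();
        printTarget(new PrintWriter(buffer), target);
        return buffer.toString();
    }

    public static void main(String[] args) {
        TargetInfo target = new TargetInfo("StandaloneClusterId", "job-1", "http://localhost:8081");
        System.out.print(render(target));
    }
}
```

The trade-off raised in the review still applies: a field added to the data 
class must also be added to the printing code by hand.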


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507864
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

The entire CliClient is only tested manually so far.


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507463
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
 ---
@@ -97,14 +97,34 @@ private void start() {
// add shutdown hook
Runtime.getRuntime().addShutdownHook(new 
EmbeddedShutdownThread(context, executor));
 
-   // start CLI
-   final CliClient cli = new CliClient(context, executor);
-   cli.open();
+   // do the actual work
+   openCli(context, executor);
} else {
throw new SqlClientException("Gateway mode is not 
supported yet.");
}
}
 
+   /**
+* Opens the CLI client for executing SQL statements.
+*
+* @param context session context
+* @param executor executor
+*/
+   private void openCli(SessionContext context, Executor executor) {
+   final CliClient cli = new CliClient(context, executor);
+   // interactive CLI mode
+   if (options.getUpdateStatement() == null) {
+   cli.open();
+   }
+   // execute single update statement
+   else {
+   final boolean success = 
cli.submitUpdate(options.getUpdateStatement());
--- End diff --

No, this would block the process for unbounded queries and require 
(fault-tolerant) monitoring in the SQL Client, which is not intended. We just 
block until the statement has been submitted to the cluster.
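
The difference between blocking until submission and blocking until 
completion can be sketched as follows. This is only an illustration with 
plain `java.util.concurrent` classes, not Flink's deployment code; 
`Submission`, `submit`, and `awaitQuietly` are hypothetical names, and the 
single-thread executor merely stands in for a cluster.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SubmitOnlySketch {

    /** Result of a submission: an id plus a handle the caller may ignore. */
    static final class Submission {
        final String jobId;
        final CompletableFuture<Void> completion;

        Submission(String jobId, CompletableFuture<Void> completion) {
            this.jobId = jobId;
            this.completion = completion;
        }
    }

    /** Hands the job to the "cluster" and returns without waiting for it to finish. */
    static Submission submit(ExecutorService cluster, Runnable job) {
        String jobId = UUID.randomUUID().toString();
        CompletableFuture<Void> completion = CompletableFuture.runAsync(job, cluster);
        return new Submission(jobId, completion);
    }

    /** Simulates a long-running (possibly unbounded) job that runs until released. */
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService cluster = Executors.newSingleThreadExecutor();
        CountDownLatch stop = new CountDownLatch(1);

        Submission submission = submit(cluster, () -> awaitQuietly(stop));
        // The client could exit here; the job is still running.
        System.out.println("Submitted job " + submission.jobId
                + ", finished: " + submission.completion.isDone());

        // Only needed so that this demo terminates.
        stop.countDown();
        submission.completion.join();
        cluster.shutdown();
    }
}
```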


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-14 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202507424
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor 
executor) {
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
+   // make space from previous output and test the writer
+   terminal.writer().println();
--- End diff --

It makes the output on the terminal nicer. We don't know what has been 
printed before. This starts a terminal session. The output now looks like:

```
No default environment specified.
Searching for 
'/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml'...found.
Reading default environment from: 
file:/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml
No session environment specified.

[INFO] Executing the following statement:
INSERT INTO MyTableName SELECT * FROM MyTableName
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the 
cluster:
Cluster ID: StandaloneClusterId / Job ID: fab21f0632da36f9236c343c2850c71d
For the current job status visit: http://localhost:8081

Shutting down executor...done.
```


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202398523
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor 
executor) {
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
+   // make space from previous output and test the writer
+   terminal.writer().println();
--- End diff --

Why is that needed? Shouldn't we print a new line at the end of the output 
instead?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202421703
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -371,6 +416,13 @@ public void stop(SessionContext session) {
result.isMaterialized());
}
 
+   /**
+* Creates a table using the given query in the given table environment.
+*
+* @param tableEnv table environment
+* @param query SQL SELECT query
+* @return result table object
+*/
private Table createTable(TableEnvironment tableEnv, String query) {
--- End diff --

Rename `query` -> `selectQuery` and drop the `@param` section from the Javadoc?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202421784
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -381,6 +433,23 @@ private Table createTable(TableEnvironment tableEnv, 
String query) {
}
}
 
+   /**
+* Applies the given update statement to the given table environment 
with query configuration.
+*
+* @param tableEnv table environment
+* @param queryConfig query configuration
+* @param statement SQL update statement (e.g. INSERT INTO)
+*/
+   private void applyUpdate(TableEnvironment tableEnv, QueryConfig 
queryConfig, String statement) {
--- End diff --

Rename `statement` -> `updateStatement` and drop the `@param` section from 
the Javadoc?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202421005
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ---
@@ -329,14 +341,46 @@ public void stop(SessionContext session) {
}
}
 
-   private  ResultDescriptor executeQueryInternal(ExecutionContext 
context, String query) {
+   private  ProgramTargetDescriptor 
executeUpdateInternal(ExecutionContext context, String statement) {
+   final ExecutionContext.EnvironmentInstance envInst = 
context.createEnvironmentInstance();
+
+   // apply update statement
--- End diff --

some of those comments are a little bit unnecessary?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202407651
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
+   terminal.writer().println(new 
AttributedString(statement).toString());
+   terminal.flush();
 
-   if (cmdCall == null) {
-   
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL));
-   continue;
-   }
+   final Optional<SqlCommandCall> parsedStatement = 
SqlCommandParser.parse(statement);
--- End diff --

Deduplicate the parsing call and the `if` check. There is already a bug here: 
either a missing `flush` or an unnecessary `flush`.
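
One way to remove the duplication is a single helper that parses and reports 
the error path in one place, so both callers print and flush consistently. 
The sketch below is an assumption about how that could look, not the code 
that was merged; `Command`, `parse`, and `parseOrReport` are hypothetical 
stand-ins for `SqlCommandParser` and the terminal writer.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Locale;
import java.util.Optional;

class ParseOnceSketch {

    /** Tiny stand-in for the real command enum. */
    enum Command { INSERT_INTO, SELECT, QUIT }

    /** Hypothetical parser: recognizes a command by its leading keyword. */
    static Optional<Command> parse(String line) {
        String normalized = line.trim().toUpperCase(Locale.ROOT);
        if (normalized.startsWith("INSERT INTO")) {
            return Optional.of(Command.INSERT_INTO);
        } else if (normalized.startsWith("SELECT")) {
            return Optional.of(Command.SELECT);
        } else if (normalized.equals("QUIT")) {
            return Optional.of(Command.QUIT);
        }
        return Optional.empty();
    }

    /** Shared by all callers: the error message and the flush happen in one place. */
    static Optional<Command> parseOrReport(PrintWriter out, String line) {
        Optional<Command> parsed = parse(line);
        if (!parsed.isPresent()) {
            out.println("[ERROR] Unknown SQL statement.");
            out.flush();
        }
        return parsed;
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        PrintWriter out = new PrintWriter(buffer);
        System.out.println(parseOrReport(out, "INSERT INTO a SELECT * FROM b"));
        System.out.println(parseOrReport(out, "not sql"));
        System.out.print(buffer);
    }
}
```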


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202427682
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
 ---
@@ -73,7 +86,7 @@ public JobExecutionResult fetchExecutionResult() {
 * Deploys a job. Depending on the deployment creates a new job 
cluster. It saves the cluster id in
 * the result and blocks until job completion.
 */
-   private  JobExecutionResult deployJob(ExecutionContext context, 
JobGraph jobGraph, DynamicResult result) {
+   private  void deployJob(ExecutionContext context, JobGraph 
jobGraph, Result result) {
--- End diff --

Split this method? Extract the `if (context.getClusterId() == null)` and 
`else` branches into separate methods?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202408478
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
+   terminal.writer().println(new 
AttributedString(statement).toString());
+   terminal.flush();
 
-   if (cmdCall == null) {
-   
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL));
-   continue;
-   }
+   final Optional<SqlCommandCall> parsedStatement = 
SqlCommandParser.parse(statement);
+   if (!parsedStatement.isPresent()) {
+   
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL).toAnsi());
+   terminal.flush();
+   return false;
+   }
 
-   switch (cmdCall.command) {
-   case QUIT:
-   case EXIT:
-   callQuit(cmdCall);
-   break;
-   case CLEAR:
-   callClear(cmdCall);
-   break;
-   case RESET:
-   callReset(cmdCall);
-   break;
-   case SET:
-   callSet(cmdCall);
-   break;
-   case HELP:
-   callHelp(cmdCall);
-   break;
-   case SHOW_TABLES:
-   callShowTables(cmdCall);
-   break;
-   case SHOW_FUNCTIONS:
-   callShowFunctions(cmdCall);
-   break;
-   case DESCRIBE:
-   callDescribe(cmdCall);
-   break;
-   case EXPLAIN:
-   callExplain(cmdCall);
-   break;
-   case SELECT:
-   callSelect(cmdCall);
-   break;
-   case SOURCE:
-   callSource(cmdCall);
-   break;
-   }
+   final SqlCommandCall cmdCall = parsedStatement.get();
+   switch (cmdCall.command) {
+   case INSERT_INTO:
+   return callInsertInto(cmdCall);
+   default:
+   
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNSUPPORTED_SQL).toAnsi());
+   terminal.flush();
}
+
+   return false;
}
 
// 

 
+   private void parseAndCall(String line) {
+   final Optional<SqlCommandCall> parsedLine = 
SqlCommandParser.parse(line);
+
+   if (!parsedLine.isPresent()) {
+   
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL).toAnsi());
+   return;
+   }
+
+   final SqlCommandCall cmdCall = parsedLine.get();
+   switch (cmdCall.command) {
+   case QUIT:
+   case EXIT:
+   callQuit(cmdCall);
+   break;
+   case CLEAR:
+   callClear(cmdCall);
+   break;
+   case RESET:
+   

[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202420258
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -354,6 +398,23 @@ private void callSelect(SqlCommandCall cmdCall) {
}
}
 
+   private boolean callInsertInto(SqlCommandCall cmdCall) {
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+   terminal.flush();
+
+   try {
+   final ProgramTargetDescriptor programTarget = 
executor.executeUpdate(context, cmdCall.operands[0]);
+   
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
--- End diff --

```
programTarget.writeTo(terminal.writer())
```
? It would be easier to add more fields in the future and more difficult to 
forget about printing them.



---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202419608
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ---
@@ -173,55 +180,92 @@ public void open() {
if (line == null || line.equals("")) {
continue;
}
+   parseAndCall(line);
+   }
+   }
 
-   final SqlCommandCall cmdCall = 
SqlCommandParser.parse(line);
+   /**
+* Submits a SQL update statement and prints status information and/or 
errors on the terminal.
+*
+* @param statement SQL update statement
+* @return flag to indicate if the submission was successful or not
+*/
+   public boolean submitUpdate(String statement) {
--- End diff --

do we have tests for that? Some `ITCase`? 


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6332#discussion_r202407072
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
 ---
@@ -97,14 +97,34 @@ private void start() {
// add shutdown hook
Runtime.getRuntime().addShutdownHook(new 
EmbeddedShutdownThread(context, executor));
 
-   // start CLI
-   final CliClient cli = new CliClient(context, executor);
-   cli.open();
+   // do the actual work
+   openCli(context, executor);
} else {
throw new SqlClientException("Gateway mode is not 
supported yet.");
}
}
 
+   /**
+* Opens the CLI client for executing SQL statements.
+*
+* @param context session context
+* @param executor executor
+*/
+   private void openCli(SessionContext context, Executor executor) {
+   final CliClient cli = new CliClient(context, executor);
+   // interactive CLI mode
+   if (options.getUpdateStatement() == null) {
+   cli.open();
+   }
+   // execute single update statement
+   else {
+   final boolean success = 
cli.submitUpdate(options.getUpdateStatement());
--- End diff --

we do not wait for the query to complete?


---


[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...

2018-07-13 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6332

[FLINK-8858] [sql-client] Add support for INSERT INTO in SQL Client

## What is the purpose of the change

This PR adds support for the SQL `INSERT INTO` statement in SQL Client. 
This PR depends on #6323 for finalizing the unified table sinks. The PR adds 
support for submitting `INSERT INTO` statements in the CLI shell as well as 
using the `-u` command line option. The command line option is the basis for 
end-to-end testing of the SQL Client (FLINK-8970).
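
The choice between the interactive shell and single-statement submission can 
be sketched roughly as below. This is a hand-rolled illustration only: the 
option parsing of the real client is not shown in this thread, and 
`extractUpdateStatement` and `chooseMode` are hypothetical helpers.

```java
import java.util.Optional;

class UpdateOptionSketch {

    /** Returns the argument following "-u", if present (hypothetical parsing). */
    static Optional<String> extractUpdateStatement(String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            if ("-u".equals(args[i])) {
                return Optional.of(args[i + 1]);
            }
        }
        return Optional.empty();
    }

    /** Without an update statement the shell opens; otherwise the statement is submitted. */
    static String chooseMode(String[] args) {
        return extractUpdateStatement(args)
                .map(statement -> "submit: " + statement)
                .orElse("interactive");
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(new String[] {"embedded"}));
        System.out.println(chooseMode(
                new String[] {"embedded", "-u", "INSERT INTO MyTableName SELECT * FROM MyTableName"}));
    }
}
```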


## Brief change log

- Refactor the `XXXResult` classes
- Add `INSERT INTO` support to CLI and local executor
- Add `-u` command line parameter


## Verifying this change

- `DependencyTest` has been adapted
- New test 
`o.a.f.table.client.gateway.local.LocalExecutorITCase#testStreamQueryExecutionSink`
- New test 
`o.a.f.table.client.gateway.local.ExecutionContextTest#testSourceSinks` 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8858

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6332.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6332


commit 980499f887d72ddf9a405c4ad200d0cab15d889c
Author: Timo Walther 
Date:   2018-06-27T11:16:49Z

[FLINK-8558] [table] Add unified format interfaces and separate formats 
from connectors

This PR introduces a format discovery mechanism based on Java Service 
Providers. The general `TableFormatFactory` is similar to the existing table 
source discovery mechanism. However, it allows for arbitrary format interfaces 
that might be introduced in the future. At the moment, a connector can request 
configured instances of `DeserializationSchema` and `SerializationSchema`. In 
the future we can add interfaces such as a `Writer` or 
`KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for 
the Kafka table sources and table source factories. It introduces 
descriptor-based alternatives.

commit 42a8a156d4e6f8f3d119c458350b6c897306fc48
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

[FLINK-8866] [table] Create unified interfaces to configure and instantiate 
TableSinks

This closes #6201.

commit 311dc62e59c0e4146c094b73c21b979f31b2e1d9
Author: Timo Walther 
Date:   2018-07-11T11:29:03Z

Rename to TableFactory and move it to factories package

commit 1c581cba61ba321bb6de6a4d298a881840d11cfe
Author: Timo Walther 
Date:   2018-07-11T11:46:31Z

Refactor format factories

commit 5c6df7598d1f1c3c698ae9b6b35eb37d7fff8295
Author: Timo Walther 
Date:   2018-07-12T06:35:00Z

Unify table factories

commit 0cd7c44c006aba21c32d8785d17bfc3dbee03916
Author: Timo Walther 
Date:   2018-07-12T07:05:50Z

Move table type out of descriptors

commit 6b83f2e1c0e63147f049dc5389c5633077b789a4
Author: Timo Walther 
Date:   2018-07-12T08:50:09Z

Make source/sink factories environment-dependent

commit 4f1255fd003080f078afe6ef67ffa58f40ffec36
Author: Timo Walther 
Date:   2018-07-12T18:48:45Z

Clean up and simplify changes

commit 10fc3b3d0a5b620a7f0eb49b9941a9d2d9ae2b58
Author: Timo Walther 
Date:   2018-07-05T16:11:18Z

[FLINK-8858] [sql-client] Add support for INSERT INTO in SQL Client




---