[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5441


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r168914688
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultStore.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maintains dynamic results.
+ */
+public class ResultStore {
+
+   private Configuration flinkConfig;
+
+   private Map results;
+
+   public ResultStore(Configuration flinkConfig) {
+   this.flinkConfig = flinkConfig;
+
+   results = new HashMap<>();
+   }
+
+   public DynamicResult createResult(Environment env) {
+   if (!env.getExecution().isStreamingExecution()) {
+   throw new SqlExecutionException("Emission is only 
supported in streaming environments yet.");
+   }
+
+   return new DynamicResult(
+   // determine gateway address (and port if possible)
+   getGatewayAddress(env.getDeployment()),
+   getGatewayPort(env.getDeployment()),
+   env.getExecution().isChangelogMode());
+   }
+
+   public void storeResult(String resultId, DynamicResult result) {
+   results.put(resultId, result);
+   }
+
+   public DynamicResult getResult(String resultId) {
+   return results.get(resultId);
+   }
+
+   public void removeResult(String resultId) {
+   final DynamicResult result = results.get(resultId);
+   result.close();
+   results.remove(resultId);
+   }
+
+   public List getResults() {
+   return new ArrayList<>(results.keySet());
+   }
+
+   // 

+
+   private int getGatewayPort(Deployment deploy) {
+   // try to get address from deployment configuration
+   return deploy.getGatewayPort();
+   }
+
+   private InetAddress getGatewayAddress(Deployment deploy) {
+   // try to get address from deployment configuration
+   final String address = deploy.getGatewayAddress();
+
+   // use manually defined address
+   if (!address.isEmpty()) {
+   try {
+   return InetAddress.getByName(address);
+   } catch (UnknownHostException e) {
+   throw new SqlClientException("Invalid gateway 
address '" + address + "' for result retrieval.", e);
+   }
+   } else {
+   // TODO cache this
+   // try to get the address by communicating to JobManager
+   final String jobManagerAddress = 
flinkConfig.getString(JobManagerOptions.ADDRESS);
+   final int jobManagerPort = 
flinkConfig.getInteger(JobManagerOptions.PORT);
+   if (jobManagerAddress != null && 
!jobManagerAddress.isEmpty()) {
--- End diff --

This logic has been adopted from the DataStreamUtils. It communicates to 
the job manager in order to figure out which network adapter is used for 
communication to cluster in general.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r168447407
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+/**
+ * Strings used for key and values in an environment file.
+ */
+public final class PropertyStrings {
+
+   private PropertyStrings() {
+   // private
+   }
+
+   public static final String EXECUTION = "execution";
+
+   public static final String EXECUTION_TYPE = "type";
+
+   public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
+
+   public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
+
+   public static final String EXECUTION_MIN_STATE_RETENTION = 
"min-idle-state-retention";
+
+   public static final String EXECUTION_MAX_STATE_RETENTION = 
"max-idle-state-retention";
+
+   public static final String EXECUTION_PARALLELISM = "parallelism";
+
+   public static final String EXECUTION_MAX_PARALLELISM = 
"max-parallelism";
+
+   public static final String EXECUTION_RESULT_MODE = "result-mode";
+
+   public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = 
"changelog";
+
+   public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+
+   public static final String DEPLOYMENT = "deployment";
--- End diff --

I think we should leave this generic. It might also contain things like 
YARN application id or whatever there might come in the future.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r168440952
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Configuration of a Flink cluster deployment.
+ */
+public class Deployment {
--- End diff --

Do you mean this globally? Also replacing `deployment` by `gateway` in the 
configuration file?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r168429967
  
--- Diff: flink-libraries/flink-sql-client/pom.xml ---
@@ -0,0 +1,149 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.5-SNAPSHOT
+   ..
+   
+
+   flink-sql-client
+   flink-sql-client
--- End diff --

I also thought about the naming. In general, we call components that submit 
Flink jobs "client". So the entire effort will be called "SQL Client". The SQL 
Client consists of a CLI Client and SQL Gateway. SQL server might confuse user 
with Microsofts products.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167903512
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+/**
+ * Result with an attached type (actual payload, EOS, etc.).
+ *
+ * @param  type of payload
+ */
+public class TypedResult {
+
+   private final Type type;
--- End diff --

Make `TypedResult` mutable to make it reuseable?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167889717
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java
 ---
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.cli;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.ResultDescriptor;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import org.jline.keymap.KeyMap;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.jline.utils.InfoCmp.Capability;
+
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
+import static 
org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions;
+import static org.apache.flink.table.client.cli.CliUtils.normalizeColumn;
+import static org.apache.flink.table.client.cli.CliUtils.repeatChar;
+import static org.apache.flink.table.client.cli.CliUtils.rowToString;
+import static org.jline.keymap.KeyMap.ctrl;
+import static org.jline.keymap.KeyMap.esc;
+import static org.jline.keymap.KeyMap.key;
+
+/**
+ * CLI view for retrieving and displaying a changelog stream.
+ */
+public class CliChangelogResultView extends 
CliResultView {
--- End diff --

FH: review!


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167917093
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+   private static final String DEFAULT_ENV_FILE = 
"sql-client-defaults.yaml";
+
+   private final Environment environment;
+   private final List dependencies;
+   private final Configuration flinkConfig;
+   private final ResultStore resultStore;
+
+   public LocalExecutor(URL defaultEnv, List jars, List 
libraries) {
+
+   final String flinkConfigDir;
+   try {
+   // find the configuration directory
+   flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   // load the global configuration
+  

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167916900
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+   private static final String DEFAULT_ENV_FILE = 
"sql-client-defaults.yaml";
+
+   private final Environment environment;
+   private final List dependencies;
+   private final Configuration flinkConfig;
+   private final ResultStore resultStore;
+
+   public LocalExecutor(URL defaultEnv, List jars, List 
libraries) {
+
+   final String flinkConfigDir;
+   try {
+   // find the configuration directory
+   flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   // load the global configuration
+  

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r16794
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+   private static final String DEFAULT_ENV_FILE = 
"sql-client-defaults.yaml";
+
+   private final Environment environment;
+   private final List dependencies;
+   private final Configuration flinkConfig;
+   private final ResultStore resultStore;
+
+   public LocalExecutor(URL defaultEnv, List jars, List 
libraries) {
+
+   final String flinkConfigDir;
+   try {
+   // find the configuration directory
+   flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   // load the global configuration
+  

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167951055
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java
 ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A result of a dynamic table program.
+ */
+public class DynamicResult {
+
+   private final boolean isChangelog;
+   private final CollectTableSink collectTableSink;
+   private final Object resultLock = new Object();
+   private final ResultRetrievalThread retrievalThread;
+   private final JobMonitoringThread monitoringThread;
+   private Runnable program;
+   private SqlExecutionException executionException;
+
+   // for table materialization
+   private final List materializedTable;
+   private final Map rowPositions; // positions of 
rows in table for faster access
+   private final List snapshot;
+   private int pageSize;
+   private boolean isLastSnapshot;
+
+   // for changelog
+   private Tuple2 nextChangeRecord;
+
+   public DynamicResult(InetAddress targetAddress, int targetPort, boolean 
isChangelog) {
+   this.isChangelog = isChangelog;
+
+   // create table sink
+   collectTableSink = new CollectTableSink(targetAddress, 
targetPort);
+   retrievalThread = new ResultRetrievalThread();
+   monitoringThread = new JobMonitoringThread();
+
+   // prepare for table materialization
+   materializedTable = new ArrayList<>();
+   rowPositions = new HashMap<>();
+   snapshot = new ArrayList<>();
+   isLastSnapshot = false;
+   }
+
+   public boolean isChangelog() {
+   return isChangelog;
+   }
+
+   public void startRetrieval(Runnable program) {
+   // start listener thread
+   retrievalThread.start();
+
+   // start program
+   this.program = program;
+   monitoringThread.start();
+   }
+
+   public TypedResult> retrieveRecord() {
+   synchronized (resultLock) {
+   // retrieval thread is alive return a record if 
available
+   // but the program must not have failed
+   if (retrievalThread.isRunning && executionException == 
null) {
+   if (nextChangeRecord == null) {
+   return TypedResult.empty();
+   } else {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   resultLock.notify();
+   return TypedResult.payload(change);
+   }
+   }
+   // retrieval thread is dead but there is still a record 
to be delivered
+   else if (!retrievalThread.isRunning && nextChangeRecord 
!= null) {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   return TypedResult.payload(change);
+   }
+   // no results can be returned anymore
+   else {
+   return handleMissingResult();
+   }
+   }
+   }
+
+   public TypedResult snapshot(int pageSize) 

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167953460
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java
 ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A result of a dynamic table program.
+ */
+public class DynamicResult {
+
+   private final boolean isChangelog;
+   private final CollectTableSink collectTableSink;
+   private final Object resultLock = new Object();
+   private final ResultRetrievalThread retrievalThread;
+   private final JobMonitoringThread monitoringThread;
+   private Runnable program;
+   private SqlExecutionException executionException;
+
+   // for table materialization
+   private final List materializedTable;
+   private final Map rowPositions; // positions of 
rows in table for faster access
+   private final List snapshot;
+   private int pageSize;
+   private boolean isLastSnapshot;
+
+   // for changelog
+   private Tuple2 nextChangeRecord;
+
+   public DynamicResult(InetAddress targetAddress, int targetPort, boolean 
isChangelog) {
+   this.isChangelog = isChangelog;
+
+   // create table sink
+   collectTableSink = new CollectTableSink(targetAddress, 
targetPort);
+   retrievalThread = new ResultRetrievalThread();
+   monitoringThread = new JobMonitoringThread();
+
+   // prepare for table materialization
+   materializedTable = new ArrayList<>();
+   rowPositions = new HashMap<>();
+   snapshot = new ArrayList<>();
+   isLastSnapshot = false;
+   }
+
+   public boolean isChangelog() {
+   return isChangelog;
+   }
+
+   public void startRetrieval(Runnable program) {
+   // start listener thread
+   retrievalThread.start();
+
+   // start program
+   this.program = program;
+   monitoringThread.start();
+   }
+
+   public TypedResult> retrieveRecord() {
+   synchronized (resultLock) {
+   // retrieval thread is alive return a record if 
available
+   // but the program must not have failed
+   if (retrievalThread.isRunning && executionException == 
null) {
+   if (nextChangeRecord == null) {
+   return TypedResult.empty();
+   } else {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   resultLock.notify();
+   return TypedResult.payload(change);
+   }
+   }
+   // retrieval thread is dead but there is still a record 
to be delivered
+   else if (!retrievalThread.isRunning && nextChangeRecord 
!= null) {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   return TypedResult.payload(change);
+   }
+   // no results can be returned anymore
+   else {
+   return handleMissingResult();
+   }
+   }
+   }
+
+   public TypedResult snapshot(int pageSize) 

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167871796
  
--- Diff: flink-libraries/flink-sql-client/pom.xml ---
@@ -0,0 +1,149 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.5-SNAPSHOT
+   ..
+   
+
+   flink-sql-client
+   flink-sql-client
+
+   jar
+
+   
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-table_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-log4j12
+   ${slf4j.version}
+   
+   
+   log4j
+   log4j
+   ${log4j.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-api
+   ${slf4j.version}
+   
+
+   
+   
+   org.jline
+   jline-terminal
+   3.5.1
--- End diff --

jLine 3.5.2 was released in December. We might want to use that version.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167939768
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+   private static final String DEFAULT_ENV_FILE = 
"sql-client-defaults.yaml";
+
+   private final Environment environment;
+   private final List dependencies;
+   private final Configuration flinkConfig;
+   private final ResultStore resultStore;
+
+   public LocalExecutor(URL defaultEnv, List jars, List 
libraries) {
+
+   final String flinkConfigDir;
+   try {
+   // find the configuration directory
+   flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   // load the global configuration
+  

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167914210
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Environment configuration.
--- End diff --

More details about the purpose of this class would be good. What kind of 
properties are stored? What is the scope of the configuration?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167922399
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultStore.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maintains dynamic results.
+ */
+public class ResultStore {
+
+   private Configuration flinkConfig;
+
+   private Map results;
+
+   public ResultStore(Configuration flinkConfig) {
+   this.flinkConfig = flinkConfig;
+
+   results = new HashMap<>();
+   }
+
+   public DynamicResult createResult(Environment env) {
+   if (!env.getExecution().isStreamingExecution()) {
+   throw new SqlExecutionException("Emission is only 
supported in streaming environments yet.");
--- End diff --

Is this the point at which batch queries will fail? 
Can we fail earlier with a better error message?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167896355
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/CollectTableSink.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Iterator;
+
+/**
+ * Table sink for collecting the results locally.
+ */
+public class CollectTableSink implements RetractStreamTableSink, 
Iterator> {
--- End diff --

I would move the `Iterator` out of the `TableSink` for better separation of 
concerns. A `TableSink` should only be responsible to emit the data, not for 
receiving it.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167949288
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java
 ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A result of a dynamic table program.
+ */
+public class DynamicResult {
+
+   private final boolean isChangelog;
+   private final CollectTableSink collectTableSink;
+   private final Object resultLock = new Object();
+   private final ResultRetrievalThread retrievalThread;
+   private final JobMonitoringThread monitoringThread;
+   private Runnable program;
+   private SqlExecutionException executionException;
+
+   // for table materialization
+   private final List materializedTable;
+   private final Map rowPositions; // positions of 
rows in table for faster access
+   private final List snapshot;
+   private int pageSize;
+   private boolean isLastSnapshot;
+
+   // for changelog
+   private Tuple2 nextChangeRecord;
+
+   public DynamicResult(InetAddress targetAddress, int targetPort, boolean 
isChangelog) {
+   this.isChangelog = isChangelog;
+
+   // create table sink
+   collectTableSink = new CollectTableSink(targetAddress, 
targetPort);
+   retrievalThread = new ResultRetrievalThread();
+   monitoringThread = new JobMonitoringThread();
+
+   // prepare for table materialization
+   materializedTable = new ArrayList<>();
+   rowPositions = new HashMap<>();
+   snapshot = new ArrayList<>();
+   isLastSnapshot = false;
+   }
+
+   public boolean isChangelog() {
+   return isChangelog;
+   }
+
+   public void startRetrieval(Runnable program) {
+   // start listener thread
+   retrievalThread.start();
+
+   // start program
+   this.program = program;
+   monitoringThread.start();
+   }
+
+   public TypedResult> retrieveRecord() {
+   synchronized (resultLock) {
+   // retrieval thread is alive return a record if 
available
+   // but the program must not have failed
+   if (retrievalThread.isRunning && executionException == 
null) {
+   if (nextChangeRecord == null) {
+   return TypedResult.empty();
+   } else {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   resultLock.notify();
+   return TypedResult.payload(change);
+   }
+   }
+   // retrieval thread is dead but there is still a record 
to be delivered
+   else if (!retrievalThread.isRunning && nextChangeRecord 
!= null) {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   return TypedResult.payload(change);
+   }
+   // no results can be returned anymore
+   else {
+   return handleMissingResult();
+   }
+   }
+   }
+
+   public TypedResult snapshot(int pageSize) 

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167944240
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultStore.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maintains dynamic results.
+ */
+public class ResultStore {
+
+   private Configuration flinkConfig;
+
+   private Map results;
+
+   public ResultStore(Configuration flinkConfig) {
+   this.flinkConfig = flinkConfig;
+
+   results = new HashMap<>();
+   }
+
+   public DynamicResult createResult(Environment env) {
+   if (!env.getExecution().isStreamingExecution()) {
+   throw new SqlExecutionException("Emission is only 
supported in streaming environments yet.");
+   }
+
+   return new DynamicResult(
+   // determine gateway address (and port if possible)
+   getGatewayAddress(env.getDeployment()),
+   getGatewayPort(env.getDeployment()),
+   env.getExecution().isChangelogMode());
+   }
+
+   public void storeResult(String resultId, DynamicResult result) {
+   results.put(resultId, result);
+   }
+
+   public DynamicResult getResult(String resultId) {
+   return results.get(resultId);
+   }
+
+   public void removeResult(String resultId) {
+   final DynamicResult result = results.get(resultId);
+   result.close();
+   results.remove(resultId);
+   }
+
+   public List getResults() {
+   return new ArrayList<>(results.keySet());
+   }
+
+   // 

+
+   private int getGatewayPort(Deployment deploy) {
+   // try to get address from deployment configuration
+   return deploy.getGatewayPort();
+   }
+
+   private InetAddress getGatewayAddress(Deployment deploy) {
+   // try to get address from deployment configuration
+   final String address = deploy.getGatewayAddress();
+
+   // use manually defined address
+   if (!address.isEmpty()) {
+   try {
+   return InetAddress.getByName(address);
+   } catch (UnknownHostException e) {
+   throw new SqlClientException("Invalid gateway 
address '" + address + "' for result retrieval.", e);
+   }
+   } else {
+   // TODO cache this
+   // try to get the address by communicating to JobManager
+   final String jobManagerAddress = 
flinkConfig.getString(JobManagerOptions.ADDRESS);
+   final int jobManagerPort = 
flinkConfig.getInteger(JobManagerOptions.PORT);
+   if (jobManagerAddress != null && 
!jobManagerAddress.isEmpty()) {
--- End diff --

Will this work in a "real" cluster setup with SqlClient, JM, and TMs 
running on different machines?
If I get the logic right, we need the host name of the client where the 
socket server is started, because the sink function 

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167921047
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+/**
+ * Strings used for key and values in an environment file.
+ */
+public final class PropertyStrings {
+
+   private PropertyStrings() {
+   // private
+   }
+
+   public static final String EXECUTION = "execution";
+
+   public static final String EXECUTION_TYPE = "type";
+
+   public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
+
+   public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
+
+   public static final String EXECUTION_MIN_STATE_RETENTION = 
"min-idle-state-retention";
+
+   public static final String EXECUTION_MAX_STATE_RETENTION = 
"max-idle-state-retention";
+
+   public static final String EXECUTION_PARALLELISM = "parallelism";
+
+   public static final String EXECUTION_MAX_PARALLELISM = 
"max-parallelism";
+
+   public static final String EXECUTION_RESULT_MODE = "result-mode";
+
+   public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = 
"changelog";
+
+   public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+
+   public static final String DEPLOYMENT = "deployment";
--- End diff --

rename to flink cluster?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167871058
  
--- Diff: flink-libraries/flink-sql-client/pom.xml ---
@@ -0,0 +1,149 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.5-SNAPSHOT
+   ..
+   
+
+   flink-sql-client
+   flink-sql-client
+
+   jar
+
+   
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+
+   
+   org.apache.flink
--- End diff --

indention is off


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167953650
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java
 ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A result of a dynamic table program.
+ */
+public class DynamicResult {
+
+   private final boolean isChangelog;
+   private final CollectTableSink collectTableSink;
+   private final Object resultLock = new Object();
+   private final ResultRetrievalThread retrievalThread;
+   private final JobMonitoringThread monitoringThread;
+   private Runnable program;
+   private SqlExecutionException executionException;
+
+   // for table materialization
+   private final List materializedTable;
+   private final Map rowPositions; // positions of 
rows in table for faster access
+   private final List snapshot;
+   private int pageSize;
+   private boolean isLastSnapshot;
+
+   // for changelog
+   private Tuple2 nextChangeRecord;
+
+   public DynamicResult(InetAddress targetAddress, int targetPort, boolean 
isChangelog) {
+   this.isChangelog = isChangelog;
+
+   // create table sink
+   collectTableSink = new CollectTableSink(targetAddress, 
targetPort);
+   retrievalThread = new ResultRetrievalThread();
+   monitoringThread = new JobMonitoringThread();
+
+   // prepare for table materialization
+   materializedTable = new ArrayList<>();
+   rowPositions = new HashMap<>();
+   snapshot = new ArrayList<>();
+   isLastSnapshot = false;
+   }
+
+   public boolean isChangelog() {
+   return isChangelog;
+   }
+
+   public void startRetrieval(Runnable program) {
+   // start listener thread
+   retrievalThread.start();
+
+   // start program
+   this.program = program;
+   monitoringThread.start();
+   }
+
+   public TypedResult> retrieveRecord() {
+   synchronized (resultLock) {
+   // retrieval thread is alive return a record if 
available
+   // but the program must not have failed
+   if (retrievalThread.isRunning && executionException == 
null) {
+   if (nextChangeRecord == null) {
+   return TypedResult.empty();
+   } else {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   resultLock.notify();
+   return TypedResult.payload(change);
+   }
+   }
+   // retrieval thread is dead but there is still a record 
to be delivered
+   else if (!retrievalThread.isRunning && nextChangeRecord 
!= null) {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   return TypedResult.payload(change);
+   }
+   // no results can be returned anymore
+   else {
+   return handleMissingResult();
+   }
+   }
+   }
+
+   public TypedResult snapshot(int pageSize) 

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167921296
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+/**
+ * Strings used for key and values in an environment file.
+ */
+public final class PropertyStrings {
+
+   private PropertyStrings() {
+   // private
+   }
+
+   public static final String EXECUTION = "execution";
+
+   public static final String EXECUTION_TYPE = "type";
+
+   public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
+
+   public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
+
+   public static final String EXECUTION_MIN_STATE_RETENTION = 
"min-idle-state-retention";
+
+   public static final String EXECUTION_MAX_STATE_RETENTION = 
"max-idle-state-retention";
+
+   public static final String EXECUTION_PARALLELISM = "parallelism";
+
+   public static final String EXECUTION_MAX_PARALLELISM = 
"max-parallelism";
+
+   public static final String EXECUTION_RESULT_MODE = "result-mode";
+
+   public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = 
"changelog";
+
+   public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+
+   public static final String DEPLOYMENT = "deployment";
+
+   public static final String DEPLOYMENT_TYPE = "type";
+
+   public static final String DEPLOYMENT_TYPE_VALUE_STANDALONE = 
"standalone";
+
+   public static final String DEPLOYMENT_RESPONSE_TIMEOUT = 
"response-timeout";
+
+   public static final String DEPLOYMENT_GATEWAY_ADDRESS = 
"client-address";
--- End diff --

rename to `gateway-address` or `flink-gateway-address`?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167912319
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+   private static final String DEFAULT_ENV_FILE = 
"sql-client-defaults.yaml";
+
+   private final Environment environment;
+   private final List dependencies;
+   private final Configuration flinkConfig;
+   private final ResultStore resultStore;
+
+   public LocalExecutor(URL defaultEnv, List jars, List 
libraries) {
+
+   final String flinkConfigDir;
+   try {
+   // find the configuration directory
+   flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   // load the global configuration
+  

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167912662
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+   private static final String DEFAULT_ENV_FILE = 
"sql-client-defaults.yaml";
+
+   private final Environment environment;
+   private final List dependencies;
+   private final Configuration flinkConfig;
+   private final ResultStore resultStore;
+
+   public LocalExecutor(URL defaultEnv, List jars, List 
libraries) {
+
+   final String flinkConfigDir;
+   try {
+   // find the configuration directory
+   flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   // load the global configuration
+  

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167952297
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java
 ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A result of a dynamic table program.
+ */
+public class DynamicResult {
+
+   private final boolean isChangelog;
+   private final CollectTableSink collectTableSink;
+   private final Object resultLock = new Object();
+   private final ResultRetrievalThread retrievalThread;
+   private final JobMonitoringThread monitoringThread;
+   private Runnable program;
+   private SqlExecutionException executionException;
+
+   // for table materialization
+   private final List materializedTable;
+   private final Map rowPositions; // positions of 
rows in table for faster access
+   private final List snapshot;
+   private int pageSize;
+   private boolean isLastSnapshot;
+
+   // for changelog
+   private Tuple2 nextChangeRecord;
+
+   public DynamicResult(InetAddress targetAddress, int targetPort, boolean 
isChangelog) {
+   this.isChangelog = isChangelog;
+
+   // create table sink
+   collectTableSink = new CollectTableSink(targetAddress, 
targetPort);
+   retrievalThread = new ResultRetrievalThread();
+   monitoringThread = new JobMonitoringThread();
+
+   // prepare for table materialization
+   materializedTable = new ArrayList<>();
+   rowPositions = new HashMap<>();
+   snapshot = new ArrayList<>();
+   isLastSnapshot = false;
+   }
+
+   public boolean isChangelog() {
+   return isChangelog;
+   }
+
+   public void startRetrieval(Runnable program) {
+   // start listener thread
+   retrievalThread.start();
+
+   // start program
+   this.program = program;
+   monitoringThread.start();
+   }
+
+   public TypedResult> retrieveRecord() {
+   synchronized (resultLock) {
+   // retrieval thread is alive return a record if 
available
+   // but the program must not have failed
+   if (retrievalThread.isRunning && executionException == 
null) {
+   if (nextChangeRecord == null) {
+   return TypedResult.empty();
+   } else {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   resultLock.notify();
+   return TypedResult.payload(change);
+   }
+   }
+   // retrieval thread is dead but there is still a record 
to be delivered
+   else if (!retrievalThread.isRunning && nextChangeRecord 
!= null) {
+   final Tuple2 change = 
nextChangeRecord;
+   nextChangeRecord = null;
+   return TypedResult.payload(change);
+   }
+   // no results can be returned anymore
+   else {
+   return handleMissingResult();
+   }
+   }
+   }
+
+   public TypedResult snapshot(int pageSize) 

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167898913
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
--- End diff --

Move the code of the local executor into a separate package. This will also 
help to validate that the abstractions are correctly designed.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167936060
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.client.SqlClientException;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Environment configuration.
--- End diff --

Is Environment the same as a session?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167881723
  
--- Diff: flink-libraries/flink-sql-client/pom.xml ---
@@ -0,0 +1,149 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.5-SNAPSHOT
+   ..
+   
+
+   flink-sql-client
+   flink-sql-client
--- End diff --

should we call the module `flink-sql-server`? In the future, the 
functionality will be split into server and client, and I assume most of the 
functionality will reside in the server.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167921412
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+/**
+ * Strings used for key and values in an environment file.
+ */
+public final class PropertyStrings {
+
+   private PropertyStrings() {
+   // private
+   }
+
+   public static final String EXECUTION = "execution";
+
+   public static final String EXECUTION_TYPE = "type";
+
+   public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
+
+   public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
+
+   public static final String EXECUTION_MIN_STATE_RETENTION = 
"min-idle-state-retention";
+
+   public static final String EXECUTION_MAX_STATE_RETENTION = 
"max-idle-state-retention";
+
+   public static final String EXECUTION_PARALLELISM = "parallelism";
+
+   public static final String EXECUTION_MAX_PARALLELISM = 
"max-parallelism";
+
+   public static final String EXECUTION_RESULT_MODE = "result-mode";
+
+   public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = 
"changelog";
+
+   public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+
+   public static final String DEPLOYMENT = "deployment";
+
+   public static final String DEPLOYMENT_TYPE = "type";
+
+   public static final String DEPLOYMENT_TYPE_VALUE_STANDALONE = 
"standalone";
+
+   public static final String DEPLOYMENT_RESPONSE_TIMEOUT = 
"response-timeout";
+
+   public static final String DEPLOYMENT_GATEWAY_ADDRESS = 
"client-address";
+
+   public static final String DEPLOYMENT_GATEWAY_PORT = "client-port";
--- End diff --

rename to `gateway-port` or `flink-gateway-port`?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167947410
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java
 ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A result of a dynamic table program.
+ */
+public class DynamicResult {
+
+   private final boolean isChangelog;
+   private final CollectTableSink collectTableSink;
+   private final Object resultLock = new Object();
+   private final ResultRetrievalThread retrievalThread;
+   private final JobMonitoringThread monitoringThread;
+   private Runnable program;
+   private SqlExecutionException executionException;
+
+   // for table materialization
+   private final List materializedTable;
+   private final Map rowPositions; // positions of 
rows in table for faster access
+   private final List snapshot;
+   private int pageSize;
+   private boolean isLastSnapshot;
+
+   // for changelog
+   private Tuple2 nextChangeRecord;
--- End diff --

Also fetch change rows in a List for more efficient retrieval?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167938697
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+   private static final String DEFAULT_ENV_FILE = 
"sql-client-defaults.yaml";
+
+   private final Environment environment;
+   private final List dependencies;
+   private final Configuration flinkConfig;
+   private final ResultStore resultStore;
+
+   public LocalExecutor(URL defaultEnv, List jars, List 
libraries) {
+
+   final String flinkConfigDir;
+   try {
+   // find the configuration directory
+   flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   // load the global configuration
+  

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167902843
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+/**
+ * Result with an attached type (actual payload, EOS, etc.).
+ *
+ * @param  type of payload
+ */
+public class TypedResult {
+
+   private final Type type;
+
+   private final P payload;
+
+   private TypedResult(Type type, P payload) {
+   this.type = type;
+   this.payload = payload;
+   }
+
+   public Type getType() {
+   return type;
+   }
+
+   public P getPayload() {
+   return payload;
+   }
+
+   // 

+
+   public static  TypedResult empty() {
+   return new TypedResult<>(Type.EMPTY, null);
+   }
+
+   public static  TypedResult payload(T payload) {
+   return new TypedResult<>(Type.PAYLOAD, payload);
+   }
+
+   public static  TypedResult endOfStream() {
+   return new TypedResult<>(Type.EOS, null);
+   }
+
+   // 

+
+   /**
+* Result types.
+*/
+   public enum Type {
--- End diff --

Rename to `ResultType` to avoid confusion with `java.lang.reflect.Type`?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167880842
  
--- Diff: flink-libraries/flink-sql-client/pom.xml ---
@@ -0,0 +1,149 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.5-SNAPSHOT
+   ..
+   
+
+   flink-sql-client
+   flink-sql-client
+
+   jar
+
+   
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-table_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-log4j12
+   ${slf4j.version}
+   
+   
+   log4j
+   log4j
+   ${log4j.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-api
+   ${slf4j.version}
+   
+
+   
+   
+   org.jline
+   jline-terminal
+   3.5.1
+   
+   
+   org.jline
+   jline-reader
+   3.5.1
+   
+
+   
+   
+   com.fasterxml.jackson.core
+   jackson-databind
+   2.2.3
--- End diff --

the root `pom.xml` defines a global jackson version (2.7.9). Maybe use that 
one?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167880856
  
--- Diff: flink-libraries/flink-sql-client/pom.xml ---
@@ -0,0 +1,149 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.5-SNAPSHOT
+   ..
+   
+
+   flink-sql-client
+   flink-sql-client
+
+   jar
+
+   
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-table_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-log4j12
+   ${slf4j.version}
+   
+   
+   log4j
+   log4j
+   ${log4j.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-api
+   ${slf4j.version}
+   
+
+   
+   
+   org.jline
+   jline-terminal
+   3.5.1
+   
+   
+   org.jline
+   jline-reader
+   3.5.1
+   
+
+   
+   
+   com.fasterxml.jackson.core
+   jackson-databind
+   2.2.3
+   
+   
+   com.fasterxml.jackson.dataformat
+   jackson-dataformat-yaml
+   2.3.0
--- End diff --

the root `pom.xml` defines a global jackson version (2.7.9). Maybe use that 
one?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167869733
  
--- Diff: flink-libraries/flink-sql-client/bin/sql-client.sh ---
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+

+
+# Adopted from "flink" bash script

+
+
+target="$0"
+# For the case, the executable has been directly symlinked, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+if [ "$iteration" -gt 100 ]; then
+echo "Cannot resolve path: You have a cyclic symlink in $target."
+break
+fi
+ls=`ls -ld -- "$target"`
+target=`expr "$ls" : '.* -> \(.*\)$'`
+iteration=$((iteration + 1))
+done
+
+# Convert relative path to absolute path
+bin=`dirname "$target"`
+
+# get flink config
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+FLINK_IDENT_STRING="$USER"
+fi
+
+CC_CLASSPATH=`constructFlinkClassPath`
+

+
+# SQL client specific logic

+
+
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log
+log_setting=(-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
+
+export FLINK_ROOT_DIR
+export FLINK_CONF_DIR
+
+# get path of jar in /opt if it exist
+FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex 
".*flink-sql-client.*.jar")
+
+# check if SQL client is already in classpath and must not be shipped 
manually
+if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then
+
+# start client without jar
+exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath 
"`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
org.apache.flink.table.client.SqlClient "$@"
+
+# check if SQL client jar is in /opt
+elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then
+
+# start client with jar
+exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath 
"`manglePathList 
"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" 
org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath 
$FLINK_SQL_CLIENT_JAR`"
+
+# write error message to stderr
+else
+(>&2 echo "[ERROR] Flink SQL client jar neither found in classpath nor 
/opt directory should be located in $FLINK_OPT_DIR.")
--- End diff --

check error message


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167892881
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/CollectTableSink.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.sinks.RetractStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Iterator;
+
+/**
+ * Table sink for collecting the results locally.
+ */
+public class CollectTableSink implements RetractStreamTableSink, 
Iterator> {
+
+   private final InetAddress gatewayAddress;
+   private final int manualGatewayPort;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+   private SocketStreamIterator> iterator;
+
+   public CollectTableSink(InetAddress gatewayAddress, int 
manualGatewayPort) {
+   this.gatewayAddress = gatewayAddress;
+   this.manualGatewayPort = manualGatewayPort;
+   }
+
+   @Override
+   public String[] getFieldNames() {
+   return fieldNames;
+   }
+
+   @Override
+   public TypeInformation[] getFieldTypes() {
+   return fieldTypes;
+   }
+
+   @Override
+   public TableSink> configure(String[] fieldNames, 
TypeInformation[] fieldTypes) {
+   this.fieldNames = fieldNames;
+   this.fieldTypes = fieldTypes;
+   return this;
--- End diff --

According to the JavaDocs `TableSink.configure()` must return a configured 
copy of the `TableSink` and not the `TableSink` object itself.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167882975
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client;
+
+import org.apache.flink.table.client.cli.CliClient;
+import org.apache.flink.table.client.cli.CliOptions;
+import org.apache.flink.table.client.cli.CliOptionsParser;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.LocalExecutor;
+import org.apache.flink.table.client.gateway.SessionContext;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * SQL Client for submitting SQL statements. The client can be executed in 
two
+ * modes: a gateway and embedded mode.
+ *
+ * - In gateway mode, the SQL CLI client connects to the REST API of 
the gateway and allows for
--- End diff --

I don't think we should document features that are not there yet (or at 
least mark them as future features).


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167909635
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java
 ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.config.Deployment;
+import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Execution;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor that performs the Flink communication locally. The calls are 
blocking depending on the
+ * response time to the Flink cluster. Flink jobs are not blocking.
+ */
+public class LocalExecutor implements Executor {
+
+   private static final String DEFAULT_ENV_FILE = 
"sql-client-defaults.yaml";
+
+   private final Environment environment;
+   private final List dependencies;
+   private final Configuration flinkConfig;
+   private final ResultStore resultStore;
+
+   public LocalExecutor(URL defaultEnv, List jars, List 
libraries) {
+
+   final String flinkConfigDir;
+   try {
+   // find the configuration directory
+   flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
+   // load the global configuration
+  

[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167872946
  
--- Diff: flink-libraries/flink-sql-client/pom.xml ---
@@ -0,0 +1,149 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.5-SNAPSHOT
+   ..
+   
+
+   flink-sql-client
+   flink-sql-client
+
+   jar
+
+   
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-table_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-log4j12
+   ${slf4j.version}
+   
+   
+   log4j
+   log4j
+   ${log4j.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-api
+   ${slf4j.version}
+   
+
+   
+   
+   org.jline
+   jline-terminal
+   3.5.1
--- End diff --

The jLine dependencies are BSD licensed. I think we need to update the the 
`./LICENSE` file and include this dependency. 


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167870419
  
--- Diff: flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml ---
@@ -0,0 +1,47 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+

+#==
+# Table Sources

+#==
+sources: [] # empty list
+# Define table sources here. See the documentation for more details.
+# - name: ...
+#   connector: ...
+#   format: ...
+#   schema: ...
+

+#==
+# Execution properties

+#==
+execution:
+  type: streaming
--- End diff --

Add brief explanations for parameters and alternative values for `type` and 
`result-mode`.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167871291
  
--- Diff: flink-libraries/flink-sql-client/pom.xml ---
@@ -0,0 +1,149 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-libraries
+   1.5-SNAPSHOT
+   ..
+   
+
+   flink-sql-client
+   flink-sql-client
+
+   jar
+
+   
+
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
+   
flink-table_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   
+   org.slf4j
+   slf4j-log4j12
+   ${slf4j.version}
+   
+   
--- End diff --

empty line


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5441#discussion_r167869958
  
--- Diff: flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml ---
@@ -0,0 +1,47 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+

+#==
+# Table Sources

+#==
+sources: [] # empty list
+# Define table sources here. See the documentation for more details.
--- End diff --

more concrete pointer (Table API & SQL documentation)?


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-09 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5441

[FLINK-8607] [table] Add a basic embedded SQL CLI client

## What is the purpose of the change

This PR implements the first part of the implementation plan described in 
FLIP-24.

```
Goal: Add the basic features to play around with Flink's streaming SQL.

- Add CLI component that reads the configuration files
- "Pre-registered table sources"
- "Job parameters"
- Add executor for retrieving pre-flight information and corresponding CLI 
SQL parser
- SHOW TABLES
- DESCRIBE TABLE
- EXPLAIN
- Add streaming append query submission to executor
- Submit jars and run SELECT query using the ClusterClient
- Collect results on heap and serve them on the CLI side (Internal Mode 
with SELECT)
- SOURCE (for executing a SQL statement stored in a local file)
```

Additionally, this PR also supports retraction queries and the SET 
operation for setting properties.

The client can be started using `./bin/sql-client.sh embedded`.

A table source must be defined in `./conf/sql-client-defaults.yaml` (for 
example a CSV table source, an example can be found in the test resources 
directory).

The client supports two modes for viewing results. A `changelog` mode or 
`table` mode. They can be selected by setting the `execution.result-mode` 
property.

The code is still work in progress. There are a couple of things that can 
be improved:

- Add more logging instead of swallowing exceptions
- Use Flink's ConfigOptions where applicable (e.g. for better default value 
handling and validation)
- Maybe make the record result retrieval blocking?
- Tests for the LocalExecutor
- Some basic tests for other classes (maybe for CLI classes as well?)
- More deployment options (support for YARN, FLIP-6?)
- Documentation


## Brief change log

- New module `flink-sql-client`
- New executable script in `flink/bin`
- Minor visibility changes in other modules

## Verifying this change

Manually verified. Further tests will follow.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8607

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5441.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5441


commit 17b6b6d8b8804382b7b855033c370ffb4fd673ac
Author: twalthr 
Date:   2017-12-07T12:46:31Z

[FLINK-8607] [table] Add a basic embedded SQL CLI client




---