[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5441 ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r168914688 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultStore.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.net.ConnectionUtils; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Maintains dynamic results. 
+ */ +public class ResultStore { + + private Configuration flinkConfig; + + private Mapresults; + + public ResultStore(Configuration flinkConfig) { + this.flinkConfig = flinkConfig; + + results = new HashMap<>(); + } + + public DynamicResult createResult(Environment env) { + if (!env.getExecution().isStreamingExecution()) { + throw new SqlExecutionException("Emission is only supported in streaming environments yet."); + } + + return new DynamicResult( + // determine gateway address (and port if possible) + getGatewayAddress(env.getDeployment()), + getGatewayPort(env.getDeployment()), + env.getExecution().isChangelogMode()); + } + + public void storeResult(String resultId, DynamicResult result) { + results.put(resultId, result); + } + + public DynamicResult getResult(String resultId) { + return results.get(resultId); + } + + public void removeResult(String resultId) { + final DynamicResult result = results.get(resultId); + result.close(); + results.remove(resultId); + } + + public List getResults() { + return new ArrayList<>(results.keySet()); + } + + // + + private int getGatewayPort(Deployment deploy) { + // try to get address from deployment configuration + return deploy.getGatewayPort(); + } + + private InetAddress getGatewayAddress(Deployment deploy) { + // try to get address from deployment configuration + final String address = deploy.getGatewayAddress(); + + // use manually defined address + if (!address.isEmpty()) { + try { + return InetAddress.getByName(address); + } catch (UnknownHostException e) { + throw new SqlClientException("Invalid gateway address '" + address + "' for result retrieval.", e); + } + } else { + // TODO cache this + // try to get the address by communicating to JobManager + final String jobManagerAddress = flinkConfig.getString(JobManagerOptions.ADDRESS); + final int jobManagerPort = flinkConfig.getInteger(JobManagerOptions.PORT); + if (jobManagerAddress != null && !jobManagerAddress.isEmpty()) { --- End diff -- This logic has been 
adopted from the DataStreamUtils. It communicates with the job manager in order to figure out which network adapter is used for communication with the cluster in general. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r168447407 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +/** + * Strings used for key and values in an environment file. 
+ */ +public final class PropertyStrings { + + private PropertyStrings() { + // private + } + + public static final String EXECUTION = "execution"; + + public static final String EXECUTION_TYPE = "type"; + + public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming"; + + public static final String EXECUTION_TYPE_VALUE_BATCH = "batch"; + + public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention"; + + public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention"; + + public static final String EXECUTION_PARALLELISM = "parallelism"; + + public static final String EXECUTION_MAX_PARALLELISM = "max-parallelism"; + + public static final String EXECUTION_RESULT_MODE = "result-mode"; + + public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = "changelog"; + + public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table"; + + public static final String DEPLOYMENT = "deployment"; --- End diff -- I think we should leave this generic. It might also contain things like a YARN application ID or whatever else might come in the future. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r168440952 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Deployment.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Configuration of a Flink cluster deployment. + */ +public class Deployment { --- End diff -- Do you mean this globally? Also replacing `deployment` by `gateway` in the configuration file? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r168429967 --- Diff: flink-libraries/flink-sql-client/pom.xml --- @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-libraries + 1.5-SNAPSHOT + .. + + + flink-sql-client + flink-sql-client --- End diff -- I also thought about the naming. In general, we call components that submit Flink jobs "client". So the entire effort will be called "SQL Client". The SQL Client consists of a CLI Client and a SQL Gateway. Calling it "SQL Server" might lead users to confuse it with Microsoft's products. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167903512 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +/** + * Result with an attached type (actual payload, EOS, etc.). + * + * @param type of payload + */ +public class TypedResult { + + private final Type type; --- End diff -- Make `TypedResult` mutable to make it reusable? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167889717 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliChangelogResultView.java --- @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.cli; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.client.gateway.ResultDescriptor; +import org.apache.flink.table.client.gateway.SqlExecutionException; +import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.types.Row; + +import org.jline.keymap.KeyMap; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; +import org.jline.utils.InfoCmp.Capability; + +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER; +import static org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions; +import static org.apache.flink.table.client.cli.CliUtils.normalizeColumn; +import static org.apache.flink.table.client.cli.CliUtils.repeatChar; +import static org.apache.flink.table.client.cli.CliUtils.rowToString; +import static org.jline.keymap.KeyMap.ctrl; +import static org.jline.keymap.KeyMap.esc; +import static org.jline.keymap.KeyMap.key; + +/** + * CLI view for retrieving and displaying a changelog stream. + */ +public class CliChangelogResultView extends CliResultView { --- End diff -- FH: review! ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167917093 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List jars, List libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration +
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167916900 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List jars, List libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration +
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r16794 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List jars, List libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration +
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167951055 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A result of a dynamic table program. 
+ */ +public class DynamicResult { + + private final boolean isChangelog; + private final CollectTableSink collectTableSink; + private final Object resultLock = new Object(); + private final ResultRetrievalThread retrievalThread; + private final JobMonitoringThread monitoringThread; + private Runnable program; + private SqlExecutionException executionException; + + // for table materialization + private final List materializedTable; + private final MaprowPositions; // positions of rows in table for faster access + private final List snapshot; + private int pageSize; + private boolean isLastSnapshot; + + // for changelog + private Tuple2
nextChangeRecord; + + public DynamicResult(InetAddress targetAddress, int targetPort, boolean isChangelog) { + this.isChangelog = isChangelog; + + // create table sink + collectTableSink = new CollectTableSink(targetAddress, targetPort); + retrievalThread = new ResultRetrievalThread(); + monitoringThread = new JobMonitoringThread(); + + // prepare for table materialization + materializedTable = new ArrayList<>(); + rowPositions = new HashMap<>(); + snapshot = new ArrayList<>(); + isLastSnapshot = false; + } + + public boolean isChangelog() { + return isChangelog; + } + + public void startRetrieval(Runnable program) { + // start listener thread + retrievalThread.start(); + + // start program + this.program = program; + monitoringThread.start(); + } + + public TypedResult > retrieveRecord() { + synchronized (resultLock) { + // retrieval thread is alive return a record if available + // but the program must not have failed + if (retrievalThread.isRunning && executionException == null) { + if (nextChangeRecord == null) { + return TypedResult.empty(); + } else { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + resultLock.notify(); + return TypedResult.payload(change); + } + } + // retrieval thread is dead but there is still a record to be delivered + else if (!retrievalThread.isRunning && nextChangeRecord != null) { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + return TypedResult.payload(change); + } + // no results can be returned anymore + else { + return handleMissingResult(); + } + } + } + + public TypedResult snapshot(int pageSize)
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167953460 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A result of a dynamic table program. 
+ */ +public class DynamicResult { + + private final boolean isChangelog; + private final CollectTableSink collectTableSink; + private final Object resultLock = new Object(); + private final ResultRetrievalThread retrievalThread; + private final JobMonitoringThread monitoringThread; + private Runnable program; + private SqlExecutionException executionException; + + // for table materialization + private final List materializedTable; + private final MaprowPositions; // positions of rows in table for faster access + private final List snapshot; + private int pageSize; + private boolean isLastSnapshot; + + // for changelog + private Tuple2
nextChangeRecord; + + public DynamicResult(InetAddress targetAddress, int targetPort, boolean isChangelog) { + this.isChangelog = isChangelog; + + // create table sink + collectTableSink = new CollectTableSink(targetAddress, targetPort); + retrievalThread = new ResultRetrievalThread(); + monitoringThread = new JobMonitoringThread(); + + // prepare for table materialization + materializedTable = new ArrayList<>(); + rowPositions = new HashMap<>(); + snapshot = new ArrayList<>(); + isLastSnapshot = false; + } + + public boolean isChangelog() { + return isChangelog; + } + + public void startRetrieval(Runnable program) { + // start listener thread + retrievalThread.start(); + + // start program + this.program = program; + monitoringThread.start(); + } + + public TypedResult > retrieveRecord() { + synchronized (resultLock) { + // retrieval thread is alive return a record if available + // but the program must not have failed + if (retrievalThread.isRunning && executionException == null) { + if (nextChangeRecord == null) { + return TypedResult.empty(); + } else { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + resultLock.notify(); + return TypedResult.payload(change); + } + } + // retrieval thread is dead but there is still a record to be delivered + else if (!retrievalThread.isRunning && nextChangeRecord != null) { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + return TypedResult.payload(change); + } + // no results can be returned anymore + else { + return handleMissingResult(); + } + } + } + + public TypedResult snapshot(int pageSize)
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167871796 --- Diff: flink-libraries/flink-sql-client/pom.xml --- @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-libraries + 1.5-SNAPSHOT + .. + + + flink-sql-client + flink-sql-client + + jar + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + + flink-clients_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-table_${scala.binary.version} + ${project.version} + + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + + log4j + log4j + ${log4j.version} + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + org.jline + jline-terminal + 3.5.1 --- End diff -- jLine 3.5.2 was released in December. We might want to use that version. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167939768 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List jars, List libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration +
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167914210 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.table.client.SqlClientException; + +import java.io.IOException; +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Environment configuration. --- End diff -- More details about the purpose of this class would be good. What kind of properties are stored? What is the scope of the configuration? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167922399 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultStore.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.net.ConnectionUtils; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Maintains dynamic results. 
+ */ +public class ResultStore { + + private Configuration flinkConfig; + + private Mapresults; + + public ResultStore(Configuration flinkConfig) { + this.flinkConfig = flinkConfig; + + results = new HashMap<>(); + } + + public DynamicResult createResult(Environment env) { + if (!env.getExecution().isStreamingExecution()) { + throw new SqlExecutionException("Emission is only supported in streaming environments yet."); --- End diff -- Is this the point at which batch queries will fail? Can we fail earlier with a better error message? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167896355 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/CollectTableSink.java --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.streaming.experimental.SocketStreamIterator; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Iterator; + +/** + * Table sink for collecting the results locally. 
+ */ +public class CollectTableSink implements RetractStreamTableSink, Iterator> { --- End diff -- I would move the `Iterator` out of the `TableSink` for better separation of concerns. A `TableSink` should only be responsible for emitting the data, not for receiving it. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167949288 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A result of a dynamic table program. 
+ */ +public class DynamicResult { + + private final boolean isChangelog; + private final CollectTableSink collectTableSink; + private final Object resultLock = new Object(); + private final ResultRetrievalThread retrievalThread; + private final JobMonitoringThread monitoringThread; + private Runnable program; + private SqlExecutionException executionException; + + // for table materialization + private final List materializedTable; + private final MaprowPositions; // positions of rows in table for faster access + private final List snapshot; + private int pageSize; + private boolean isLastSnapshot; + + // for changelog + private Tuple2
nextChangeRecord; + + public DynamicResult(InetAddress targetAddress, int targetPort, boolean isChangelog) { + this.isChangelog = isChangelog; + + // create table sink + collectTableSink = new CollectTableSink(targetAddress, targetPort); + retrievalThread = new ResultRetrievalThread(); + monitoringThread = new JobMonitoringThread(); + + // prepare for table materialization + materializedTable = new ArrayList<>(); + rowPositions = new HashMap<>(); + snapshot = new ArrayList<>(); + isLastSnapshot = false; + } + + public boolean isChangelog() { + return isChangelog; + } + + public void startRetrieval(Runnable program) { + // start listener thread + retrievalThread.start(); + + // start program + this.program = program; + monitoringThread.start(); + } + + public TypedResult > retrieveRecord() { + synchronized (resultLock) { + // retrieval thread is alive return a record if available + // but the program must not have failed + if (retrievalThread.isRunning && executionException == null) { + if (nextChangeRecord == null) { + return TypedResult.empty(); + } else { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + resultLock.notify(); + return TypedResult.payload(change); + } + } + // retrieval thread is dead but there is still a record to be delivered + else if (!retrievalThread.isRunning && nextChangeRecord != null) { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + return TypedResult.payload(change); + } + // no results can be returned anymore + else { + return handleMissingResult(); + } + } + } + + public TypedResult snapshot(int pageSize)
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167944240 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ResultStore.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.net.ConnectionUtils; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Maintains dynamic results. 
+ */ +public class ResultStore { + + private Configuration flinkConfig; + + private Mapresults; + + public ResultStore(Configuration flinkConfig) { + this.flinkConfig = flinkConfig; + + results = new HashMap<>(); + } + + public DynamicResult createResult(Environment env) { + if (!env.getExecution().isStreamingExecution()) { + throw new SqlExecutionException("Emission is only supported in streaming environments yet."); + } + + return new DynamicResult( + // determine gateway address (and port if possible) + getGatewayAddress(env.getDeployment()), + getGatewayPort(env.getDeployment()), + env.getExecution().isChangelogMode()); + } + + public void storeResult(String resultId, DynamicResult result) { + results.put(resultId, result); + } + + public DynamicResult getResult(String resultId) { + return results.get(resultId); + } + + public void removeResult(String resultId) { + final DynamicResult result = results.get(resultId); + result.close(); + results.remove(resultId); + } + + public List getResults() { + return new ArrayList<>(results.keySet()); + } + + // + + private int getGatewayPort(Deployment deploy) { + // try to get address from deployment configuration + return deploy.getGatewayPort(); + } + + private InetAddress getGatewayAddress(Deployment deploy) { + // try to get address from deployment configuration + final String address = deploy.getGatewayAddress(); + + // use manually defined address + if (!address.isEmpty()) { + try { + return InetAddress.getByName(address); + } catch (UnknownHostException e) { + throw new SqlClientException("Invalid gateway address '" + address + "' for result retrieval.", e); + } + } else { + // TODO cache this + // try to get the address by communicating to JobManager + final String jobManagerAddress = flinkConfig.getString(JobManagerOptions.ADDRESS); + final int jobManagerPort = flinkConfig.getInteger(JobManagerOptions.PORT); + if (jobManagerAddress != null && !jobManagerAddress.isEmpty()) { --- End diff -- Will this work in a 
"real" cluster setup with SqlClient, JM, and TMs running on different machines? If I get the logic right, we need the host name of the client where the socket server is started, because the sink function
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167921047 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +/** + * Strings used for key and values in an environment file. 
+ */ +public final class PropertyStrings { + + private PropertyStrings() { + // private + } + + public static final String EXECUTION = "execution"; + + public static final String EXECUTION_TYPE = "type"; + + public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming"; + + public static final String EXECUTION_TYPE_VALUE_BATCH = "batch"; + + public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention"; + + public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention"; + + public static final String EXECUTION_PARALLELISM = "parallelism"; + + public static final String EXECUTION_MAX_PARALLELISM = "max-parallelism"; + + public static final String EXECUTION_RESULT_MODE = "result-mode"; + + public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = "changelog"; + + public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table"; + + public static final String DEPLOYMENT = "deployment"; --- End diff -- rename to flink cluster? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167871058 --- Diff: flink-libraries/flink-sql-client/pom.xml --- @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-libraries + 1.5-SNAPSHOT + .. + + + flink-sql-client + flink-sql-client + + jar + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink --- End diff -- indentation is off ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167953650 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A result of a dynamic table program. 
+ */ +public class DynamicResult { + + private final boolean isChangelog; + private final CollectTableSink collectTableSink; + private final Object resultLock = new Object(); + private final ResultRetrievalThread retrievalThread; + private final JobMonitoringThread monitoringThread; + private Runnable program; + private SqlExecutionException executionException; + + // for table materialization + private final List materializedTable; + private final MaprowPositions; // positions of rows in table for faster access + private final List snapshot; + private int pageSize; + private boolean isLastSnapshot; + + // for changelog + private Tuple2
nextChangeRecord; + + public DynamicResult(InetAddress targetAddress, int targetPort, boolean isChangelog) { + this.isChangelog = isChangelog; + + // create table sink + collectTableSink = new CollectTableSink(targetAddress, targetPort); + retrievalThread = new ResultRetrievalThread(); + monitoringThread = new JobMonitoringThread(); + + // prepare for table materialization + materializedTable = new ArrayList<>(); + rowPositions = new HashMap<>(); + snapshot = new ArrayList<>(); + isLastSnapshot = false; + } + + public boolean isChangelog() { + return isChangelog; + } + + public void startRetrieval(Runnable program) { + // start listener thread + retrievalThread.start(); + + // start program + this.program = program; + monitoringThread.start(); + } + + public TypedResult > retrieveRecord() { + synchronized (resultLock) { + // retrieval thread is alive return a record if available + // but the program must not have failed + if (retrievalThread.isRunning && executionException == null) { + if (nextChangeRecord == null) { + return TypedResult.empty(); + } else { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + resultLock.notify(); + return TypedResult.payload(change); + } + } + // retrieval thread is dead but there is still a record to be delivered + else if (!retrievalThread.isRunning && nextChangeRecord != null) { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + return TypedResult.payload(change); + } + // no results can be returned anymore + else { + return handleMissingResult(); + } + } + } + + public TypedResult snapshot(int pageSize)
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167921296 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +/** + * Strings used for key and values in an environment file. 
+ */ +public final class PropertyStrings { + + private PropertyStrings() { + // private + } + + public static final String EXECUTION = "execution"; + + public static final String EXECUTION_TYPE = "type"; + + public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming"; + + public static final String EXECUTION_TYPE_VALUE_BATCH = "batch"; + + public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention"; + + public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention"; + + public static final String EXECUTION_PARALLELISM = "parallelism"; + + public static final String EXECUTION_MAX_PARALLELISM = "max-parallelism"; + + public static final String EXECUTION_RESULT_MODE = "result-mode"; + + public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = "changelog"; + + public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table"; + + public static final String DEPLOYMENT = "deployment"; + + public static final String DEPLOYMENT_TYPE = "type"; + + public static final String DEPLOYMENT_TYPE_VALUE_STANDALONE = "standalone"; + + public static final String DEPLOYMENT_RESPONSE_TIMEOUT = "response-timeout"; + + public static final String DEPLOYMENT_GATEWAY_ADDRESS = "client-address"; --- End diff -- rename to `gateway-address` or `flink-gateway-address`? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167912319 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List jars, List libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration +
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167912662 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List jars, List libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration +
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167952297 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A result of a dynamic table program. 
+ */ +public class DynamicResult { + + private final boolean isChangelog; + private final CollectTableSink collectTableSink; + private final Object resultLock = new Object(); + private final ResultRetrievalThread retrievalThread; + private final JobMonitoringThread monitoringThread; + private Runnable program; + private SqlExecutionException executionException; + + // for table materialization + private final List materializedTable; + private final MaprowPositions; // positions of rows in table for faster access + private final List snapshot; + private int pageSize; + private boolean isLastSnapshot; + + // for changelog + private Tuple2
nextChangeRecord; + + public DynamicResult(InetAddress targetAddress, int targetPort, boolean isChangelog) { + this.isChangelog = isChangelog; + + // create table sink + collectTableSink = new CollectTableSink(targetAddress, targetPort); + retrievalThread = new ResultRetrievalThread(); + monitoringThread = new JobMonitoringThread(); + + // prepare for table materialization + materializedTable = new ArrayList<>(); + rowPositions = new HashMap<>(); + snapshot = new ArrayList<>(); + isLastSnapshot = false; + } + + public boolean isChangelog() { + return isChangelog; + } + + public void startRetrieval(Runnable program) { + // start listener thread + retrievalThread.start(); + + // start program + this.program = program; + monitoringThread.start(); + } + + public TypedResult > retrieveRecord() { + synchronized (resultLock) { + // retrieval thread is alive return a record if available + // but the program must not have failed + if (retrievalThread.isRunning && executionException == null) { + if (nextChangeRecord == null) { + return TypedResult.empty(); + } else { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + resultLock.notify(); + return TypedResult.payload(change); + } + } + // retrieval thread is dead but there is still a record to be delivered + else if (!retrievalThread.isRunning && nextChangeRecord != null) { + final Tuple2 change = nextChangeRecord; + nextChangeRecord = null; + return TypedResult.payload(change); + } + // no results can be returned anymore + else { + return handleMissingResult(); + } + } + } + + public TypedResult snapshot(int pageSize)
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167898913 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { --- End diff -- Move the code of the local executor into a separate package. This will also help to validate that the abstractions are correctly designed. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167936060 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +import org.apache.flink.table.client.SqlClientException; + +import java.io.IOException; +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Environment configuration. --- End diff -- Is Environment the same as a session? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167881723 --- Diff: flink-libraries/flink-sql-client/pom.xml --- @@ -0,0 +1,149 @@ + + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-libraries</artifactId> + <version>1.5-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-sql-client</artifactId> + <name>flink-sql-client</name> --- End diff -- Should we call the module `flink-sql-server`? In the future, the functionality will be split into server and client, and I assume most of the functionality will reside in the server. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167921412 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.config; + +/** + * Strings used for key and values in an environment file. 
+ */ +public final class PropertyStrings { + + private PropertyStrings() { + // private + } + + public static final String EXECUTION = "execution"; + + public static final String EXECUTION_TYPE = "type"; + + public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming"; + + public static final String EXECUTION_TYPE_VALUE_BATCH = "batch"; + + public static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention"; + + public static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention"; + + public static final String EXECUTION_PARALLELISM = "parallelism"; + + public static final String EXECUTION_MAX_PARALLELISM = "max-parallelism"; + + public static final String EXECUTION_RESULT_MODE = "result-mode"; + + public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = "changelog"; + + public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table"; + + public static final String DEPLOYMENT = "deployment"; + + public static final String DEPLOYMENT_TYPE = "type"; + + public static final String DEPLOYMENT_TYPE_VALUE_STANDALONE = "standalone"; + + public static final String DEPLOYMENT_RESPONSE_TIMEOUT = "response-timeout"; + + public static final String DEPLOYMENT_GATEWAY_ADDRESS = "client-address"; + + public static final String DEPLOYMENT_GATEWAY_PORT = "client-port"; --- End diff -- rename to `gateway-port` or `flink-gateway-port`? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167947410 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/DynamicResult.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A result of a dynamic table program. 
+ */ +public class DynamicResult { + + private final boolean isChangelog; + private final CollectTableSink collectTableSink; + private final Object resultLock = new Object(); + private final ResultRetrievalThread retrievalThread; + private final JobMonitoringThread monitoringThread; + private Runnable program; + private SqlExecutionException executionException; + + // for table materialization + private final List materializedTable; + private final MaprowPositions; // positions of rows in table for faster access + private final List snapshot; + private int pageSize; + private boolean isLastSnapshot; + + // for changelog + private Tuple2
nextChangeRecord; --- End diff -- Also fetch change rows in a List for more efficient retrieval? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167938697 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List jars, List libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration +
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167902843 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/TypedResult.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +/** + * Result with an attached type (actual payload, EOS, etc.). + * + * @param type of payload + */ +public class TypedResult { + + private final Type type; + + private final P payload; + + private TypedResult(Type type, P payload) { + this.type = type; + this.payload = payload; + } + + public Type getType() { + return type; + } + + public P getPayload() { + return payload; + } + + // + + public static TypedResult empty() { + return new TypedResult<>(Type.EMPTY, null); + } + + public static TypedResult payload(T payload) { + return new TypedResult<>(Type.PAYLOAD, payload); + } + + public static TypedResult endOfStream() { + return new TypedResult<>(Type.EOS, null); + } + + // + + /** +* Result types. +*/ + public enum Type { --- End diff -- Rename to `ResultType` to avoid confusion with `java.lang.reflect.Type`? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167880842 --- Diff: flink-libraries/flink-sql-client/pom.xml --- @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-libraries + 1.5-SNAPSHOT + .. + + + flink-sql-client + flink-sql-client + + jar + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + + flink-clients_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-table_${scala.binary.version} + ${project.version} + + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + + log4j + log4j + ${log4j.version} + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + org.jline + jline-terminal + 3.5.1 + + + org.jline + jline-reader + 3.5.1 + + + + + com.fasterxml.jackson.core + jackson-databind + 2.2.3 --- End diff -- the root `pom.xml` defines a global jackson version (2.7.9). Maybe use that one? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167880856 --- Diff: flink-libraries/flink-sql-client/pom.xml --- @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-libraries + 1.5-SNAPSHOT + .. + + + flink-sql-client + flink-sql-client + + jar + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + + flink-clients_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-table_${scala.binary.version} + ${project.version} + + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + + log4j + log4j + ${log4j.version} + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + org.jline + jline-terminal + 3.5.1 + + + org.jline + jline-reader + 3.5.1 + + + + + com.fasterxml.jackson.core + jackson-databind + 2.2.3 + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + 2.3.0 --- End diff -- the root `pom.xml` defines a global jackson version (2.7.9). Maybe use that one? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167869733 --- Diff: flink-libraries/flink-sql-client/bin/sql-client.sh --- @@ -0,0 +1,83 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + + +# Adopted from "flink" bash script + + +target="$0" +# For the case, the executable has been directly symlinked, figure out +# the correct bin path by following its symlink up to an upper bound. +# Note: we can't use the readlink utility here if we want to be POSIX +# compatible. +iteration=0 +while [ -L "$target" ]; do +if [ "$iteration" -gt 100 ]; then +echo "Cannot resolve path: You have a cyclic symlink in $target." +break +fi +ls=`ls -ld -- "$target"` +target=`expr "$ls" : '.* -> \(.*\)$'` +iteration=$((iteration + 1)) +done + +# Convert relative path to absolute path +bin=`dirname "$target"` + +# get flink config +. 
"$bin"/config.sh + +if [ "$FLINK_IDENT_STRING" = "" ]; then +FLINK_IDENT_STRING="$USER" +fi + +CC_CLASSPATH=`constructFlinkClassPath` + + +# SQL client specific logic + + +log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log +log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml) + +export FLINK_ROOT_DIR +export FLINK_CONF_DIR + +# get path of jar in /opt if it exist +FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar") + +# check if SQL client is already in classpath and must not be shipped manually +if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then + +# start client without jar +exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@" + +# check if SQL client jar is in /opt +elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then + +# start client with jar +exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`" + +# write error message to stderr +else +(>&2 echo "[ERROR] Flink SQL client jar neither found in classpath nor /opt directory should be located in $FLINK_OPT_DIR.") --- End diff -- check error message ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167892881 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/CollectTableSink.java --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.streaming.experimental.SocketStreamIterator; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Iterator; + +/** + * Table sink for collecting the results locally. 
+ */ +public class CollectTableSink implements RetractStreamTableSink, Iterator> { + + private final InetAddress gatewayAddress; + private final int manualGatewayPort; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + private SocketStreamIterator > iterator; + + public CollectTableSink(InetAddress gatewayAddress, int manualGatewayPort) { + this.gatewayAddress = gatewayAddress; + this.manualGatewayPort = manualGatewayPort; + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + @Override + public TableSink > configure(String[] fieldNames, TypeInformation[] fieldTypes) { + this.fieldNames = fieldNames; + this.fieldTypes = fieldTypes; + return this; --- End diff -- According to the JavaDocs `TableSink.configure()` must return a configured copy of the `TableSink` and not the `TableSink` object itself. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167882975 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.client; + +import org.apache.flink.table.client.cli.CliClient; +import org.apache.flink.table.client.cli.CliOptions; +import org.apache.flink.table.client.cli.CliOptionsParser; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.gateway.Executor; +import org.apache.flink.table.client.gateway.LocalExecutor; +import org.apache.flink.table.client.gateway.SessionContext; + +import java.io.IOException; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * SQL Client for submitting SQL statements. The client can be executed in two + * modes: a gateway and embedded mode. + * + * - In gateway mode, the SQL CLI client connects to the REST API of the gateway and allows for --- End diff -- I don't think we should document features that are not there yet (or at least mark them as future features). 
---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167909635 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/LocalExecutor.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.table.api.BatchQueryConfig; +import org.apache.flink.table.api.QueryConfig; +import org.apache.flink.table.api.StreamQueryConfig; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.client.SqlClientException; +import 
org.apache.flink.table.client.config.Deployment; +import org.apache.flink.table.client.config.Environment; +import org.apache.flink.table.client.config.Execution; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Executor that performs the Flink communication locally. The calls are blocking depending on the + * response time to the Flink cluster. Flink jobs are not blocking. + */ +public class LocalExecutor implements Executor { + + private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml"; + + private final Environment environment; + private final List dependencies; + private final Configuration flinkConfig; + private final ResultStore resultStore; + + public LocalExecutor(URL defaultEnv, List jars, List libraries) { + + final String flinkConfigDir; + try { + // find the configuration directory + flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv(); + + // load the global configuration +
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167872946 --- Diff: flink-libraries/flink-sql-client/pom.xml --- @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-libraries + 1.5-SNAPSHOT + .. + + + flink-sql-client + flink-sql-client + + jar + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + + flink-clients_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-table_${scala.binary.version} + ${project.version} + + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + + log4j + log4j + ${log4j.version} + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + org.jline + jline-terminal + 3.5.1 --- End diff -- The jLine dependencies are BSD licensed. I think we need to update the `./LICENSE` file and include this dependency. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167870419 --- Diff: flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml --- @@ -0,0 +1,47 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +#== +# Table Sources +#== +sources: [] # empty list +# Define table sources here. See the documentation for more details. +# - name: ... +# connector: ... +# format: ... +# schema: ... + +#== +# Execution properties +#== +execution: + type: streaming --- End diff -- Add brief explanations for parameters and alternative values for `type` and `result-mode`. ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167871291 --- Diff: flink-libraries/flink-sql-client/pom.xml --- @@ -0,0 +1,149 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-libraries + 1.5-SNAPSHOT + .. + + + flink-sql-client + flink-sql-client + + jar + + + + + + org.apache.flink + flink-core + ${project.version} + + + + org.apache.flink + + flink-clients_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-streaming-scala_${scala.binary.version} + ${project.version} + + + + org.apache.flink + + flink-table_${scala.binary.version} + ${project.version} + + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + --- End diff -- empty line ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5441#discussion_r167869958 --- Diff: flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml --- @@ -0,0 +1,47 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +#== +# Table Sources +#== +sources: [] # empty list +# Define table sources here. See the documentation for more details. --- End diff -- more concrete pointer (Table API & SQL documentation)? ---
[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/5441 [FLINK-8607] [table] Add a basic embedded SQL CLI client ## What is the purpose of the change This PR implements the first part of the implementation plan described in FLIP-24. ``` Goal: Add the basic features to play around with Flink's streaming SQL. - Add CLI component that reads the configuration files - "Pre-registered table sources" - "Job parameters" - Add executor for retrieving pre-flight information and corresponding CLI SQL parser - SHOW TABLES - DESCRIBE TABLE - EXPLAIN - Add streaming append query submission to executor - Submit jars and run SELECT query using the ClusterClient - Collect results on heap and serve them on the CLI side (Internal Mode with SELECT) - SOURCE (for executing a SQL statement stored in a local file) ``` Additionally, this PR also supports retraction queries and the SET operation for setting properties. The client can be started using `./bin/sql-client.sh embedded`. A table source must be defined in `./conf/sql-client-defaults.yaml` (for example a CSV table source, an example can be found in the test resources directory). The client supports two modes for viewing results. A `changelog` mode or `table` mode. They can be selected by setting the `execution.result-mode` property. The code is still work in progress. There are a couple of things that can be improved: - Add more logging instead of swallowing exceptions - Use Flink's ConfigOptions where applicable (e.g. for better default value handling and validation) - Maybe make the record result retrieval blocking? - Tests for the LocalExecutor - Some basic tests for other classes (maybe for CLI classes as well?) - More deployment options (support for YARN, FLIP-6?) - Documentation ## Brief change log - New module `flink-sql-client` - New executable script in `flink/bin` - Minor visibility changes in other modules ## Verifying this change Manually verified. Further tests will follow. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not documented You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-8607 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5441.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5441 commit 17b6b6d8b8804382b7b855033c370ffb4fd673ac Author: twalthr Date: 2017-12-07T12:46:31Z [FLINK-8607] [table] Add a basic embedded SQL CLI client ---