[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages from remote SQL gateway through rest endpoint
jnh5y commented on code in PR #20958: URL: https://github.com/apache/flink/pull/20958#discussion_r1031725624 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java: ## @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway.remote; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.client.gateway.SqlExecutionException; +import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.table.client.gateway.local.ResultStore; +import org.apache.flink.table.client.gateway.local.result.ChangelogResult; +import org.apache.flink.table.client.gateway.local.result.DynamicResult; +import org.apache.flink.table.client.gateway.local.result.MaterializedResult; +import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders; +import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders; +import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders; +import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders; +import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody; 
+import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody; +import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE; + +/** + * Executor that performs the Flink communication remotely. Connection to SQL and query execution + * are managed by the RestClient. + */ +public class RemoteExecutor { + +private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class); + +private final RestClient restClient; +private final ResultStore resultStore; +private final KeepingAliveThread keepingAliveThread; + +private String sessionHandleId; +private SessionHandle sessionHandle; +private SessionMessageParameters sessionMessageParametersInstance; + +
[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages from remote SQL gateway through rest endpoint
jnh5y commented on code in PR #20958: URL: https://github.com/apache/flink/pull/20958#discussion_r1028528113 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java: ## @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway.remote; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.client.gateway.SqlExecutionException; +import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.table.client.gateway.local.ResultStore; +import org.apache.flink.table.client.gateway.local.result.ChangelogResult; +import org.apache.flink.table.client.gateway.local.result.DynamicResult; +import org.apache.flink.table.client.gateway.local.result.MaterializedResult; +import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders; +import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders; +import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders; +import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders; +import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody; 
+import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody; +import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE; + +/** + * Executor that performs the Flink communication remotely. Connection to SQL and query execution + * are managed by the RestClient. + */ +public class RemoteExecutor { + +private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class); + +private final RestClient restClient; +private final ResultStore resultStore; +private final KeepingAliveThread keepingAliveThread; + +private String sessionHandleId; +private SessionHandle sessionHandle; +private SessionMessageParameters sessionMessageParametersInstance; + +
[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages from remote SQL gateway through rest endpoint
jnh5y commented on code in PR #20958: URL: https://github.com/apache/flink/pull/20958#discussion_r1028512299 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java: ## @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway.remote; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.client.gateway.SqlExecutionException; +import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.table.client.gateway.local.ResultStore; +import org.apache.flink.table.client.gateway.local.result.ChangelogResult; +import org.apache.flink.table.client.gateway.local.result.DynamicResult; +import org.apache.flink.table.client.gateway.local.result.MaterializedResult; +import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders; +import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders; +import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders; +import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders; +import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody; 
+import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody; +import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE; + +/** + * Executor that performs the Flink communication remotely. Connection to SQL and query execution + * are managed by the RestClient. + */ +public class RemoteExecutor { + +private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class); + +private final RestClient restClient; +private final ResultStore resultStore; +private final KeepingAliveThread keepingAliveThread; + +private String sessionHandleId; +private SessionHandle sessionHandle; +private SessionMessageParameters sessionMessageParametersInstance; + +
[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages from remote SQL gateway through rest endpoint
jnh5y commented on code in PR #20958: URL: https://github.com/apache/flink/pull/20958#discussion_r1028414670 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java: ## @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway.remote; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.table.client.SqlClientException; +import org.apache.flink.table.client.gateway.SqlExecutionException; +import org.apache.flink.table.client.gateway.TypedResult; +import org.apache.flink.table.client.gateway.local.ResultStore; +import org.apache.flink.table.client.gateway.local.result.ChangelogResult; +import org.apache.flink.table.client.gateway.local.result.DynamicResult; +import org.apache.flink.table.client.gateway.local.result.MaterializedResult; +import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders; +import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders; +import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders; +import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders; +import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody; 
+import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody; +import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import static org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE; + +/** + * Executor that performs the Flink communication remotely. Connection to SQL and query execution + * are managed by the RestClient. + */ +public class RemoteExecutor { + +private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class); + +private final RestClient restClient; +private final ResultStore resultStore; +private final KeepingAliveThread keepingAliveThread; + +private String sessionHandleId; +private SessionHandle sessionHandle; +private SessionMessageParameters sessionMessageParametersInstance; + +
[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages from remote SQL gateway through rest endpoint
jnh5y commented on code in PR #20958: URL: https://github.com/apache/flink/pull/20958#discussion_r1028411566 ## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/result/TableResultWrapper.java: ## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.client.gateway.remote.result; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.client.gateway.remote.RemoteExecutor; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; +import org.apache.flink.table.utils.print.RowDataToStringConverter; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** To wrap the result returned by {@link RemoteExecutor#executeStatement}. 
*/ +public class TableResultWrapper implements TableResultInternal { + +private final ResolvedSchema resolvedSchema; +private final CloseableIterator dataIterator; + +private String resultId; +private boolean isMaterialized = false; +private ReadableConfig config; + +public TableResultWrapper( +RemoteExecutor executor, +OperationHandle operationHandle, +ResultSet firstResult, +Long nextToken) { +this.resolvedSchema = firstResult.getResultSchema(); +dataIterator = +new RowDataIterator(executor, operationHandle, firstResult.getData(), nextToken); +} + +public void setResultId(String resultId) { +this.resultId = resultId; +} + +public String getResultId() { +return resultId; +} + +public void setMaterialized(boolean isMaterialized) { +this.isMaterialized = isMaterialized; +} + +public boolean isMaterialized() { +return isMaterialized; +} + +public void setConfig(ReadableConfig config) { +this.config = config; +} + +public ReadableConfig getConfig() { +return config; +} + +/** Cannot get job client through SQL Gateway. */ +@Override +public Optional getJobClient() { +return Optional.empty(); +} + +@Override +public void await() { +throw new UnsupportedOperationException(); +} + +@Override +public void await(long timeout, TimeUnit unit) { +throw new UnsupportedOperationException(); +} + +@Override +public ResolvedSchema getResolvedSchema() { +return resolvedSchema; +} + +@Override +public ResultKind getResultKind() { +throw new UnsupportedOperationException(); +} + +@Override +public CloseableIterator collect() { +throw new UnsupportedOperationException(); +} + +@Override +public void print() { +throw new UnsupportedOperationException(); +} + +/** Returns an iterator that returns the iterator with the internal row data type. 
*/ +@Override +public CloseableIterator collectInternal() { +return dataIterator; +} + +@Override +public RowDataToStringConverter getRowDataToStringConverter() { +// todo +return rowData -> new String[] {"FAKE TEST RETURN"}; +} + +// + +private static class RowDataIterator implements CloseableIterator { + +private final RemoteExecutor executor; +private final OperationHandle operationHandle; +private Iterator currentData; + +private Long nextToken; + +public RowDataIterator( +RemoteExecutor executor, +OperationHandle operationHandle, +