[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages form remote SQL gateway through rest endpoint

2022-11-24 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1031725624


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java:
##
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.ResultStore;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
+
+/**
+ * Executor that performs the Flink communication remotely. Connection to SQL 
and query execution
+ * are managed by the RestClient.
+ */
+public class RemoteExecutor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
+
+private final RestClient restClient;
+private final ResultStore resultStore;
+private final KeepingAliveThread keepingAliveThread;
+
+private String sessionHandleId;
+private SessionHandle sessionHandle;
+private SessionMessageParameters sessionMessageParametersInstance;
+
+

[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages form remote SQL gateway through rest endpoint

2022-11-21 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028528113


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java:
##
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.ResultStore;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
+
+/**
+ * Executor that performs the Flink communication remotely. Connection to SQL 
and query execution
+ * are managed by the RestClient.
+ */
+public class RemoteExecutor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
+
+private final RestClient restClient;
+private final ResultStore resultStore;
+private final KeepingAliveThread keepingAliveThread;
+
+private String sessionHandleId;
+private SessionHandle sessionHandle;
+private SessionMessageParameters sessionMessageParametersInstance;
+
+

[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages form remote SQL gateway through rest endpoint

2022-11-21 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028512299


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java:
##
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.ResultStore;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
+
+/**
+ * Executor that performs the Flink communication remotely. Connection to SQL 
and query execution
+ * are managed by the RestClient.
+ */
+public class RemoteExecutor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
+
+private final RestClient restClient;
+private final ResultStore resultStore;
+private final KeepingAliveThread keepingAliveThread;
+
+private String sessionHandleId;
+private SessionHandle sessionHandle;
+private SessionMessageParameters sessionMessageParametersInstance;
+
+

[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages form remote SQL gateway through rest endpoint

2022-11-21 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028414670


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/RemoteExecutor.java:
##
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.ResultStore;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.client.gateway.remote.result.TableResultWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
+import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
+import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
+
+/**
+ * Executor that performs the Flink communication remotely. Connection to SQL 
and query execution
+ * are managed by the RestClient.
+ */
+public class RemoteExecutor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
+
+private final RestClient restClient;
+private final ResultStore resultStore;
+private final KeepingAliveThread keepingAliveThread;
+
+private String sessionHandleId;
+private SessionHandle sessionHandle;
+private SessionMessageParameters sessionMessageParametersInstance;
+
+

[GitHub] [flink] jnh5y commented on a diff in pull request #20958: [FLINK-29486][sql-client] Add a new RemoteExecutor to send and retrieve messages form remote SQL gateway through rest endpoint

2022-11-21 Thread GitBox


jnh5y commented on code in PR #20958:
URL: https://github.com/apache/flink/pull/20958#discussion_r1028411566


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/remote/result/TableResultWrapper.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.remote.result;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.client.gateway.remote.RemoteExecutor;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** To wrap the result returned by {@link RemoteExecutor#executeStatement}. */
+public class TableResultWrapper implements TableResultInternal {
+
+private final ResolvedSchema resolvedSchema;
+private final CloseableIterator dataIterator;
+
+private String resultId;
+private boolean isMaterialized = false;
+private ReadableConfig config;
+
+public TableResultWrapper(
+RemoteExecutor executor,
+OperationHandle operationHandle,
+ResultSet firstResult,
+Long nextToken) {
+this.resolvedSchema = firstResult.getResultSchema();
+dataIterator =
+new RowDataIterator(executor, operationHandle, 
firstResult.getData(), nextToken);
+}
+
+public void setResultId(String resultId) {
+this.resultId = resultId;
+}
+
+public String getResultId() {
+return resultId;
+}
+
+public void setMaterialized(boolean isMaterialized) {
+this.isMaterialized = isMaterialized;
+}
+
+public boolean isMaterialized() {
+return isMaterialized;
+}
+
+public void setConfig(ReadableConfig config) {
+this.config = config;
+}
+
+public ReadableConfig getConfig() {
+return config;
+}
+
+/** Cannot get job client through SQL Gateway. */
+@Override
+public Optional getJobClient() {
+return Optional.empty();
+}
+
+@Override
+public void await() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void await(long timeout, TimeUnit unit) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public ResolvedSchema getResolvedSchema() {
+return resolvedSchema;
+}
+
+@Override
+public ResultKind getResultKind() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public CloseableIterator collect() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void print() {
+throw new UnsupportedOperationException();
+}
+
+/** Returns an iterator that returns the iterator with the internal row 
data type. */
+@Override
+public CloseableIterator collectInternal() {
+return dataIterator;
+}
+
+@Override
+public RowDataToStringConverter getRowDataToStringConverter() {
+// todo
+return rowData -> new String[] {"FAKE TEST RETURN"};
+}
+
+// 

+
+private static class RowDataIterator implements CloseableIterator 
{
+
+private final RemoteExecutor executor;
+private final OperationHandle operationHandle;
+private Iterator currentData;
+
+private Long nextToken;
+
+public RowDataIterator(
+RemoteExecutor executor,
+OperationHandle operationHandle,
+