This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a696de87196 Add FederationResultSetMetaData to implement AdvancedFederationExecutor logic (#19712)
a696de87196 is described below

commit a696de87196885f532b07c9a5b5c2b2bc95c8e06
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Sat Jul 30 17:25:29 2022 +0800

    Add FederationResultSetMetaData to implement AdvancedFederationExecutor logic (#19712)
---
 .../advanced/AdvancedFederationExecutor.java       |  29 ++--
 .../advanced/resultset/FederationResultSet.java    |  13 +-
 .../resultset/FederationResultSetMetaData.java     | 176 +++++++++++++++++++++
 .../executor/original/schema/FilterableSchema.java |   1 +
 4 files changed, 206 insertions(+), 13 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
index efa13524d4b..6782cd3f8aa 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.federation.executor.advanced;
 
+import com.google.common.base.Preconditions;
 import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.config.CalciteConnectionConfig;
@@ -30,6 +31,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.runtime.Bindable;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.eventbus.EventBusContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -48,6 +51,7 @@ import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerCon
 import 
org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory;
 import 
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.Connection;
@@ -91,22 +95,29 @@ public final class AdvancedFederationExecutor implements 
FederationExecutor {
     @Override
     public ResultSet executeQuery(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                                   final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final FederationContext federationContext) throws 
SQLException {
-        SQLStatement sqlStatement = 
federationContext.getLogicSQL().getSqlStatementContext().getSqlStatement();
-        Enumerator<Object[]> enumerator = execute(sqlStatement, 
federationContext, prepareEngine, callback).enumerator();
-        resultSet = new FederationResultSet(enumerator, 
federationContext.getLogicSQL().getSqlStatementContext());
+        SQLStatementContext<?> sqlStatementContext = 
federationContext.getLogicSQL().getSqlStatementContext();
+        Preconditions.checkArgument(sqlStatementContext instanceof 
SelectStatementContext, "SQL statement context must be select statement 
context.");
+        ShardingSphereSchema schema = 
federationContext.getDatabases().get(databaseName.toLowerCase()).getSchema(schemaName);
+        FilterableSchema filterableSchema = 
createFilterableSchema(prepareEngine, schema, callback, federationContext);
+        Enumerator<Object[]> enumerator = 
execute(sqlStatementContext.getSqlStatement(), filterableSchema).enumerator();
+        resultSet = new FederationResultSet(enumerator, schema, 
filterableSchema, sqlStatementContext);
         return resultSet;
     }
     
-    @SuppressWarnings("unchecked")
-    private Enumerable<Object[]> execute(final SQLStatement sqlStatement, 
final FederationContext federationContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                         final JDBCExecutorCallback<? extends 
ExecuteResult> callback) {
+    private FilterableSchema createFilterableSchema(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final ShardingSphereSchema schema,
+                                                    final 
JDBCExecutorCallback<? extends ExecuteResult> callback, final FederationContext 
federationContext) {
         FilterableTableScanExecutorContext executorContext = new 
FilterableTableScanExecutorContext(databaseName, schemaName, props, 
federationContext);
         FilterableTableScanExecutor executor = new 
FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, 
optimizerContext, globalRuleMetaData, executorContext, eventBusContext);
+        FederationSchemaMetaData schemaMetaData = new 
FederationSchemaMetaData(schemaName, schema.getTables());
+        return new FilterableSchema(schemaMetaData, executor);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private Enumerable<Object[]> execute(final SQLStatement sqlStatement, 
final FilterableSchema filterableSchema) {
+        // TODO remove OptimizerPlannerContextFactory call and use setup 
executor to handle this logic
         CalciteConnectionConfig connectionConfig = new 
CalciteConnectionConfigImpl(OptimizerPlannerContextFactory.createConnectionProperties());
         RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
-        FederationSchemaMetaData schemaMetaData = new 
FederationSchemaMetaData(schemaName, 
federationContext.getDatabases().get(databaseName).getSchema(schemaName).getTables());
-        // TODO remove OptimizerPlannerContextFactory call and use setup 
executor to handle this logic
-        CalciteCatalogReader catalogReader = 
OptimizerPlannerContextFactory.createCatalogReader(schemaName, new 
FilterableSchema(schemaMetaData, executor), relDataTypeFactory, 
connectionConfig);
+        CalciteCatalogReader catalogReader = 
OptimizerPlannerContextFactory.createCatalogReader(schemaName, 
filterableSchema, relDataTypeFactory, connectionConfig);
         SqlValidator validator = 
OptimizerPlannerContextFactory.createValidator(catalogReader, 
relDataTypeFactory, connectionConfig);
         SqlToRelConverter converter = 
OptimizerPlannerContextFactory.createConverter(catalogReader, validator, 
relDataTypeFactory);
         RelNode bestPlan = new ShardingSphereOptimizer(optimizerContext, 
converter).optimize(databaseName, schemaName, sqlStatement);
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
index c1fba3fd068..9cfed743252 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
@@ -17,11 +17,14 @@
 
 package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
 
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.linq4j.Enumerator;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
 import 
org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
+import 
org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 
 import java.io.InputStream;
 import java.io.Reader;
@@ -61,15 +64,18 @@ public final class FederationResultSet extends 
AbstractUnsupportedOperationResul
     
     private final Map<String, Integer> columnLabelAndIndexMap;
     
+    private final FederationResultSetMetaData resultSetMetaData;
+    
     private Object[] currentRows;
     
     private boolean wasNull;
     
     private boolean closed;
     
-    public FederationResultSet(final Enumerator<Object[]> enumerator, final 
SQLStatementContext<?> sqlStatementContext) {
+    public FederationResultSet(final Enumerator<Object[]> enumerator, final 
ShardingSphereSchema schema, final FilterableSchema filterableSchema, final 
SQLStatementContext<?> sqlStatementContext) {
         this.enumerator = enumerator;
         columnLabelAndIndexMap = 
createColumnLabelAndIndexMap(sqlStatementContext);
+        resultSetMetaData = new FederationResultSetMetaData(schema, 
filterableSchema, new JavaTypeFactoryImpl(), (SelectStatementContext) 
sqlStatementContext);
     }
     
     private Map<String, Integer> createColumnLabelAndIndexMap(final 
SQLStatementContext<?> sqlStatementContext) {
@@ -312,8 +318,7 @@ public final class FederationResultSet extends 
AbstractUnsupportedOperationResul
     
     @Override
     public ResultSetMetaData getMetaData() throws SQLException {
-        // TODO implement getMetaData for federation resultset
-        return null;
+        return resultSetMetaData;
     }
     
     @Override
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
new file mode 100644
index 00000000000..fbd2f44880d
--- /dev/null
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Table;
+import 
org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
+import 
org.apache.shardingsphere.infra.binder.segment.select.projection.impl.ColumnProjection;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import 
org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Federation result set meta data.
+ */
+@RequiredArgsConstructor
+public final class FederationResultSetMetaData extends WrapperAdapter 
implements ResultSetMetaData {
+    
+    private final ShardingSphereSchema schema;
+    
+    private final FilterableSchema filterableSchema;
+    
+    private final RelDataTypeFactory relDataTypeFactory;
+    
+    private final SelectStatementContext selectStatementContext;
+    
+    @Override
+    public int getColumnCount() throws SQLException {
+        return 
selectStatementContext.getProjectionsContext().getExpandProjections().size();
+    }
+    
+    @Override
+    public boolean isAutoIncrement(final int column) throws SQLException {
+        return false;
+    }
+    
+    @Override
+    public boolean isCaseSensitive(final int column) {
+        return true;
+    }
+    
+    @Override
+    public boolean isSearchable(final int column) {
+        return false;
+    }
+    
+    @Override
+    public boolean isCurrency(final int column) {
+        return false;
+    }
+    
+    @Override
+    public int isNullable(final int column) {
+        Optional<Table> table = findTableName(column).flatMap(optional -> 
Optional.ofNullable(filterableSchema.getTable(optional)));
+        return !table.isPresent() || 
table.get().getRowType(relDataTypeFactory).isNullable() ? 
ResultSetMetaData.columnNullable : ResultSetMetaData.columnNoNulls;
+    }
+    
+    @Override
+    public boolean isSigned(final int column) throws SQLException {
+        return true;
+    }
+    
+    @Override
+    public int getColumnDisplaySize(final int column) {
+        return findTableName(column).flatMap(optional -> 
Optional.ofNullable(filterableSchema.getTable(optional))).map(optional -> 
optional.getRowType(relDataTypeFactory).getPrecision()).orElse(0);
+    }
+    
+    @Override
+    public String getColumnLabel(final int column) throws SQLException {
+        Projection projection = 
selectStatementContext.getProjectionsContext().getExpandProjections().get(column
 - 1);
+        if (projection instanceof ColumnProjection) {
+            return ((ColumnProjection) projection).getName();
+        }
+        return projection.getColumnLabel();
+    }
+    
+    @Override
+    public String getColumnName(final int column) throws SQLException {
+        Projection projection = 
selectStatementContext.getProjectionsContext().getExpandProjections().get(column
 - 1);
+        if (projection instanceof ColumnProjection) {
+            return ((ColumnProjection) projection).getName();
+        }
+        return projection.getColumnLabel();
+    }
+    
+    @Override
+    public String getSchemaName(final int column) {
+        return DefaultDatabase.LOGIC_NAME;
+    }
+    
+    @Override
+    public int getPrecision(final int column) {
+        Optional<Table> table = findTableName(column).flatMap(optional -> 
Optional.ofNullable(filterableSchema.getTable(optional)));
+        return !table.isPresent() || RelDataType.PRECISION_NOT_SPECIFIED == 
table.get().getRowType(relDataTypeFactory).getPrecision() ? 0 : 
table.get().getRowType(relDataTypeFactory).getPrecision();
+    }
+    
+    @Override
+    public int getScale(final int column) {
+        Optional<Table> table = findTableName(column).flatMap(optional -> 
Optional.ofNullable(filterableSchema.getTable(optional)));
+        return !table.isPresent() || RelDataType.SCALE_NOT_SPECIFIED == 
table.get().getRowType(relDataTypeFactory).getScale() ? 0 : 
table.get().getRowType(relDataTypeFactory).getScale();
+    }
+    
+    @Override
+    public String getTableName(final int column) throws SQLException {
+        return findTableName(column).orElse("");
+    }
+    
+    @Override
+    public String getCatalogName(final int column) {
+        return DefaultDatabase.LOGIC_NAME;
+    }
+    
+    @Override
+    public int getColumnType(final int column) throws SQLException {
+        return 0;
+    }
+    
+    @Override
+    public String getColumnTypeName(final int column) throws SQLException {
+        return "";
+    }
+    
+    @Override
+    public boolean isReadOnly(final int column) throws SQLException {
+        return false;
+    }
+    
+    @Override
+    public boolean isWritable(final int column) {
+        return false;
+    }
+    
+    @Override
+    public boolean isDefinitelyWritable(final int column) {
+        return false;
+    }
+    
+    @Override
+    public String getColumnClassName(final int column) {
+        return "";
+    }
+    
+    private Optional<String> findTableName(final int column) {
+        Projection projection = 
selectStatementContext.getProjectionsContext().getExpandProjections().get(column
 - 1);
+        if (projection instanceof ColumnProjection) {
+            Map<String, String> tableNamesByColumnProjection =
+                    
selectStatementContext.getTablesContext().findTableNamesByColumnProjection(Collections.singletonList((ColumnProjection)
 projection), schema);
+            return 
Optional.of(tableNamesByColumnProjection.get(projection.getExpression()));
+        }
+        return Optional.empty();
+    }
+}
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
index 318823b777b..ca851972870 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
@@ -47,6 +47,7 @@ public final class FilterableSchema extends AbstractSchema {
     private Map<String, Table> createTableMap(final FederationSchemaMetaData 
schemaMetaData, final FilterableTableScanExecutor executor) {
         Map<String, Table> result = new 
LinkedHashMap<>(schemaMetaData.getTables().size(), 1);
         for (FederationTableMetaData each : 
schemaMetaData.getTables().values()) {
+            // TODO implement table statistic logic after using custom 
operators
             result.put(each.getName(), new FilterableTable(each, executor, new 
FederationTableStatistic()));
         }
         return result;

Reply via email to