yuqi1129 commented on a change in pull request #902: URL: https://github.com/apache/incubator-iotdb/pull/902#discussion_r487632569
########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBEnumerator.java ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.calcite; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; + +public class IoTDBEnumerator implements Enumerator<Object> { + + private ResultSet currentResultSet; + private Iterator<ResultSet> iterator; + private List<Integer> indexInResultSet = new ArrayList<>(); + private List<RelDataTypeField> fieldTypes; + + /** + * Creates a IoTDBEnumerator. 
+ * + * @param resultList IoTDB result set + * @param protoRowType The type of protecting rows + */ + IoTDBEnumerator(List<ResultSet> resultList, RelProtoDataType protoRowType) throws SQLException { + this.iterator = resultList.iterator(); + if (iterator.hasNext()) { + this.currentResultSet = iterator.next(); + } + + final RelDataTypeFactory typeFactory = + new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + this.fieldTypes = protoRowType.apply(typeFactory).getFieldList(); + + // get the corresponding index of project columns in result set + // as we have 'time' and 'device' columns in result that the project columns may don't include + // e.g. If resultSet includes [time, device, s0, s1], project columns only include [device, s0, s1], + // then the indexInResultSet is [2, 3, 4]. + ResultSetMetaData metaData = currentResultSet.getMetaData(); + int indexInFieldTypes = 0; + for (int i = 1; i <= metaData.getColumnCount(); i++) { + if (i <= 2 && !metaData.getColumnName(i).toLowerCase() + .equals(fieldTypes.get(indexInFieldTypes).getName())) { + continue; + } else { + indexInFieldTypes++; + indexInResultSet.add(i); + } + } + } + + /** + * Produce and get the next row from the results + * + * @return A new row from the results + */ + @Override + public Object current() { + if (fieldTypes.size() == 1) { + // If we just have one field, produce it directly + return currentRowField(indexInResultSet.get(0), fieldTypes.get(0).getType().getSqlTypeName()); + } else { + // Build an array with all fields in this row + Object[] row = new Object[fieldTypes.size()]; + for (int i = 0; i < fieldTypes.size(); i++) { + row[i] = currentRowField(indexInResultSet.get(i), + fieldTypes.get(i).getType().getSqlTypeName()); + } + + return row; + } + } + + /** + * Get a field for the current row from the underlying object. 
+ * + * @param index Index of the field within the Row object + * @param type Type of the field in this row + */ + private Object currentRowField(int index, SqlTypeName type) { + try { + switch (type) { + case VARCHAR: + return currentResultSet.getString(index); + case INTEGER: + return currentResultSet.getInt(index); + case BIGINT: + return currentResultSet.getLong(index); + case DOUBLE: + return currentResultSet.getDouble(index); + case REAL: + return currentResultSet.getFloat(index); + case BOOLEAN: + return currentResultSet.getBoolean(index); + default: + return null; + } + } catch (SQLException e) { + if (e.getMessage().endsWith("NULL.")) { + return null; + } else { + e.printStackTrace(); Review comment: use log ? ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBTable.java ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.calcite; + +import com.google.common.collect.ImmutableList; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.calcite.adapter.java.AbstractQueryableTable; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.function.Function1; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeImpl; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTableQueryable; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Util; +import org.apache.iotdb.db.exception.query.QueryProcessException; + +public class IoTDBTable extends AbstractQueryableTable + implements TranslatableTable { + + RelProtoDataType protoRowType; + private final IoTDBSchema schema; + private final String storageGroup; + + public IoTDBTable(IoTDBSchema schema, String storageGroup) { + super(Object[].class); + this.schema = schema; + this.storageGroup = storageGroup; + } + + public String toString() { + return "IoTDBTable {" + storageGroup + "}"; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory 
typeFactory) { + try { + if (protoRowType == null) { + protoRowType = schema.getRelDataType(storageGroup); + } + } catch (SQLException | QueryProcessException e) { + // print exception error here + e.printStackTrace(); + } + return protoRowType.apply(typeFactory); + } + + @Override + public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, + String tableName) { + return new IoTDBQueryable<>(queryProvider, schema, this, storageGroup); + } + + @Override + public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { + final RelOptCluster cluster = context.getCluster(); + return new IoTDBTableScan(cluster, cluster.traitSetOf(IoTDBRel.CONVENTION), + relOptTable, this, null); + } + + public Enumerable<Object> query(final Connection connection) { + return query(connection, ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), 0, 0); + } + + /** + * Executes a IoTDB SQL query. + * + * @param connection IoTDB connection + * @param fields List of fields to project + * @return Enumerator of results + */ + public Enumerable<Object> query(final Connection connection, + List<Map.Entry<String, Class>> fields, + final List<String> selectFields, final List<Map.Entry<String, String>> deviceToFilterList, + List<String> globalPredicates, final Integer limit, final Integer offset) { + // Build the type of the resulting row based on the provided fields + final RelDataTypeFactory typeFactory = + new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder(); + final RelDataType rowType = getRowType(typeFactory); + + Function1<String, Void> addField = fieldName -> { + SqlTypeName typeName = + rowType.getField(fieldName, true, false).getType().getSqlTypeName(); + fieldInfo.add(fieldName, typeFactory.createSqlType(typeName)) + .nullable(true); + return null; + }; + + if (selectFields.isEmpty()) { + for (Map.Entry<String, Class> field : fields) { + 
addField.apply(field.getKey()); + } + } else { + for (String field : selectFields) { + if (field.startsWith("'")) { + fieldInfo.add(field, SqlTypeName.VARCHAR); + } else { + addField.apply(field); + } + } + } + + final RelProtoDataType resultRowType = RelDataTypeImpl.proto(fieldInfo.build()); + + // Construct the list of fields to project + String selectString = ""; + if (selectFields.isEmpty()) { + selectString = "*"; + } else { + // delete the 'time', 'device' string in select list + // this has to be here rather than init "selectFields" otherwise the resultRowType will be wrong + StringBuilder selectBuilder = new StringBuilder(); + for (int i = 0; i < selectFields.size(); i++) { + String selectField = selectFields.get(i); + if (selectField.equals(IoTDBConstant.DeviceColumn) || selectField + .equals(IoTDBConstant.TimeColumn)) { + continue; + } else { + selectBuilder.append(selectField); + if (i < selectFields.size() - 1) { + selectBuilder.append(", "); + } + } + } + selectString = selectBuilder.toString(); + if (selectString.equals("")) { + selectString = "*"; + } + } + + List<String> queryList = new ArrayList<>(); + Set<String> tmpDevices = new HashSet<>(); // to deduplicate in global query + // construct query by device + if (!deviceToFilterList.isEmpty()) { + for (Entry<String, String> deviceToFilter : deviceToFilterList) { + String fromClause = " FROM "; + fromClause += deviceToFilter.getKey(); + tmpDevices.add(deviceToFilter.getKey()); + + String whereClause = ""; + if (deviceToFilter.getValue() != null) { + whereClause = " WHERE "; + whereClause += deviceToFilter.getValue(); + } + + // Build and issue the query and return an Enumerator over the results + StringBuilder queryBuilder = new StringBuilder("SELECT "); + queryBuilder.append(selectString); + queryBuilder.append(fromClause); + queryBuilder.append(whereClause); + + if (limit > 0) { + queryBuilder.append(" LIMIT " + limit); + } + if (offset > 0) { + queryBuilder.append(" OFFSET " + offset); + } + + // 
+ append align by device + queryBuilder.append(" " + IoTDBConstant.AlignByDevice); + queryList.add(queryBuilder.toString()); + } + } + + // construct global query + if (deviceToFilterList.isEmpty() || !globalPredicates.isEmpty()) { + String fromClause = " FROM "; + // deduplicate redundant device + if (!deviceToFilterList.isEmpty() && !globalPredicates.isEmpty()) { Review comment: `!globalPredicates.isEmpty()` is redundant here: once `!deviceToFilterList.isEmpty()` holds, the enclosing `if` condition already guarantees it is true ########## File path: calcite/pom.xml ########## @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License.
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>iotdb-parent</artifactId> + <groupId>org.apache.iotdb</groupId> + <version>0.10.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>iotdb-calcite</artifactId> + <version>0.10.0-SNAPSHOT</version> + <packaging>jar</packaging> + <description>IoTDB adapter for Calcite</description> + <dependencies> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <version>1.21.0</version> Review comment: it is better to use a property placeholder such as ${xxx.version} than a hard-coded version value ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBConstant.java ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iotdb.calcite; + +public class IoTDBConstant { + + public static final String AlignByDevice = "align by device"; Review comment: I advise using an UPPER_SNAKE_CASE name (e.g. `ALIGN_BY_DEVICE`) for a `static final` value ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBTable.java ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iotdb.calcite; + +import com.google.common.collect.ImmutableList; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.calcite.adapter.java.AbstractQueryableTable; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.function.Function1; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeImpl; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTableQueryable; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Util; +import org.apache.iotdb.db.exception.query.QueryProcessException; + +public class IoTDBTable extends AbstractQueryableTable + implements TranslatableTable { + + RelProtoDataType protoRowType; + private final IoTDBSchema schema; + private final String storageGroup; + + public IoTDBTable(IoTDBSchema schema, String storageGroup) { + super(Object[].class); + this.schema = schema; + this.storageGroup = storageGroup; + } + + public String toString() { + return "IoTDBTable {" + storageGroup + "}"; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory 
typeFactory) { + try { + if (protoRowType == null) { + protoRowType = schema.getRelDataType(storageGroup); + } + } catch (SQLException | QueryProcessException e) { + // print exception error here + e.printStackTrace(); + } + return protoRowType.apply(typeFactory); + } + + @Override + public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, + String tableName) { + return new IoTDBQueryable<>(queryProvider, schema, this, storageGroup); + } + + @Override + public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { + final RelOptCluster cluster = context.getCluster(); + return new IoTDBTableScan(cluster, cluster.traitSetOf(IoTDBRel.CONVENTION), + relOptTable, this, null); + } + + public Enumerable<Object> query(final Connection connection) { + return query(connection, ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), 0, 0); + } + + /** + * Executes a IoTDB SQL query. + * + * @param connection IoTDB connection + * @param fields List of fields to project + * @return Enumerator of results + */ + public Enumerable<Object> query(final Connection connection, + List<Map.Entry<String, Class>> fields, + final List<String> selectFields, final List<Map.Entry<String, String>> deviceToFilterList, + List<String> globalPredicates, final Integer limit, final Integer offset) { + // Build the type of the resulting row based on the provided fields + final RelDataTypeFactory typeFactory = + new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder(); + final RelDataType rowType = getRowType(typeFactory); + + Function1<String, Void> addField = fieldName -> { + SqlTypeName typeName = + rowType.getField(fieldName, true, false).getType().getSqlTypeName(); + fieldInfo.add(fieldName, typeFactory.createSqlType(typeName)) + .nullable(true); + return null; + }; + + if (selectFields.isEmpty()) { + for (Map.Entry<String, Class> field : fields) { + 
addField.apply(field.getKey()); + } + } else { + for (String field : selectFields) { + if (field.startsWith("'")) { + fieldInfo.add(field, SqlTypeName.VARCHAR); + } else { + addField.apply(field); + } + } + } + + final RelProtoDataType resultRowType = RelDataTypeImpl.proto(fieldInfo.build()); + + // Construct the list of fields to project + String selectString = ""; + if (selectFields.isEmpty()) { + selectString = "*"; + } else { + // delete the 'time', 'device' string in select list + // this has to be here rather than init "selectFields" otherwise the resultRowType will be wrong + StringBuilder selectBuilder = new StringBuilder(); + for (int i = 0; i < selectFields.size(); i++) { + String selectField = selectFields.get(i); + if (selectField.equals(IoTDBConstant.DeviceColumn) || selectField + .equals(IoTDBConstant.TimeColumn)) { + continue; Review comment: Unnecessary ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBEnumerator.java ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.calcite; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; + +public class IoTDBEnumerator implements Enumerator<Object> { + + private ResultSet currentResultSet; + private Iterator<ResultSet> iterator; + private List<Integer> indexInResultSet = new ArrayList<>(); + private List<RelDataTypeField> fieldTypes; + + /** + * Creates a IoTDBEnumerator. + * + * @param resultList IoTDB result set + * @param protoRowType The type of protecting rows + */ + IoTDBEnumerator(List<ResultSet> resultList, RelProtoDataType protoRowType) throws SQLException { + this.iterator = resultList.iterator(); + if (iterator.hasNext()) { + this.currentResultSet = iterator.next(); + } + + final RelDataTypeFactory typeFactory = + new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + this.fieldTypes = protoRowType.apply(typeFactory).getFieldList(); + + // get the corresponding index of project columns in result set + // as we have 'time' and 'device' columns in result that the project columns may don't include + // e.g. If resultSet includes [time, device, s0, s1], project columns only include [device, s0, s1], + // then the indexInResultSet is [2, 3, 4]. 
+ ResultSetMetaData metaData = currentResultSet.getMetaData(); + int indexInFieldTypes = 0; + for (int i = 1; i <= metaData.getColumnCount(); i++) { + if (i <= 2 && !metaData.getColumnName(i).toLowerCase() + .equals(fieldTypes.get(indexInFieldTypes).getName())) { + continue; Review comment: Unnecessary `continue` ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBTable.java ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.calcite; + +import com.google.common.collect.ImmutableList; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.calcite.adapter.java.AbstractQueryableTable; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.function.Function1; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeImpl; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTableQueryable; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Util; +import org.apache.iotdb.db.exception.query.QueryProcessException; + +public class IoTDBTable extends AbstractQueryableTable + implements TranslatableTable { + + RelProtoDataType protoRowType; + private final IoTDBSchema schema; + private final String storageGroup; + + public IoTDBTable(IoTDBSchema schema, String storageGroup) { + super(Object[].class); + this.schema = schema; + this.storageGroup = storageGroup; + } + + public String toString() { + return "IoTDBTable {" + storageGroup + "}"; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory 
typeFactory) { + try { + if (protoRowType == null) { + protoRowType = schema.getRelDataType(storageGroup); + } + } catch (SQLException | QueryProcessException e) { + // print exception error here + e.printStackTrace(); + } + return protoRowType.apply(typeFactory); + } + + @Override + public <T> Queryable<T> asQueryable(QueryProvider queryProvider, SchemaPlus schema, + String tableName) { + return new IoTDBQueryable<>(queryProvider, schema, this, storageGroup); + } + + @Override + public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { + final RelOptCluster cluster = context.getCluster(); + return new IoTDBTableScan(cluster, cluster.traitSetOf(IoTDBRel.CONVENTION), + relOptTable, this, null); + } + + public Enumerable<Object> query(final Connection connection) { + return query(connection, ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), 0, 0); Review comment: Why limit and offset are 0? ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBRel.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.calcite; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; + +public interface IoTDBRel extends RelNode { + + void implement(Implementor implementor); + + /** + * Calling convention for relational operations that occur in IoTDB. + */ + Convention CONVENTION = new Convention.Impl("IOTDB", IoTDBRel.class); + + /** + * Callback for the implementation process that converts a tree of {@link IoTDBRel} nodes into a + * IoTDB SQL query. + */ + class Implementor { + + final List<String> selectFields = new ArrayList<>(); + final Map<String, String> deviceToFilterMap = new LinkedHashMap<>(); + final List<String> globalPredicate = new ArrayList<>(); + int limit = 0; + int offset = 0; + + RelOptTable table; + IoTDBTable ioTDBTable; + + /** + * Adds newly projected fields. + * + * @param fields New fields to be projected from a query + */ + public void addFields(List<String> fields) { + if (selectFields != null) { + selectFields.addAll(fields); + } + } + + /** + * Adds newly restricted devices and predicates. + * + * @param deviceToFilterMap predicate of given device + * @param predicates global predicates to be applied to the query + */ + public void add(Map<String, String> deviceToFilterMap, List<String> predicates) { + if (this.deviceToFilterMap != null) { Review comment: As `deviceToFilterMap ` and `globalPredicate` are marked as `final`, there is not need to make null check ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBFilter.java ########## @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.calcite; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Util; +import org.apache.iotdb.db.exception.query.LogicalOptimizeException; +import org.apache.iotdb.db.qp.constant.SQLConstant; +import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator; +import org.apache.iotdb.db.qp.logical.crud.FilterOperator; +import org.apache.iotdb.db.qp.strategy.optimizer.DnfFilterOptimizer; +import org.apache.iotdb.tsfile.read.common.Path; + +public class IoTDBFilter extends Filter implements IoTDBRel { + + private final FilterOperator filterOperator; + private final List<String> fieldNames; + private List<String> predicates; // for global predicate + Map<String, String> deviceToFilterMap = new LinkedHashMap<>(); // for device predicate + + protected 
IoTDBFilter(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) + throws LogicalOptimizeException { + super(cluster, traits, child, condition); + this.fieldNames = IoTDBRules.IoTDBFieldNames(getRowType()); + this.filterOperator = getIoTDBOperator(condition); + this.predicates = translateWhere(filterOperator); + + // add global predicate to each device if both global and device predicate exist + if (!this.predicates.isEmpty() && !this.deviceToFilterMap.isEmpty()) { + for (String device : deviceToFilterMap.keySet()) { + StringBuilder builder = new StringBuilder(this.deviceToFilterMap.get(device)); + builder.append(Util.toString(predicates, " OR ", " OR ", "")); + this.deviceToFilterMap.put(device, builder.toString()); + } + } + assert getConvention() == IoTDBRel.CONVENTION; + assert getConvention() == child.getConvention(); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + return super.computeSelfCost(planner, mq).multiplyBy(0.1); + } + + @Override + public Filter copy(RelTraitSet relTraitSet, RelNode input, RexNode condition) { + try { + return new IoTDBFilter(getCluster(), traitSet, input, condition); + } catch (LogicalOptimizeException e) { + throw new AssertionError(e.getMessage()); + } + } + + @Override + public void implement(Implementor implementor) { + implementor.visitChild(0, getInput()); + implementor.add(deviceToFilterMap, predicates); + } + + /** + * Translates {@link RexNode} expressions into IoTDB filter operator. 
+ */ + private FilterOperator getIoTDBOperator(RexNode filter) { + switch (filter.getKind()) { + case EQUALS: + return getBasicOperator(SQLConstant.EQUAL, (RexCall) filter); + case NOT_EQUALS: + return getBasicOperator(SQLConstant.NOTEQUAL, (RexCall) filter); + case GREATER_THAN: + return getBasicOperator(SQLConstant.GREATERTHAN, (RexCall) filter); + case GREATER_THAN_OR_EQUAL: + return getBasicOperator(SQLConstant.GREATERTHANOREQUALTO, (RexCall) filter); + case LESS_THAN: + return getBasicOperator(SQLConstant.LESSTHAN, (RexCall) filter); + case LESS_THAN_OR_EQUAL: + return getBasicOperator(SQLConstant.LESSTHANOREQUALTO, (RexCall) filter); + case AND: + return getBinaryOperator(SQLConstant.KW_AND, ((RexCall) filter).getOperands()); + case OR: + return getBinaryOperator(SQLConstant.KW_OR, ((RexCall) filter).getOperands()); + default: + throw new AssertionError("cannot get IoTDBOperator from " + filter); Review comment: Why does this throw an Error rather than an exception? ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBSchema.java ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.calcite; + +import com.google.common.collect.ImmutableMap; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeImpl; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.jdbc.Config; + +public class IoTDBSchema extends AbstractSchema { + + final Connection connection; + private Map<String, Table> tableMap; + private final SchemaPlus parentSchema; + final String name; + public static Map<String, List<String>> sgToDeviceMap = new HashMap<>(); + + /** + * Creates a IoTDB schema. + * + * @param host IoTDB host, e.g. "localhost" + * @param port IoTDB port, e.g. 6667 + * @param username IoTDB username + * @param password IoTDB password + */ + public IoTDBSchema(String host, int port, String username, String password, + SchemaPlus parentSchema, String name) { + super(); + try { + Class.forName(Config.JDBC_DRIVER_NAME); + this.connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", username, password); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.parentSchema = parentSchema; + this.name = name; + } + + /** + * Generate the columns' names and data types in the given table. 
+ * @param storageGroup the table name + * @return the columns' names and data types + */ + RelProtoDataType getRelDataType(String storageGroup) throws SQLException, QueryProcessException { + final RelDataTypeFactory typeFactory = + new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder(); + + // add time, device columns in relational table + fieldInfo.add(IoTDBConstant.TimeColumn, + typeFactory.createSqlType(IoTDBFieldType.INT64.getSqlType())); + fieldInfo.add(IoTDBConstant.DeviceColumn, + typeFactory.createSqlType(IoTDBFieldType.TEXT.getSqlType())); + + // get devices in this storage group + Statement statement = connection.createStatement(); + boolean hasDevices = statement.execute("show devices " + storageGroup); + if (hasDevices) { + ResultSet devices = statement.getResultSet(); + List<String> deviceList = new ArrayList<>(); + while (devices.next()) { + deviceList.add(devices.getString(1)); + } + this.sgToDeviceMap.put(storageGroup, deviceList); + } + + // get deduplicated measurements in table + boolean hasTS = statement.execute("show timeseries " + storageGroup); + if (hasTS) { + ResultSet timeseries = statement.getResultSet(); + Map<String, IoTDBFieldType> tmpMeasurementMap = new HashMap<>(); + while (timeseries.next()) { + String sensorName = timeseries.getString(1); + sensorName = sensorName.substring(sensorName.lastIndexOf('.') + 1); + IoTDBFieldType sensorType = IoTDBFieldType.of(timeseries.getString(4)); + + if (!tmpMeasurementMap.containsKey(sensorName)) { + tmpMeasurementMap.put(sensorName, sensorType); + fieldInfo.add(sensorName, typeFactory.createSqlType(sensorType.getSqlType())); + } else { + if (!tmpMeasurementMap.get(sensorName).equals(sensorType)) { + throw new QueryProcessException( + "The data types of the same measurement column should be the same across " + + "devices in ALIGN_BY_DEVICE sql. 
For more details please refer to the " + + "SQL document."); + } + } + } + } + + return RelDataTypeImpl.proto(fieldInfo.build()); + } + + /** + * Generate a map whose key is table name and value is table instance. The implementations of + * AbstractSchema.getTableNames() and AbstractSchema.getTable(String) depend on this map. + */ + @Override + protected Map<String, Table> getTableMap() { + try { + if (tableMap == null) { + tableMap = createTableMap(); + } + } catch (SQLException e) { + e.printStackTrace(); Review comment: Use a logger here instead of `e.printStackTrace()` ########## File path: calcite/src/main/java/org/apache/iotdb/calcite/IoTDBFilter.java ########## @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.calcite; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Util; +import org.apache.iotdb.db.exception.query.LogicalOptimizeException; +import org.apache.iotdb.db.qp.constant.SQLConstant; +import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator; +import org.apache.iotdb.db.qp.logical.crud.FilterOperator; +import org.apache.iotdb.db.qp.strategy.optimizer.DnfFilterOptimizer; +import org.apache.iotdb.tsfile.read.common.Path; + +public class IoTDBFilter extends Filter implements IoTDBRel { + + private final FilterOperator filterOperator; + private final List<String> fieldNames; + private List<String> predicates; // for global predicate + Map<String, String> deviceToFilterMap = new LinkedHashMap<>(); // for device predicate + + protected IoTDBFilter(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) + throws LogicalOptimizeException { + super(cluster, traits, child, condition); + this.fieldNames = IoTDBRules.IoTDBFieldNames(getRowType()); + this.filterOperator = getIoTDBOperator(condition); + this.predicates = translateWhere(filterOperator); + + // add global predicate to each device if both global and device predicate exist + if (!this.predicates.isEmpty() && !this.deviceToFilterMap.isEmpty()) { + for (String device : deviceToFilterMap.keySet()) { + StringBuilder builder = new 
StringBuilder(this.deviceToFilterMap.get(device)); + builder.append(Util.toString(predicates, " OR ", " OR ", "")); + this.deviceToFilterMap.put(device, builder.toString()); + } + } + assert getConvention() == IoTDBRel.CONVENTION; + assert getConvention() == child.getConvention(); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + return super.computeSelfCost(planner, mq).multiplyBy(0.1); + } + + @Override + public Filter copy(RelTraitSet relTraitSet, RelNode input, RexNode condition) { + try { + return new IoTDBFilter(getCluster(), traitSet, input, condition); + } catch (LogicalOptimizeException e) { + throw new AssertionError(e.getMessage()); + } + } + + @Override + public void implement(Implementor implementor) { + implementor.visitChild(0, getInput()); + implementor.add(deviceToFilterMap, predicates); + } + + /** + * Translates {@link RexNode} expressions into IoTDB filter operator. + */ + private FilterOperator getIoTDBOperator(RexNode filter) { + switch (filter.getKind()) { + case EQUALS: + return getBasicOperator(SQLConstant.EQUAL, (RexCall) filter); + case NOT_EQUALS: + return getBasicOperator(SQLConstant.NOTEQUAL, (RexCall) filter); + case GREATER_THAN: + return getBasicOperator(SQLConstant.GREATERTHAN, (RexCall) filter); + case GREATER_THAN_OR_EQUAL: + return getBasicOperator(SQLConstant.GREATERTHANOREQUALTO, (RexCall) filter); + case LESS_THAN: + return getBasicOperator(SQLConstant.LESSTHAN, (RexCall) filter); + case LESS_THAN_OR_EQUAL: + return getBasicOperator(SQLConstant.LESSTHANOREQUALTO, (RexCall) filter); + case AND: + return getBinaryOperator(SQLConstant.KW_AND, ((RexCall) filter).getOperands()); + case OR: + return getBinaryOperator(SQLConstant.KW_OR, ((RexCall) filter).getOperands()); + default: + throw new AssertionError("cannot get IoTDBOperator from " + filter); + } + } + + private FilterOperator getBasicOperator(int tokenIntType, RexCall call) { + final RexNode left = call.operands.get(0); 
+ final RexNode right = call.operands.get(1); + + FilterOperator operator = getBasicOperator2(tokenIntType, left, right); + if (operator != null) { + return operator; + } + operator = getBasicOperator2(tokenIntType, right, left); + if (operator != null) { + return operator; + } + throw new AssertionError("cannot translate basic operator: " + call); + } + + private FilterOperator getBasicOperator2(int tokenIntType, RexNode left, RexNode right) { + switch (right.getKind()) { + case LITERAL: + break; + default: + return null; + } + final RexLiteral rightLiteral = (RexLiteral) right; + switch (left.getKind()) { + case INPUT_REF: + String name = fieldNames.get(((RexInputRef) left).getIndex()); + return new BasicFunctionOperator(tokenIntType, new Path(name), literalValue(rightLiteral)); + case CAST: + return getBasicOperator2(tokenIntType, ((RexCall) left).getOperands().get(0), right); + default: + return null; + } + + } + + private FilterOperator getBinaryOperator(int tokenIntType, List<RexNode> operands) { + FilterOperator filterBinaryTree = new FilterOperator(tokenIntType); + FilterOperator currentNode = filterBinaryTree; + for (int i = 0; i < operands.size(); i++) { Review comment: Better to add a local variable `size = operands.size()` to avoid calling it repeatedly in the loop ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
