jt2594838 commented on code in PR #16531:
URL: https://github.com/apache/iotdb/pull/16531#discussion_r2541263280
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java:
##########
@@ -224,4 +233,156 @@ public void testNoPermission() throws Exception {
receiverEnv, "list user", "User,", Collections.singleton("root,"));
}
}
+
+ @Test
+ public void testSourcePermission() {
+ TestUtils.executeNonQuery(senderEnv, "create user `thulab`
'passwD@123456'", null);
+
+ // Shall fail if username is specified without password
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'user'='thulab')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ fail("When the 'user' or 'username' is specified, password must be
specified too.");
+ } catch (final SQLException ignore) {
+ // Expected
+ }
+
+ // Shall fail if password is wrong
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'user'='thulab'"
+ + "'password'='hack')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ fail("Shall fail if password is wrong.");
+ } catch (final SQLException ignore) {
+ // Expected
+ }
+
+ // Use current session, user is root
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'inclusion'='all')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail("Create pipe without user shall succeed if use the current
session");
+ }
+
+ // Alter to another user, shall fail because of lack of password
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify source ('username'='thulab')");
+ fail("Alter pipe shall fail if only user is specified");
+ } catch (final SQLException ignore) {
+ // Expected
Review Comment:
Better to examine that the message is proper.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigTreePrivilegeParseVisitor.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.source;
+
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.auth.entity.PrivilegeUnion;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.schema.template.Template;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanVisitor;
+import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorTreePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
+import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
+import org.apache.iotdb.confignode.service.ConfigNode;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_SCOPE;
+
+public class PipeConfigTreePrivilegeParseVisitor
+ extends ConfigPhysicalPlanVisitor<Optional<ConfigPhysicalPlan>, String> {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeConfigTreePrivilegeParseVisitor.class);
+ private final boolean skip;
+
+ PipeConfigTreePrivilegeParseVisitor(final boolean skip) {
+ this.skip = skip;
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitPlan(
+ final ConfigPhysicalPlan plan, final String context) {
+ return Optional.of(plan);
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitCreateDatabase(
+ final DatabaseSchemaPlan createDatabasePlan, final String userName) {
+ return canReadSysSchema(createDatabasePlan.getSchema().getName(),
userName, true)
+ ? Optional.of(createDatabasePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitAlterDatabase(
+ final DatabaseSchemaPlan alterDatabasePlan, final String userName) {
+ return canReadSysSchema(alterDatabasePlan.getSchema().getName(), userName,
true)
+ ? Optional.of(alterDatabasePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitDeleteDatabase(
+ final DeleteDatabasePlan deleteDatabasePlan, final String userName) {
+ return canReadSysSchema(deleteDatabasePlan.getName(), userName, true)
+ ? Optional.of(deleteDatabasePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitCreateSchemaTemplate(
+ final CreateSchemaTemplatePlan createSchemaTemplatePlan, final String
userName) {
+ return
canShowSchemaTemplate(createSchemaTemplatePlan.getTemplate().getName(),
userName)
+ ? Optional.of(createSchemaTemplatePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitCommitSetSchemaTemplate(
+ final CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan, final
String userName) {
+ return canReadSysSchema(commitSetSchemaTemplatePlan.getPath(), userName,
false)
+ ? Optional.of(commitSetSchemaTemplatePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitPipeUnsetSchemaTemplate(
+ final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlan, final
String userName) {
+ return canReadSysSchema(pipeUnsetSchemaTemplatePlan.getPath(), userName,
false)
+ ? Optional.of(pipeUnsetSchemaTemplatePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitExtendSchemaTemplate(
+ final ExtendSchemaTemplatePlan extendSchemaTemplatePlan, final String
userName) {
+ return canShowSchemaTemplate(
+
extendSchemaTemplatePlan.getTemplateExtendInfo().getTemplateName(), userName)
+ ? Optional.of(extendSchemaTemplatePlan)
+ : Optional.empty();
+ }
+
+ public boolean canShowSchemaTemplate(final String templateName, final String
userName) {
+ try {
+ return ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(userName, new
PrivilegeUnion(PrivilegeType.SYSTEM))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || ConfigNode.getInstance()
+ .getConfigManager()
+ .getClusterSchemaManager()
+ .getPathsSetTemplate(templateName, ALL_MATCH_SCOPE)
+ .getPathList()
+ .stream()
+ .anyMatch(
+ path -> {
+ try {
+ return ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(
+ userName,
+ new PrivilegeUnion(
+ Collections.singletonList(
+ new PartialPath(path)
+
.concatNode(MULTI_LEVEL_PATH_WILDCARD)),
+ PrivilegeType.READ_SCHEMA))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ } catch (final IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Un-parse-able path name encountered during template privilege
trimming, please check",
+ e);
+ return false;
+ }
+ }
+
+ public boolean canReadSysSchema(
+ final String path, final String userName, final boolean canSkipMulti) {
+ try {
+ return canSkipMulti
+ && ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(
+ userName,
+ new PrivilegeUnion(
+ Collections.singletonList(new PartialPath(path)),
+ PrivilegeType.READ_SCHEMA))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(
+ userName,
+ new PrivilegeUnion(
+ Collections.singletonList(
+ new
PartialPath(path).concatNode(MULTI_LEVEL_PATH_WILDCARD)),
+ PrivilegeType.READ_SCHEMA))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(userName, new
PrivilegeUnion(PrivilegeType.SYSTEM))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ } catch (final IllegalPathException e) {
+ LOGGER.warn("Un-parse-able path name encountered during privilege
trimming, please check", e);
+ return false;
+ }
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitGrantUser(
+ final AuthorTreePlan grantUserPlan, final String userName) {
+ return visitUserPlan(grantUserPlan, userName);
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitRevokeUser(
+ final AuthorTreePlan revokeUserPlan, final String userName) {
+ return visitUserPlan(revokeUserPlan, userName);
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitGrantRole(
+ final AuthorTreePlan revokeUserPlan, final String userName) {
+ return visitRolePlan(revokeUserPlan, userName);
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitRevokeRole(
+ final AuthorTreePlan revokeUserPlan, final String userName) {
+ return visitRolePlan(revokeUserPlan, userName);
+ }
Review Comment:
Mind the argument names.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java:
##########
@@ -682,6 +682,33 @@ public TAuthizedPatternTreeResp
generateAuthorizedPTree(String username, int per
return resp;
}
+ public PathPatternTree generateRawAuthorizedPTree(final String username,
final PrivilegeType type)
+ throws AuthException {
+ final User user = authorizer.getUser(username);
+ final PathPatternTree pPtree = new PathPatternTree();
+ if (user == null) {
+ return null;
+ }
Review Comment:
Better to judge before object allocation.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java:
##########
@@ -92,6 +99,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
protected volatile ProgressIndex overridingProgressIndex;
private Set<String> tableNames;
+ private Map<IDeviceID, String[]> treeSchemaMap;
Review Comment:
Add a comment to explain what the value is.
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java:
##########
@@ -65,6 +69,17 @@ private String getDefaultPattern() {
//////////////////////////// Tree Pattern Operations
////////////////////////////
+ public static <T> List<T> applyReversedIndexesOnList(
+ final List<Integer> filteredIndexes, final List<T> originalList) {
+ final Set<Integer> indexes = new HashSet<>(filteredIndexes);
+ return Objects.nonNull(originalList)
+ ? IntStream.range(0, originalList.size())
+ .filter(index -> !indexes.contains(index)) // 保留不在排除列表中的下标
+ .mapToObj(originalList::get)
+ .collect(Collectors.toList())
+ : null;
+ }
Review Comment:
<img width="668" height="134" alt="image"
src="https://github.com/user-attachments/assets/682f7657-25ee-4224-b55d-bf28c3ad8832"
/>
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/auth/AuthorPlanExecutor.java:
##########
@@ -682,6 +682,33 @@ public TAuthizedPatternTreeResp
generateAuthorizedPTree(String username, int per
return resp;
}
+ public PathPatternTree generateRawAuthorizedPTree(final String username,
final PrivilegeType type)
+ throws AuthException {
+ final User user = authorizer.getUser(username);
+ final PathPatternTree pPtree = new PathPatternTree();
+ if (user == null) {
+ return null;
+ }
+
+ constructAuthorityScope(pPtree, user, type);
+
+ for (final String roleName : user.getRoleSet()) {
+ Role role = authorizer.getRole(roleName);
+ if (role != null) {
+ constructAuthorityScope(pPtree, role, type);
+ }
+ }
+ pPtree.constructTree();
+ final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ final DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+ try {
+ pPtree.serialize(dataOutputStream);
+ } catch (final IOException e) {
+ return null;
+ }
Review Comment:
What are these for?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java:
##########
@@ -223,15 +246,20 @@ protected Optional<PipeWritePlanEvent>
trimRealtimeEventByPrivilege(
final ConfigPhysicalPlan plan =
((PipeConfigRegionWritePlanEvent) event).getConfigPhysicalPlan();
final Boolean isTableDatabasePlan = isTableDatabasePlan(plan);
- if (Boolean.FALSE.equals(isTableDatabasePlan)) {
- return Optional.of(event);
+ if (!Boolean.TRUE.equals(isTableDatabasePlan)) {
+ final Optional<ConfigPhysicalPlan> result =
treePrivilegeParseVisitor.process(plan, userName);
+ if (result.isPresent()) {
+ return Optional.of(
+ new PipeConfigRegionWritePlanEvent(result.get(),
event.isGeneratedByPipe()));
+ }
}
-
- final Optional<ConfigPhysicalPlan> result =
- TABLE_PRIVILEGE_PARSE_VISITOR.process(plan, userName);
- if (result.isPresent()) {
- return Optional.of(
- new PipeConfigRegionWritePlanEvent(result.get(),
event.isGeneratedByPipe()));
+ if (!Boolean.FALSE.equals(isTableDatabasePlan)) {
Review Comment:
Reverse the conditions to remove unnecessary `!`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/schemaregion/IoTDBSchemaRegionSource.java:
##########
@@ -203,8 +206,9 @@ protected PipeWritePlanEvent
getNextEventInCurrentSnapshot() {
protected Optional<PipeWritePlanEvent> trimRealtimeEventByPrivilege(
final PipeWritePlanEvent event) throws AccessDeniedException {
final Optional<PlanNode> result =
- TABLE_PRIVILEGE_PARSE_VISITOR.process(
- ((PipeSchemaRegionWritePlanEvent) event).getPlanNode(),
userEntity);
+ treePrivilegeParseVisitor
+ .process(((PipeSchemaRegionWritePlanEvent) event).getPlanNode(),
userEntity)
+ .flatMap(planNode ->
TABLE_PRIVILEGE_PARSE_VISITOR.process(planNode, userEntity));
Review Comment:
The same node is processed by both treeVisitor and tableVisitor?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java:
##########
@@ -441,33 +449,86 @@ public boolean isGeneratedByPipe() {
@Override
public void throwIfNoPrivilege() {
try {
- if (!isTableModelEvent() ||
AuthorityChecker.SUPER_USER.equals(userName)) {
+ if (AuthorityChecker.SUPER_USER.equals(userName)) {
return;
}
if (!waitForTsFileClose()) {
LOGGER.info("Temporary tsFile {} detected, will skip its transfer.",
tsFile);
return;
}
- for (final String table : tableNames) {
- if (!tablePattern.matchesDatabase(getTableModelDatabaseName())
- || !tablePattern.matchesTable(table)) {
- continue;
+ if (isTableModelEvent()) {
+ for (final String table : tableNames) {
+ if (!tablePattern.matchesDatabase(getTableModelDatabaseName())
+ || !tablePattern.matchesTable(table)) {
+ continue;
+ }
+ if (!AuthorityChecker.getAccessControl()
+ .checkCanSelectFromTable4Pipe(
+ userName,
+ new QualifiedObjectName(getTableModelDatabaseName(), table),
+ new UserEntity(Long.parseLong(userId), userName,
cliHostname))) {
+ if (skipIfNoPrivileges) {
+ shouldParse4Privilege = true;
+ } else {
+ throw new AccessDeniedException(
+ String.format(
+ "No privilege for SELECT for user %s at table %s.%s",
+ userName, tableModelDatabaseName, table));
+ }
+ }
}
- if (!AuthorityChecker.getAccessControl()
- .checkCanSelectFromTable4Pipe(
- userName,
- new QualifiedObjectName(getTableModelDatabaseName(), table),
- new UserEntity(Long.parseLong(userId), userName,
cliHostname))) {
+ }
+ // Real-time tsFiles
+ else if (Objects.nonNull(treeSchemaMap)) {
+ final List<MeasurementPath> measurementList = new ArrayList<>();
+ for (final Map.Entry<IDeviceID, String[]> entry :
treeSchemaMap.entrySet()) {
+ final IDeviceID deviceID = entry.getKey();
+ for (final String measurement : entry.getValue()) {
+ if (treePattern.matchesMeasurement(deviceID, measurement)) {
+ measurementList.add(new MeasurementPath(deviceID, measurement));
+ }
+ }
+ }
+ final TSStatus status =
+ AuthorityChecker.getAccessControl()
+ .checkSeriesPrivilege4Pipe(
+ new UserEntity(Long.parseLong(userId), userName,
cliHostname),
+ measurementList,
+ PrivilegeType.READ_DATA);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
if (skipIfNoPrivileges) {
shouldParse4Privilege = true;
} else {
- throw new AccessDeniedException(
- String.format(
- "No privilege for SELECT for user %s at table %s.%s",
- userName, tableModelDatabaseName, table));
+ throw new AccessDeniedException(status.getMessage());
}
}
}
+ // Historical tsFiles
+ // Coarse filter, will be judged in inner class
+ else {
+ final Set<IDeviceID> devices = getDeviceSet();
+ if (Objects.nonNull(devices)) {
+ final List<MeasurementPath> measurementList = new ArrayList<>();
+ for (final IDeviceID device : devices) {
+ measurementList.add(new MeasurementPath(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
+ }
+ final TSStatus status =
+ AuthorityChecker.getAccessControl()
+ .checkSeriesPrivilege4Pipe(
+ new UserEntity(Long.parseLong(userId), userName,
cliHostname),
+ measurementList,
+ PrivilegeType.READ_DATA);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode())
{
+ if (skipIfNoPrivileges) {
+ shouldParse4Privilege = true;
+ } else {
+ throw new AccessDeniedException(status.getMessage());
+ }
+ }
+ } else {
+ shouldParse4Privilege = true;
+ }
+ }
Review Comment:
If the file only contains root.db1.d1.s1 (on which the user has privileges),
but the user does not have privileges on root.db1.s2, and skipIfNoPrivileges =
false, then the file will not be transferred?
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigTreePrivilegeParseVisitor.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.source;
+
+import org.apache.iotdb.commons.auth.AuthException;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.auth.entity.PrivilegeUnion;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.schema.template.Template;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanVisitor;
+import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorTreePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
+import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
+import
org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan;
+import org.apache.iotdb.confignode.service.ConfigNode;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_SCOPE;
+
+public class PipeConfigTreePrivilegeParseVisitor
+ extends ConfigPhysicalPlanVisitor<Optional<ConfigPhysicalPlan>, String> {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PipeConfigTreePrivilegeParseVisitor.class);
+ private final boolean skip;
+
+ PipeConfigTreePrivilegeParseVisitor(final boolean skip) {
+ this.skip = skip;
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitPlan(
+ final ConfigPhysicalPlan plan, final String context) {
+ return Optional.of(plan);
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitCreateDatabase(
+ final DatabaseSchemaPlan createDatabasePlan, final String userName) {
+ return canReadSysSchema(createDatabasePlan.getSchema().getName(),
userName, true)
+ ? Optional.of(createDatabasePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitAlterDatabase(
+ final DatabaseSchemaPlan alterDatabasePlan, final String userName) {
+ return canReadSysSchema(alterDatabasePlan.getSchema().getName(), userName,
true)
+ ? Optional.of(alterDatabasePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitDeleteDatabase(
+ final DeleteDatabasePlan deleteDatabasePlan, final String userName) {
+ return canReadSysSchema(deleteDatabasePlan.getName(), userName, true)
+ ? Optional.of(deleteDatabasePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitCreateSchemaTemplate(
+ final CreateSchemaTemplatePlan createSchemaTemplatePlan, final String
userName) {
+ return
canShowSchemaTemplate(createSchemaTemplatePlan.getTemplate().getName(),
userName)
+ ? Optional.of(createSchemaTemplatePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitCommitSetSchemaTemplate(
+ final CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan, final
String userName) {
+ return canReadSysSchema(commitSetSchemaTemplatePlan.getPath(), userName,
false)
+ ? Optional.of(commitSetSchemaTemplatePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitPipeUnsetSchemaTemplate(
+ final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlan, final
String userName) {
+ return canReadSysSchema(pipeUnsetSchemaTemplatePlan.getPath(), userName,
false)
+ ? Optional.of(pipeUnsetSchemaTemplatePlan)
+ : Optional.empty();
+ }
+
+ @Override
+ public Optional<ConfigPhysicalPlan> visitExtendSchemaTemplate(
+ final ExtendSchemaTemplatePlan extendSchemaTemplatePlan, final String
userName) {
+ return canShowSchemaTemplate(
+
extendSchemaTemplatePlan.getTemplateExtendInfo().getTemplateName(), userName)
+ ? Optional.of(extendSchemaTemplatePlan)
+ : Optional.empty();
+ }
+
+ public boolean canShowSchemaTemplate(final String templateName, final String
userName) {
+ try {
+ return ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(userName, new
PrivilegeUnion(PrivilegeType.SYSTEM))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || ConfigNode.getInstance()
+ .getConfigManager()
+ .getClusterSchemaManager()
+ .getPathsSetTemplate(templateName, ALL_MATCH_SCOPE)
+ .getPathList()
+ .stream()
+ .anyMatch(
+ path -> {
+ try {
+ return ConfigNode.getInstance()
+ .getConfigManager()
+ .getPermissionManager()
+ .checkUserPrivileges(
+ userName,
+ new PrivilegeUnion(
+ Collections.singletonList(
+ new PartialPath(path)
+
.concatNode(MULTI_LEVEL_PATH_WILDCARD)),
+ PrivilegeType.READ_SCHEMA))
+ .getStatus()
+ .getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ } catch (final IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Un-parse-able path name encountered during template privilege
trimming, please check",
+ e);
+ return false;
+ }
+ }
Review Comment:
Add a test to cover this?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java:
##########
@@ -4048,8 +4046,9 @@ public Statement visitCreatePipe(final
IoTDBSqlParser.CreatePipeContext ctx) {
final CreatePipeStatement createPipeStatement =
new CreatePipeStatement(StatementType.CREATE_PIPE);
+ final String pipeName = parseIdentifier(ctx.pipeName.getText());
if (ctx.pipeName != null) {
- createPipeStatement.setPipeName(parseIdentifier(ctx.pipeName.getText()));
+ createPipeStatement.setPipeName(pipeName);
Review Comment:
Potential NPE?
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java:
##########
@@ -224,4 +233,156 @@ public void testNoPermission() throws Exception {
receiverEnv, "list user", "User,", Collections.singleton("root,"));
}
}
+
+ @Test
+ public void testSourcePermission() {
+ TestUtils.executeNonQuery(senderEnv, "create user `thulab`
'passwD@123456'", null);
+
+ // Shall fail if username is specified without password
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'user'='thulab')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ fail("When the 'user' or 'username' is specified, password must be
specified too.");
+ } catch (final SQLException ignore) {
+ // Expected
+ }
+
+ // Shall fail if password is wrong
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'user'='thulab'"
+ + "'password'='hack')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ fail("Shall fail if password is wrong.");
+ } catch (final SQLException ignore) {
+ // Expected
+ }
+
+ // Use current session, user is root
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2b"
+ + " with source ("
+ + "'inclusion'='all')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail("Create pipe without user shall succeed if use the current
session");
+ }
+
+ // Alter to another user, shall fail because of lack of password
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify source ('username'='thulab')");
+ fail("Alter pipe shall fail if only user is specified");
+ } catch (final SQLException ignore) {
+ // Expected
+ }
+
+ // Successfully alter
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ "alter pipe a2b modify source ('username'='thulab',
'password'='passwD@123456')");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail("Alter pipe shall not fail if user and password are specified");
+ }
+
+ TestUtils.executeNonQuery(senderEnv, "create database root.test");
+
+ // Shall not be transferred
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv, "count databases", "count,", Collections.singleton("0,"));
+
+ // GRANT privileges ON prefixPath (COMMA prefixPath)* TO USER
userName=usernameWithRoot
+ // (grantOpt)?
+ // Grant some privilege
+ TestUtils.executeNonQuery(senderEnv, "grant SYSTEM on root.** to user
thulab");
+
+ TestUtils.executeNonQuery(senderEnv, "create database root.test1");
+
+ // Shall be transferred
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "count databases root.tes*", "count,",
Collections.singleton("1,"));
+
+ // Alter pipe, throw exception if no privileges
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify source ('skipif'='')");
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Write some data
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "create timeSeries root.vehicle.car.temperature DOUBLE",
+ "insert into root.vehicle.car(temperature) values (36.5)"));
+
+ // Exception, block here
+ TableModelUtils.assertCountDataAlwaysOnEnv("test", "test", 0, receiverEnv);
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv, "count timeSeries", "count(timeseries),",
Collections.singleton("0,"));
+
+ // Grant SELECT privilege
+ TestUtils.executeNonQueries(
+ senderEnv, Arrays.asList("grant READ on root.** to user thulab",
"start pipe a2b"));
+
+ // Will finally pass
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "select count(*) from root.vehicle.**",
+ "count(root.vehicle.car.temperature),",
+ Collections.singleton("1,"));
+
+ // test showing pipe
+ // Create another pipe, user is root
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe a2c"
+ + " with source ("
+ + "'inclusion'='all',"
+ + "'capture.tree'='true',"
+ + "'capture.table'='true')"
+ + " with sink ("
+ + "'node-urls'='%s')",
+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
+ } catch (final SQLException e) {
+ e.printStackTrace();
+ fail("Create pipe without user shall succeed if use the current
session");
+ }
+
+ TestUtils.executeNonQuery(senderEnv, "revoke SYSTEM on root.** from user
thulab");
+
+ // A user shall only see its own pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ Assert.assertEquals(
+ 1, client.showPipe(new
TShowPipeReq().setUserName("thulab")).pipeInfoList.size());
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
Review Comment:
You can show another user's pipe simply by setting the request's username?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java:
##########
@@ -302,6 +309,29 @@ private void checkTableName(final String tableName) {
}
}
+ private void checkTreePattern(final IDeviceID deviceID, final String[]
measurements)
+ throws IllegalPathException {
+ final List<MeasurementPath> measurementList = new ArrayList<>();
+ for (final String measurement : measurements) {
+ if (!treePattern.matchesMeasurement(deviceID, measurement)) {
+ measurementList.add(new MeasurementPath(deviceID, measurement));
+ }
+ }
Review Comment:
Why is the path added when not match?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java:
##########
@@ -195,6 +186,10 @@ public void customize(
parameters.getBooleanOrDefault(
Arrays.asList(CONNECTOR_USE_EVENT_USER_NAME_KEY,
SINK_USE_EVENT_USER_NAME_KEY),
CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE);
+
+ if (SESSION_MANAGER.getCurrSession() != null) {
+ SESSION_MANAGER.registerSession(session);
+ }
Review Comment:
Why only register the session when there is already one?
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/IoTDBTreePattern.java:
##########
@@ -65,6 +69,17 @@ private String getDefaultPattern() {
//////////////////////////// Tree Pattern Operations
////////////////////////////
+ public static <T> List<T> applyReversedIndexesOnList(
+ final List<Integer> filteredIndexes, final List<T> originalList) {
+ final Set<Integer> indexes = new HashSet<>(filteredIndexes);
+ return Objects.nonNull(originalList)
+ ? IntStream.range(0, originalList.size())
+ .filter(index -> !indexes.contains(index)) // 保留不在排除列表中的下标
+ .mapToObj(originalList::get)
+ .collect(Collectors.toList())
+ : null;
+ }
Review Comment:
May avoid using expensive data structures like sets.
```
public static <T> List<T> applyReversedIndexesOnListV2(
final List<Integer> filteredIndexes, final List<T> originalList) {
// filteredIndexes.sort(null); if necessary
List<T> filteredList = new ArrayList<>(originalList.size() -
filteredIndexes.size());
int filteredIndexPos = 0;
int processingIndex = 0;
for (; processingIndex < originalList.size(); processingIndex++) {
if (filteredIndexPos >= filteredIndexes.size()) {
// all filteredIndexes processed, add remaining to the filteredList
if (processingIndex < filteredIndexes.size()) {
filteredList.addAll(originalList.subList(processingIndex,
originalList.size()));
}
break;
} else {
int filteredIndex = filteredIndexes.get(filteredIndexPos);
if (filteredIndex == processingIndex) {
// the index is filtered, move to the next filtered pos
filteredIndexPos ++;
} else {
// the index is not filtered, add to the filteredList
filteredList.add(originalList.get(processingIndex));
}
}
}
return filteredList;
}
public static <T> List<T> applyReversedIndexesOnListV1(
final List<Integer> filteredIndexes, final List<T> originalList) {
final Set<Integer> indexes = new HashSet<>(filteredIndexes);
return Objects.nonNull(originalList)
? IntStream.range(0, originalList.size())
.filter(index -> !indexes.contains(index)) // 保留不在排除列表中的下标
.mapToObj(originalList::get)
.collect(Collectors.toList())
: null;
}
public static void main(String[] args) {
int elementNum = 10_000_000;
int filteredNum = elementNum / 10;
Random random = new Random();
List<Integer> originalList = IntStream.range(0,
elementNum).boxed().collect(Collectors.toList());
List<Integer> filteredIndexes = new ArrayList<>(filteredNum);
for (int i = 0; i < filteredNum; i++) {
filteredIndexes.add(random.nextInt(elementNum));
}
filteredIndexes =
filteredIndexes.stream().sorted().distinct().collect(Collectors.toList());
long start = System.currentTimeMillis();
List<Integer> appliedList =
applyReversedIndexesOnListV1(filteredIndexes, originalList);
System.out.println(System.currentTimeMillis() - start);
Set<Integer> appliedSet = new HashSet<>(appliedList);
for (Integer filteredIndex : filteredIndexes) {
if (appliedSet.contains(filteredIndex)) {
System.out.println("Incorrect implementation");
System.exit(-1);
}
}
start = System.currentTimeMillis();
appliedList = applyReversedIndexesOnListV2(filteredIndexes,
originalList);
System.out.println(System.currentTimeMillis() - start);
appliedSet = new HashSet<>(appliedList);
for (Integer filteredIndex : filteredIndexes) {
if (appliedSet.contains(filteredIndex)) {
System.out.println("Incorrect implementation");
System.exit(-1);
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]