Copilot commented on code in PR #17009: URL: https://github.com/apache/iotdb/pull/17009#discussion_r2688620076
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java: ########## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.service.exernalservice; + +import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry; +import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.commons.externalservice.ServiceInfo; +import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.externalservice.api.IExternalService; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkState; +import static org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING; +import static org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED; + +public class ExternalServiceManagementService { + @GuardedBy("lock") + private final Map<String, ServiceInfo> serviceInfos; + + private final String libRoot; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private static final Logger LOGGER = + LoggerFactory.getLogger(ExternalServiceManagementService.class); + + private ExternalServiceManagementService(String libRoot) { + this.serviceInfos = new HashMap<>(); + restoreBuiltInServices(); + this.libRoot = libRoot; + } + + public Iterator<TExternalServiceEntry> showService(int dataNodeId) + throws ClientManagerException, TException { + try { + lock.readLock().lock(); + + ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID); + TExternalServiceListResp resp = client.showExternalService(dataNodeId); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBRuntimeException(resp.getStatus().message, resp.getStatus().code); + } + return resp.getExternalServiceInfos().iterator(); + } finally { + lock.readLock().unlock(); + } + } + + public void createService(String serviceName, String className) + throws ClientManagerException, TException { + try { + lock.writeLock().lock(); + + // 1. validate + if (serviceInfos.containsKey(serviceName)) { + TSStatus status = new TSStatus(TSStatusCode.EXTERNAL_SERVICE_ALREADY_EXIST.getStatusCode()); + status.setMessage( + String.format("Failed to create External Service %s, it already exists!", serviceName)); + throw new ExternalServiceManagementException(status); + } + + // 2. persist on CN + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TSStatus status = + client.createExternalService( + new TCreateExternalServiceReq(QueryId.getDataNodeId(), serviceName, className)); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBRuntimeException(status.message, status.code); + } + } + + // 3. modify memory info + serviceInfos.put( + serviceName, + new ServiceInfo(serviceName, className, ServiceInfo.ServiceType.USER_DEFINED)); + } finally { + lock.writeLock().unlock(); + } + } + + public void startService(String serviceName) throws ClientManagerException, TException { + try { + lock.writeLock().lock(); + + // 1. validate + ServiceInfo serviceInfo = serviceInfos.get(serviceName); + if (serviceInfo == null) { + TSStatus status = new TSStatus(TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode()); + status.setMessage( + String.format( + "Failed to start External Service %s, because it is not existed!", serviceName)); + throw new ExternalServiceManagementException(status); + } + + // 2. call start method of ServiceInstance, create if Instance was not created + if (serviceInfo.getState() == RUNNING) { + return; + } else { + // The state is STOPPED + if (serviceInfo.getServiceInstance() == null) { + // lazy create Instance + serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName)); + } + serviceInfo.getServiceInstance().start(); + } + + // 3. persist on CN, rollback if failed + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TSStatus status = client.startExternalService(QueryId.getDataNodeId(), serviceName); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + serviceInfo.getServiceInstance().stop(); + throw new IoTDBRuntimeException(status.message, status.code); + } + } + + // 4. modify memory info + serviceInfo.setState(RUNNING); + } finally { + lock.writeLock().unlock(); + } + } + + private IExternalService createExternalServiceInstance(String serviceName) { + // close ClassLoader automatically to release the file handle + try (ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); ) { + return (IExternalService) + Class.forName(serviceName, true, classLoader).getDeclaredConstructor().newInstance(); Review Comment: The `createExternalServiceInstance` method uses `serviceName` as the class name when calling `Class.forName`, but it should use the `className` from `ServiceInfo`. Change parameter or retrieve className from serviceInfo. ########## iotdb-protocol/thrift-commons/src/main/thrift/common.thrift: ########## @@ -261,6 +261,20 @@ struct TNodeLocations { 2: optional list<TDataNodeLocation> dataNodeLocations } +struct TExternalServiceListResp { + 1: required TSStatus status + 2: required list<TExternalServiceEntry> externalServiceInfos +} + + +struct TExternalServiceEntry { + 1: required string serviceName + 2: required string className + 3: required byte state + 4: required i32 dataNodId Review Comment: Corrected spelling of 'dataNodId' to 'dataNodeId'. ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.externalservice; + +import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.externalservice.ServiceInfo; +import org.apache.iotdb.commons.snapshot.SnapshotProcessor; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.confignode.consensus.request.write.externalservice.CreateExternalServicePlan; +import org.apache.iotdb.confignode.consensus.request.write.externalservice.DropExternalServicePlan; +import org.apache.iotdb.confignode.consensus.request.write.externalservice.StartExternalServicePlan; +import org.apache.iotdb.confignode.consensus.request.write.externalservice.StopExternalServicePlan; +import org.apache.iotdb.confignode.consensus.response.externalservice.ShowExternalServiceResp; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.zip.CRC32; + +public class ExternalServiceInfo implements SnapshotProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(ExternalServiceInfo.class); + + private final Map<Integer, Map<String, ServiceInfo>> datanodeToServiceInfos; + + private static final String SNAPSHOT_FILENAME = "service_info.bin"; + private static final byte SERIALIZATION_VERSION = 1; + private final CRC32 crc32 = new CRC32(); + + private static final String SERVICE_NOT_EXISTED = + "ExternalService %s is not existed on DataNode %s."; + + public ExternalServiceInfo() { + datanodeToServiceInfos = new ConcurrentHashMap<>(); + } + + /** + * Add a new ExternalService on target DataNode. + * + * @return SUCCESS_STATUS if <tt>this service</tt> was not existed on target DataNode, otherwise + * EXTERNAL_SERVICE_AlREADY_EXIST + */ + public TSStatus addService(CreateExternalServicePlan plan) { + TSStatus res = new TSStatus(); + Map<String, ServiceInfo> serviceInfos = + datanodeToServiceInfos.computeIfAbsent( + plan.getDatanodeId(), k -> new ConcurrentHashMap<>()); + String serviceName = plan.getServiceInfo().getServiceName(); + if (serviceInfos.containsKey(serviceName)) { + res.code = TSStatusCode.EXTERNAL_SERVICE_ALREADY_EXIST.getStatusCode(); + res.message = String.format(SERVICE_NOT_EXISTED, serviceName, plan.getDatanodeId()); Review Comment: The error message for adding an existing service incorrectly uses `SERVICE_NOT_EXISTED`. It should say 'ExternalService %s already exists on DataNode %s.' instead. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/externalservice/ShowExternalServiceTask.java: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.externalservice; + +import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry; +import org.apache.iotdb.commons.externalservice.ServiceInfo; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.utils.Binary; + +public class ShowExternalServiceTask implements IConfigTask { + private final int dataNodeId; + + public ShowExternalServiceTask(int dataNodeId) { + this.dataNodeId = dataNodeId; + } + + @Override + public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.showExternalService(dataNodeId); + } + + public static void appendServiceEntry( + TExternalServiceEntry externalServiceEntry, ColumnBuilder[] columnBuilders) { + columnBuilders[0].writeBinary( + new Binary(externalServiceEntry.getServiceName(), TSFileConfig.STRING_CHARSET)); + columnBuilders[1].writeInt(externalServiceEntry.getDataNodId()); Review Comment: Corrected method name from 'getDataNodId' to 'getDataNodeId'. ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/externalservice/ShowExternalServiceResp.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.response.externalservice; + +import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry; +import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.consensus.common.DataSet; +import org.apache.iotdb.rpc.TSStatusCode; + +import javax.validation.constraints.NotNull; + +import java.util.Comparator; +import java.util.List; + +public class ShowExternalServiceResp implements DataSet { + + private final List<TExternalServiceEntry> serviceInfoEntryList; + + public ShowExternalServiceResp(@NotNull List<TExternalServiceEntry> serviceInfoEntryList) { + this.serviceInfoEntryList = serviceInfoEntryList; + } + + public List<TExternalServiceEntry> getServiceInfoEntryList() { + return serviceInfoEntryList; + } + + public TExternalServiceListResp convertToRpcShowExternalServiceResp() { + serviceInfoEntryList.sort( + Comparator.comparingInt(TExternalServiceEntry::getDataNodId) Review Comment: Corrected method name from 'getDataNodId' to 'getDataNodeId'. ```suggestion Comparator.comparingInt(TExternalServiceEntry::getDataNodeId) ``` ########## integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBServicesIT.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.recent.informationschema; + +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.apache.iotdb.commons.schema.table.InformationSchema.INFORMATION_DATABASE; +import static org.apache.iotdb.db.it.utils.TestUtils.createUser; +import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest; +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; +import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT; +import static org.apache.iotdb.itbase.env.BaseEnv.TREE_SQL_DIALECT; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBServicesIT { + private static final String ADMIN_NAME = + CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + private static final String ADMIN_PWD = + CommonDescriptor.getInstance().getConfig().getAdminPassword(); + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + createUser("test", "TimechoDB@2021"); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testQueryResult() { + String[] retArray = + new String[] { + "MQTT,1,STOPPED,org.apache.iotdb.externalservice.Mqtt,built-in,", + "REST,1,STOPPED,org.apache.iotdb.externalservice.Rest,built-in,", + }; + + // TableModel + String[] header = + new String[] {"service_name", "datanode_id", "state", "class_name", "service_type"}; + + String sql = "SELECT * FROM services where datanode_id = 1"; + tableResultSetEqualTest(sql, header, retArray, INFORMATION_DATABASE); + sql = "show services on 1"; + tableResultSetEqualTest(sql, header, retArray, INFORMATION_DATABASE); + + // TreeModel + header = new String[] {"ServiceName", "DataNodeId", "State", "ClassName", "ServiceType"}; + + resultSetEqualTest(sql, header, retArray); + } + + @Test + public void testPrivilege() { + testTargetModelPrivilege(TABLE_SQL_DIALECT); + testTargetModelPrivilege(TREE_SQL_DIALECT); + } + + private void testTargetModelPrivilege(String model) { + String sql = "show services"; + try (Connection connection = + EnvFactory.getEnv().getConnection("test", "TimechoDB@2021", model); + Statement statement = connection.createStatement()) { + statement.executeQuery(sql); + } catch (SQLException e) { + Assert.assertTrue( + e.getMessage() + .contains("No permissions for this operation, please add privilege SYSTEM")); + } + + try (Connection connection2 = EnvFactory.getEnv().getConnection(ADMIN_NAME, ADMIN_PWD, model)) { + ResultSet resultSet = connection2.createStatement().executeQuery(sql); + resultSet.close(); Review Comment: This Statement is not always closed on method exit. ```suggestion try (Connection connection2 = EnvFactory.getEnv().getConnection(ADMIN_NAME, ADMIN_PWD, model); Statement statement = connection2.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { // Intentionally no-op: executing the query is sufficient to validate privilege ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java: ########## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.service.exernalservice; + +import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry; +import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.commons.externalservice.ServiceInfo; +import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.externalservice.api.IExternalService; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkState; +import static org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING; +import static org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED; + +public class ExternalServiceManagementService { + @GuardedBy("lock") + private final Map<String, ServiceInfo> serviceInfos; + + private final String libRoot; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private static final Logger LOGGER = + LoggerFactory.getLogger(ExternalServiceManagementService.class); + + private ExternalServiceManagementService(String libRoot) { + this.serviceInfos = new HashMap<>(); + restoreBuiltInServices(); + this.libRoot = libRoot; + } + + public Iterator<TExternalServiceEntry> showService(int dataNodeId) + throws ClientManagerException, TException { + try { + lock.readLock().lock(); + + ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID); + TExternalServiceListResp resp = client.showExternalService(dataNodeId); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBRuntimeException(resp.getStatus().message, resp.getStatus().code); + } + return resp.getExternalServiceInfos().iterator(); + } finally { + lock.readLock().unlock(); + } + } + + public void createService(String serviceName, String className) + throws ClientManagerException, TException { + try { + lock.writeLock().lock(); + + // 1. validate + if (serviceInfos.containsKey(serviceName)) { + TSStatus status = new TSStatus(TSStatusCode.EXTERNAL_SERVICE_ALREADY_EXIST.getStatusCode()); + status.setMessage( + String.format("Failed to create External Service %s, it already exists!", serviceName)); + throw new ExternalServiceManagementException(status); + } + + // 2. persist on CN + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TSStatus status = + client.createExternalService( + new TCreateExternalServiceReq(QueryId.getDataNodeId(), serviceName, className)); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBRuntimeException(status.message, status.code); + } + } + + // 3. modify memory info + serviceInfos.put( + serviceName, + new ServiceInfo(serviceName, className, ServiceInfo.ServiceType.USER_DEFINED)); + } finally { + lock.writeLock().unlock(); + } + } + + public void startService(String serviceName) throws ClientManagerException, TException { + try { + lock.writeLock().lock(); + + // 1. validate + ServiceInfo serviceInfo = serviceInfos.get(serviceName); + if (serviceInfo == null) { + TSStatus status = new TSStatus(TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode()); + status.setMessage( + String.format( + "Failed to start External Service %s, because it is not existed!", serviceName)); + throw new ExternalServiceManagementException(status); + } + + // 2. call start method of ServiceInstance, create if Instance was not created + if (serviceInfo.getState() == RUNNING) { + return; + } else { + // The state is STOPPED + if (serviceInfo.getServiceInstance() == null) { + // lazy create Instance + serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName)); + } + serviceInfo.getServiceInstance().start(); + } + + // 3. persist on CN, rollback if failed + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TSStatus status = client.startExternalService(QueryId.getDataNodeId(), serviceName); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + serviceInfo.getServiceInstance().stop(); + throw new IoTDBRuntimeException(status.message, status.code); + } + } + + // 4. modify memory info + serviceInfo.setState(RUNNING); + } finally { + lock.writeLock().unlock(); + } + } + + private IExternalService createExternalServiceInstance(String serviceName) { + // close ClassLoader automatically to release the file handle + try (ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); ) { + return (IExternalService) + Class.forName(serviceName, true, classLoader).getDeclaredConstructor().newInstance(); + } catch (IOException + | InstantiationException + | InvocationTargetException + | NoSuchMethodException + | IllegalAccessException + | ClassNotFoundException + | ClassCastException e) { + TSStatus status = + new TSStatus(TSStatusCode.EXTERNAL_SERVICE_INSTANCE_CREATE_ERROR.getStatusCode()); + status.setMessage( + String.format( + "Failed to start External Service %s, because its instance can not be constructed successfully. Exception: %s", + serviceName, e)); + LOGGER.warn(status.getMessage(), e); + throw new ExternalServiceManagementException(status); + } + } + + public void stopService(String serviceName) throws ClientManagerException, TException { + try { + lock.writeLock().lock(); + + // 1. validate + ServiceInfo serviceInfo = serviceInfos.get(serviceName); + if (serviceInfo == null) { + TSStatus status = new TSStatus(TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode()); + status.setMessage( + String.format( + "Failed to stop External Service %s, because it is not existed!", serviceName)); + throw new ExternalServiceManagementException(status); + } + + // 2. call stop method of ServiceInstance + if (serviceInfo.getState() == STOPPED) { + return; + } else { + // The state is RUNNING + stopService(serviceInfo); + } + + // 3. persist on CN, rollback if failed + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID); ) { + TSStatus status = client.stopExternalService(QueryId.getDataNodeId(), serviceName); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + serviceInfo.getServiceInstance().start(); + throw new IoTDBRuntimeException(status.message, status.code); + } + } + + // 4. modify memory info + serviceInfo.setState(STOPPED); + } finally { + lock.writeLock().unlock(); + } + } + + private void stopService(ServiceInfo serviceInfo) { + checkState( + serviceInfo.getServiceInstance() != null, + "External Service instance is null when state is RUNNING!", + serviceInfo.getServiceName()); + serviceInfo.getServiceInstance().stop(); + } + + public void dropService(String serviceName, boolean forcedly) + throws ClientManagerException, TException { + try { + lock.writeLock().lock(); + + // 1. validate + ServiceInfo serviceInfo = serviceInfos.get(serviceName); + if (serviceInfo == null) { + TSStatus status = new TSStatus(TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode()); + status.setMessage( + String.format( + "Failed to drop External Service %s, because it is not existed!", serviceName)); + throw new ExternalServiceManagementException(status); + } + if (serviceInfo.getServiceType() == ServiceInfo.ServiceType.BUILTIN) { + TSStatus status = + new TSStatus(TSStatusCode.CANNOT_DROP_BUILTIN_EXTERNAL_SERVICE.getStatusCode()); + status.setMessage( + String.format( + "Failed to drop External Service %s, because it is BUILT-IN!", serviceName)); + throw new ExternalServiceManagementException(status); + } + + // 2. stop or fail when service are not stopped + if (serviceInfo.getState() == STOPPED) { + // do nothing + } else { + // The state is RUNNING + if (forcedly) { + try { + stopService(serviceInfo); + } catch (Exception e) { + // record errMsg if exception occurs during the stop of service + LOGGER.warn( + "Failed to stop External Service %s because %s. It will be drop forcedly", + serviceName, e.getMessage()); + } + } else { + TSStatus status = + new TSStatus(TSStatusCode.CANNOT_DROP_RUNNING_EXTERNAL_SERVICE.getStatusCode()); + status.setMessage( + String.format( + "Failed to drop External Service %s, because it is RUNNING!", serviceName)); + throw new ExternalServiceManagementException(status); + } + } + + // 3. persist on CN + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TSStatus status = client.dropExternalService(QueryId.getDataNodeId(), serviceName); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBRuntimeException(status.message, status.code); + } + } + + // 4. modify memory info + serviceInfos.remove(serviceName); + } finally { + lock.writeLock().unlock(); + } + } + + public void restoreRunningServiceInstance() { + // Needn't use lock here, we use this method when active DN and there is no concurrent risk + serviceInfos + .values() + .forEach( + serviceInfo -> { + // start services with RUNNING state + if (serviceInfo.getState() == RUNNING) { + IExternalService serviceInstance = serviceInfo.getServiceInstance(); + serviceInfo.setServiceInstance(serviceInstance); + serviceInstance.start(); Review Comment: Line 308 retrieves the serviceInstance which is potentially null, then line 309 sets it back to the same null value. This should call `createExternalServiceInstance` to create a new instance if null, similar to the pattern in `startService` method. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/externalservice/StartExternalServiceTask.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.externalservice; + +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Locale; + +public class StartExternalServiceTask implements IConfigTask { + + private final String serviceName; + + public StartExternalServiceTask(String serviceName) { + this.serviceName = serviceName.toUpperCase(Locale.ENGLISH); + ; Review Comment: Remove the extra semicolon on line 36. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java: ########## @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.service.exernalservice; Review Comment: Corrected package name from 'exernalservice' to 'externalservice'. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java: ########## @@ -1093,6 +1097,36 @@ public Statement visitShowTriggers(IoTDBSqlParser.ShowTriggersContext ctx) { return new ShowTriggersStatement(); } + @Override + public Statement visitCreateService(IoTDBSqlParser.CreateServiceContext ctx) { + throw new UnsupportedOperationException(SERVICE_MANAGEMENT_NOT_SUPPORTED); + } + + @Override + public Statement visitStartService(IoTDBSqlParser.StartServiceContext ctx) { + throw new UnsupportedOperationException(SERVICE_MANAGEMENT_NOT_SUPPORTED); + } + + @Override + public Statement visitStopService(IoTDBSqlParser.StopServiceContext ctx) { + throw new UnsupportedOperationException(SERVICE_MANAGEMENT_NOT_SUPPORTED); + } + + @Override + public Statement visitDropService(IoTDBSqlParser.DropServiceContext ctx) { + throw new UnsupportedOperationException(SERVICE_MANAGEMENT_NOT_SUPPORTED); + } + + @Override + public Statement visitShowService(IoTDBSqlParser.ShowServiceContext ctx) { + // show services on all DNs + int dataNodeId = -1; + if (ctx.ON() != null) { + dataNodeId = Integer.parseInt(ctx.targetDataNodeId.getText()); Review Comment: Potential uncaught 'java.lang.NumberFormatException'. ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/exernalservice/ShowExternalServicePlan.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.read.exernalservice; Review Comment: Corrected package name from 'exernalservice' to 'externalservice'. ```suggestion package org.apache.iotdb.confignode.consensus.request.read.externalservice; ``` ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/externalservice/ExternalServiceInfo.java: ########## @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.externalservice; + +import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.externalservice.ServiceInfo; +import org.apache.iotdb.commons.snapshot.SnapshotProcessor; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.confignode.consensus.request.write.externalservice.CreateExternalServicePlan; +import org.apache.iotdb.confignode.consensus.request.write.externalservice.DropExternalServicePlan; +import org.apache.iotdb.confignode.consensus.request.write.externalservice.StartExternalServicePlan; +import org.apache.iotdb.confignode.consensus.request.write.externalservice.StopExternalServicePlan; +import org.apache.iotdb.confignode.consensus.response.externalservice.ShowExternalServiceResp; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.zip.CRC32; + +public class ExternalServiceInfo implements SnapshotProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(ExternalServiceInfo.class); + + private final Map<Integer, Map<String, ServiceInfo>> datanodeToServiceInfos; + + private static final String SNAPSHOT_FILENAME = "service_info.bin"; + private static final byte SERIALIZATION_VERSION = 1; + private final CRC32 crc32 = new CRC32(); + + private static final String SERVICE_NOT_EXISTED = + "ExternalService %s is not existed on DataNode %s."; + + public ExternalServiceInfo() { + datanodeToServiceInfos = new ConcurrentHashMap<>(); + } + + /** + * Add a new ExternalService on target DataNode. + * + * @return SUCCESS_STATUS if <tt>this service</tt> was not existed on target DataNode, otherwise + * EXTERNAL_SERVICE_AlREADY_EXIST + */ + public TSStatus addService(CreateExternalServicePlan plan) { + TSStatus res = new TSStatus(); + Map<String, ServiceInfo> serviceInfos = + datanodeToServiceInfos.computeIfAbsent( + plan.getDatanodeId(), k -> new ConcurrentHashMap<>()); + String serviceName = plan.getServiceInfo().getServiceName(); + if (serviceInfos.containsKey(serviceName)) { + res.code = TSStatusCode.EXTERNAL_SERVICE_ALREADY_EXIST.getStatusCode(); + res.message = String.format(SERVICE_NOT_EXISTED, serviceName, plan.getDatanodeId()); + } else { + serviceInfos.put(serviceName, plan.getServiceInfo()); + res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); + } + return res; + } + + /** + * Drop the ExternalService whose name is same as <tt>serviceName</tt> in plan. + * + * @return SUCCESS_STATUS if <tt>this service</tt> was existed on target DataNode, otherwise + * NO_SUCH_EXTERNAL_SERVICE + */ + public TSStatus dropService(DropExternalServicePlan plan) { + TSStatus res = new TSStatus(); + Map<String, ServiceInfo> serviceInfos = + datanodeToServiceInfos.computeIfAbsent( + plan.getDataNodeId(), k -> new ConcurrentHashMap<>()); + String serviceName = plan.getServiceName(); + ServiceInfo removed = serviceInfos.remove(serviceName); + if (removed == null) { + res.code = TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode(); + res.message = String.format(SERVICE_NOT_EXISTED, serviceName, plan.getDataNodeId()); + } else { + res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); + } + return res; + } + + /** + * Start the ExternalService whose name is same as <tt>serviceName</tt> in plan. + * + * @return SUCCESS_STATUS if <tt>this service</tt> was existed on target DataNode, otherwise + * NO_SUCH_EXTERNAL_SERVICE + */ + public TSStatus startService(StartExternalServicePlan plan) { + TSStatus res = new TSStatus(); + Map<String, ServiceInfo> serviceInfos = + datanodeToServiceInfos.computeIfAbsent( + plan.getDataNodeId(), k -> new ConcurrentHashMap<>()); + String serviceName = plan.getServiceName(); + ServiceInfo serviceInfo = serviceInfos.get(serviceName); + if (serviceInfo == null) { + res.code = TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode(); + res.message = String.format(SERVICE_NOT_EXISTED, serviceName, plan.getDataNodeId()); + } else { + serviceInfo.setState(ServiceInfo.State.RUNNING); + res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); + } + return res; + } + + /** + * Stop the ExternalService whose name is same as <tt>serviceName</tt> in plan. + * + * @return SUCCESS_STATUS if <tt>this service</tt> was existed on target DataNode, otherwise + * NO_SUCH_EXTERNAL_SERVICE + */ + public TSStatus stopService(StopExternalServicePlan plan) { + TSStatus res = new TSStatus(); + Map<String, ServiceInfo> serviceInfos = + datanodeToServiceInfos.computeIfAbsent( + plan.getDataNodeId(), k -> new ConcurrentHashMap<>()); + String serviceName = plan.getServiceName(); + ServiceInfo serviceInfo = serviceInfos.get(serviceName); + if (serviceInfo == null) { + res.code = TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode(); + res.message = String.format(SERVICE_NOT_EXISTED, serviceName, plan.getDataNodeId()); + } else { + serviceInfo.setState(ServiceInfo.State.STOPPED); + res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); + } + return res; + } + + public ShowExternalServiceResp showService(Set<Integer> dataNodes) { + return new ShowExternalServiceResp( + datanodeToServiceInfos.entrySet().stream() + .filter(entry -> dataNodes.contains(entry.getKey())) + .flatMap( + entry -> + entry.getValue().values().stream() + .map( + serviceInfo -> + new TExternalServiceEntry( + serviceInfo.getServiceName(), + serviceInfo.getClassName(), + serviceInfo.getState().getValue(), + entry.getKey(), + ServiceInfo.ServiceType.USER_DEFINED.getValue()))) + .collect(Collectors.toList())); + } + + private void serializeInfos(OutputStream outputStream) throws IOException { + ReadWriteIOUtils.write(SERIALIZATION_VERSION, outputStream); + ReadWriteIOUtils.write(datanodeToServiceInfos.size(), outputStream); + for (Map.Entry<Integer, Map<String, ServiceInfo>> outerEntry : + datanodeToServiceInfos.entrySet()) { + ReadWriteIOUtils.write(outerEntry.getKey(), outputStream); // DataNode ID + + Map<String, ServiceInfo> innerMap = outerEntry.getValue(); + // inner Map + ReadWriteIOUtils.write(innerMap.size(), outputStream); + for (ServiceInfo innerEntry : innerMap.values()) { + serializeServiceInfoWithCRC(innerEntry, outputStream); + } + } + } + + private void serializeServiceInfoWithCRC(ServiceInfo serviceInfo, OutputStream outputStream) + throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream tempDos = new DataOutputStream(byteArrayOutputStream); + serviceInfo.serialize(tempDos); + tempDos.flush(); + byte[] bytes = byteArrayOutputStream.toByteArray(); + + crc32.reset(); + crc32.update(bytes, 0, bytes.length); + + ReadWriteIOUtils.write(bytes.length, outputStream); + outputStream.write(bytes); + ReadWriteIOUtils.write((int) crc32.getValue(), outputStream); + } + + private void deserializeInfos(InputStream inputStream) throws IOException { + if (ReadWriteIOUtils.readByte(inputStream) != SERIALIZATION_VERSION) { + throw new IOException("Incorrect version of " + SNAPSHOT_FILENAME); + } + + int outerSize = ReadWriteIOUtils.readInt(inputStream); + for (int i = 0; i < outerSize; i++) { + int dataNodeId = ReadWriteIOUtils.readInt(inputStream); + int innerSize = ReadWriteIOUtils.readInt(inputStream); + + Map<String, ServiceInfo> innerMap = + datanodeToServiceInfos.computeIfAbsent( + dataNodeId, k -> new ConcurrentHashMap<>(innerSize)); + for (int j = 0; j < innerSize; j++) { + ServiceInfo value = deserializeServiceInfoConsiderCRC(inputStream); + if (value != null) { + innerMap.put(value.getServiceName(), value); + } + } + datanodeToServiceInfos.put(dataNodeId, innerMap); + } + } + + private ServiceInfo deserializeServiceInfoConsiderCRC(InputStream inputStream) + throws IOException { + int length = ReadWriteIOUtils.readInt(inputStream); + byte[] bytes = new byte[length]; + inputStream.read(bytes); Review Comment: Method deserializeServiceInfoConsiderCRC ignores exceptional return value of InputStream.read. ```suggestion int offset = 0; while (offset < length) { int read = inputStream.read(bytes, offset, length - offset); if (read == -1) { throw new IOException("Unexpected end of stream while reading service info."); } offset += read; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
