JackieTien97 commented on code in PR #17009:
URL: https://github.com/apache/iotdb/pull/17009#discussion_r2681221266
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java:
##########
@@ -546,6 +553,14 @@ private ColumnHeaderConstant() {
new ColumnHeader(CLASS_NAME, TSDataType.TEXT),
new ColumnHeader(NODE_ID, TSDataType.TEXT));
+ public static final List<ColumnHeader> showExternalServiceColumnHeaders =
+ ImmutableList.of(
+ new ColumnHeader(SERVICE_NAME, TSDataType.TEXT),
+ new ColumnHeader(DATA_NODE_ID, TSDataType.INT32),
+ new ColumnHeader(STATE, TSDataType.TEXT),
+ new ColumnHeader(CLASS_NAME, TSDataType.TEXT),
+ new ColumnHeader(SERVICE_TYPE, TSDataType.TEXT));
Review Comment:
```suggestion
new ColumnHeader(SERVICE_NAME, TSDataType.STRING),
new ColumnHeader(DATA_NODE_ID, TSDataType.INT32),
new ColumnHeader(STATE, TSDataType.STRING),
new ColumnHeader(CLASS_NAME, TSDataType.STRING),
new ColumnHeader(SERVICE_TYPE, TSDataType.STRING));
```
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java:
##########
@@ -26,6 +26,7 @@ public enum ServiceType {
RPC_SERVICE("RPC ServerService", "RPCService"),
INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"),
MQTT_SERVICE("MQTTService", "MqttService"),
+ EXTERNAL_SERVICE("ExternalService Service", "ExternalService"),
Review Comment:
It seems that this enum constant (`EXTERNAL_SERVICE`) is never used.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java:
##########
@@ -936,6 +941,38 @@ private TSStatus checkTriggerManagement(IAuditEntity
auditEntity, Supplier<Strin
return checkGlobalAuth(auditEntity, PrivilegeType.USE_TRIGGER,
auditObject);
}
+ // ======================= externalService related
================================
+ @Override
+ public TSStatus visitCreateExternalService(
+ CreateExternalServiceStatement createExternalServiceStatement,
+ TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitStartExternalService(
+ StartExternalServiceStatement startExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitStopExternalService(
+ StopExternalServiceStatement stopExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitDropExternalService(
+ DropExternalServiceStatement dropExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
Review Comment:
same as above
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java:
##########
@@ -936,6 +941,38 @@ private TSStatus checkTriggerManagement(IAuditEntity
auditEntity, Supplier<Strin
return checkGlobalAuth(auditEntity, PrivilegeType.USE_TRIGGER,
auditObject);
}
+ // ======================= externalService related
================================
+ @Override
+ public TSStatus visitCreateExternalService(
+ CreateExternalServiceStatement createExternalServiceStatement,
+ TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitStartExternalService(
+ StartExternalServiceStatement startExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitStopExternalService(
+ StopExternalServiceStatement stopExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitDropExternalService(
+ DropExternalServiceStatement dropExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitShowExternalService(
+ ShowExternalServiceStatement showExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
Review Comment:
same as above
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new
ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(serviceName, true,
classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ String errorMessage =
+ String.format(
+ "Failed to start External Service %s, because its instance can
not be constructed successfully. Exception: %s",
+ serviceName, e);
+ LOGGER.warn(errorMessage, e);
+ throw new ExternalServiceManagementException(errorMessage);
Review Comment:
Add a `TSStatusCode` to this exception.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new
ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(serviceName, true,
classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ String errorMessage =
+ String.format(
+ "Failed to start External Service %s, because its instance can
not be constructed successfully. Exception: %s",
+ serviceName, e);
+ LOGGER.warn(errorMessage, e);
+ throw new ExternalServiceManagementException(errorMessage);
+ }
+ }
+
+ public void stopService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call stop method of ServiceInstance
+ if (serviceInfo.getState() == STOPPED) {
+ return;
+ } else {
+ // The state is RUNNING
+ stopService(serviceInfo);
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.stopExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().start();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(STOPPED);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void stopService(ServiceInfo serviceInfo) {
+ checkState(
+ serviceInfo.getServiceInstance() != null,
+ "External Service instance is null when state is RUNNING!",
+ serviceInfo.getServiceName());
+ serviceInfo.getServiceInstance().stop();
+ }
+
+ public void dropService(String serviceName, boolean forcedly)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is not
existed!", serviceName));
+ }
+ if (serviceInfo.getServiceType() == ServiceInfo.ServiceType.BUILTIN) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is BUILT-IN!",
serviceName));
+ }
+
+ // 2. stop or fail when service are not stopped
+ if (serviceInfo.getState() == STOPPED) {
+ // do nothing
+ } else {
+ // The state is RUNNING
+ if (forcedly) {
+ try {
+ stopService(serviceInfo);
+ } catch (Exception e) {
+ // record errMsg if exception occurs during the stop of service
+ LOGGER.warn(
+ "Failed to stop External Service %s because %s. It will be
drop forcedly",
+ serviceName, e.getMessage());
+ }
+ } else {
+ throw new ExternalServiceManagementException(
Review Comment:
Add a `TSStatusCode` to this exception.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -4839,4 +4849,82 @@ public void handlePipeConfigClientExit(final String
clientId) {
LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
}
}
+
+ @Override
+ public SettableFuture<ConfigTaskResult> createExternalService(
+ String serviceName, String className) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+
ExternalServiceManagementService.getInstance().createService(serviceName,
className);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> startExternalService(String
serviceName) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().startService(serviceName);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> stopExternalService(String
serviceName) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().stopService(serviceName);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> dropExternalService(
+ String serviceName, boolean forcedly) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().dropService(serviceName,
forcedly);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> showExternalService(int dataNodeId) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ Iterator<TExternalServiceEntry> iterator =
+
ExternalServiceManagementService.getInstance().showService(dataNodeId);
+
+ List<TSDataType> outputDataTypes =
+ ColumnHeaderConstant.showExternalServiceColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ while (iterator.hasNext()) {
+ timeColumnBuilder.writeLong(0L);
+ appendServiceEntry(iterator.next(), builder.getValueColumnBuilders());
+ builder.declarePosition();
+ }
+
+ future.set(
+ new ConfigTaskResult(
+ TSStatusCode.SUCCESS_STATUS, builder.build(),
getShowExternalServiceHeader()));
+ } catch (Exception e) {
+ future.setException(e);
Review Comment:
same as above
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/BuiltinExternalServices.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
+
+import java.util.function.Supplier;
+
+public enum BuiltinExternalServices {
+ MQTT(
+ "MQTT",
+ "org.apache.iotdb.externalservice.Mqtt",
+ IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService),
+ REST(
+ "REST",
+ "org.apache.iotdb.externalservice.Rest",
+
IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService);
Review Comment:
```suggestion
MQTT(
"MQTT",
"org.apache.iotdb.externalservice.Mqtt",
// IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService),
() -> false),
REST(
"REST",
"org.apache.iotdb.externalservice.Rest",
// IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService);
() -> false);
```
It is better to always return false until the REST and MQTT services are
actually separated out. Otherwise, the MQTT and REST ITs will fail because
`org.apache.iotdb.externalservice.Mqtt` and
`org.apache.iotdb.externalservice.Rest` are not found.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -4839,4 +4849,82 @@ public void handlePipeConfigClientExit(final String
clientId) {
LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
}
}
+
+ @Override
+ public SettableFuture<ConfigTaskResult> createExternalService(
+ String serviceName, String className) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+
ExternalServiceManagementService.getInstance().createService(serviceName,
className);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> startExternalService(String
serviceName) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().startService(serviceName);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> stopExternalService(String
serviceName) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().stopService(serviceName);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> dropExternalService(
+ String serviceName, boolean forcedly) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().dropService(serviceName,
forcedly);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
Review Comment:
same as above
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java:
##########
@@ -936,6 +941,38 @@ private TSStatus checkTriggerManagement(IAuditEntity
auditEntity, Supplier<Strin
return checkGlobalAuth(auditEntity, PrivilegeType.USE_TRIGGER,
auditObject);
}
+ // ======================= externalService related
================================
+ @Override
+ public TSStatus visitCreateExternalService(
+ CreateExternalServiceStatement createExternalServiceStatement,
+ TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitStartExternalService(
+ StartExternalServiceStatement startExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
Review Comment:
same as above
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java:
##########
@@ -1087,6 +1092,52 @@ public Node
visitShowFunctionsStatement(RelationalSqlParser.ShowFunctionsStateme
return new ShowFunctions();
}
+ @Override
+ public Node
visitCreateServiceStatement(RelationalSqlParser.CreateServiceStatementContext
ctx) {
+ String serviceName = ((Identifier) visit(ctx.serviceName)).getValue();
+ String className = ((StringLiteral) visit(ctx.className)).getValue();
+ return new CreateExternalService(getLocation(ctx), serviceName, className);
+ }
+
+ @Override
+ public Node
visitStartServiceStatement(RelationalSqlParser.StartServiceStatementContext
ctx) {
+ return new StartExternalService(
+ getLocation(ctx), ((Identifier) visit(ctx.serviceName)).getValue());
+ }
+
+ @Override
+ public Node
visitStopServiceStatement(RelationalSqlParser.StopServiceStatementContext ctx) {
+ return new StopExternalService(
+ getLocation(ctx), ((Identifier) visit(ctx.serviceName)).getValue());
+ }
+
+ @Override
+ public Node
visitDropServiceStatement(RelationalSqlParser.DropServiceStatementContext ctx) {
+ return new DropExternalService(
+ getLocation(ctx), ((Identifier) visit(ctx.serviceName)).getValue(),
ctx.FORCEDLY() != null);
+ }
+
+ @Override
+ public Node
visitShowServiceStatement(RelationalSqlParser.ShowServiceStatementContext ctx) {
+ // show services on all DNs
+ Optional<Expression> where = Optional.empty();
+ if (ctx.ON() != null) {
+ where =
+ Optional.of(
+ new ComparisonExpression(
+ ComparisonExpression.Operator.EQUAL,
+ new Identifier("datanode_id"),
Review Comment:
```suggestion
new
Identifier(ColumnHeaderConstant.DATA_NODE_ID_TABLE_MODEL),
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java:
##########
@@ -1287,6 +1287,7 @@ private void initProtocols() throws StartupException {
if
(IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
registerManager.register(RestService.getInstance());
}
+ //
registerManager.register(ExternalServiceManagementService.getInstance());
Review Comment:
Is this commented-out line still needed, or should it just be deleted?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
Review Comment:
Use try-with-resources so that the borrowed client is always closed.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java:
##########
@@ -936,6 +941,38 @@ private TSStatus checkTriggerManagement(IAuditEntity
auditEntity, Supplier<Strin
return checkGlobalAuth(auditEntity, PrivilegeType.USE_TRIGGER,
auditObject);
}
+ // ======================= externalService related
================================
+ @Override
+ public TSStatus visitCreateExternalService(
+ CreateExternalServiceStatement createExternalServiceStatement,
+ TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
Review Comment:
`() -> ""` should be replaced with the exact SQL, which can be derived from
`CreateExternalServiceStatement`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java:
##########
@@ -1511,6 +1519,37 @@ protected IConfigTask visitDropFunction(DropFunction
node, MPPQueryContext conte
return new DropFunctionTask(Model.TABLE, node.getUdfName());
}
+ @Override
+ protected IConfigTask visitCreateExternalService(
+ CreateExternalService node, MPPQueryContext context) {
+ context.setQueryType(QueryType.WRITE);
Review Comment:
```suggestion
context.setQueryType(QueryType.WRITE);
accessControl.checkUserGlobalSysPrivilege(context);
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java:
##########
@@ -936,6 +941,38 @@ private TSStatus checkTriggerManagement(IAuditEntity
auditEntity, Supplier<Strin
return checkGlobalAuth(auditEntity, PrivilegeType.USE_TRIGGER,
auditObject);
}
+ // ======================= externalService related
================================
+ @Override
+ public TSStatus visitCreateExternalService(
+ CreateExternalServiceStatement createExternalServiceStatement,
+ TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitStartExternalService(
+ StartExternalServiceStatement startExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
+ }
+
+ @Override
+ public TSStatus visitStopExternalService(
+ StopExternalServiceStatement stopExternalServiceStatement,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(context, PrivilegeType.SYSTEM, () -> "");
Review Comment:
Same as above: `() -> ""` should be replaced with the exact SQL derived from
the corresponding statement.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
Review Comment:
Add a status code to this exception.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+public class ExternalServiceManagementException extends RuntimeException {
Review Comment:
```suggestion
public class ExternalServiceManagementException extends
IoTDBRuntimeException {
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new
ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(serviceName, true,
classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ String errorMessage =
+ String.format(
+ "Failed to start External Service %s, because its instance can
not be constructed successfully. Exception: %s",
+ serviceName, e);
+ LOGGER.warn(errorMessage, e);
+ throw new ExternalServiceManagementException(errorMessage);
+ }
+ }
+
+ public void stopService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call stop method of ServiceInstance
+ if (serviceInfo.getState() == STOPPED) {
+ return;
+ } else {
+ // The state is RUNNING
+ stopService(serviceInfo);
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.stopExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().start();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(STOPPED);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void stopService(ServiceInfo serviceInfo) {
+ checkState(
+ serviceInfo.getServiceInstance() != null,
+ "External Service instance is null when state is RUNNING!",
+ serviceInfo.getServiceName());
+ serviceInfo.getServiceInstance().stop();
+ }
+
+ public void dropService(String serviceName, boolean forcedly)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
Review Comment:
Add a `TSStatusCode` to this exception.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
Review Comment:
Use try-with-resources so that the borrowed client is always closed.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new
ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(serviceName, true,
classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ String errorMessage =
+ String.format(
+ "Failed to start External Service %s, because its instance can
not be constructed successfully. Exception: %s",
+ serviceName, e);
+ LOGGER.warn(errorMessage, e);
+ throw new ExternalServiceManagementException(errorMessage);
+ }
+ }
+
+ public void stopService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
Review Comment:
Add a `TSStatusCode` to this exception.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
Review Comment:
The borrowed ConfigNodeClient should be closed after use.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
Review Comment:
Same as above (add a TSStatusCode to this exception).
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
Review Comment:
```suggestion
} else { // The state is STOPPED
if (serviceInfo.getServiceInstance() == null) {
// lazy create Instance
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
}
serviceInfo.getServiceInstance().start();
}
```
After creating the instance, you still need to start it.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new
ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(serviceName, true,
classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ String errorMessage =
+ String.format(
+ "Failed to start External Service %s, because its instance can
not be constructed successfully. Exception: %s",
+ serviceName, e);
+ LOGGER.warn(errorMessage, e);
+ throw new ExternalServiceManagementException(errorMessage);
+ }
+ }
+
+ public void stopService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call stop method of ServiceInstance
+ if (serviceInfo.getState() == STOPPED) {
+ return;
+ } else {
+ // The state is RUNNING
+ stopService(serviceInfo);
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.stopExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().start();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(STOPPED);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void stopService(ServiceInfo serviceInfo) {
+ checkState(
+ serviceInfo.getServiceInstance() != null,
+ "External Service instance is null when state is RUNNING!",
+ serviceInfo.getServiceName());
+ serviceInfo.getServiceInstance().stop();
+ }
+
+ public void dropService(String serviceName, boolean forcedly)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is not
existed!", serviceName));
+ }
+ if (serviceInfo.getServiceType() == ServiceInfo.ServiceType.BUILTIN) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is BUILT-IN!",
serviceName));
+ }
+
+ // 2. stop or fail when service are not stopped
+ if (serviceInfo.getState() == STOPPED) {
+ // do nothing
+ } else {
+ // The state is RUNNING
+ if (forcedly) {
+ try {
+ stopService(serviceInfo);
+ } catch (Exception e) {
+ // record errMsg if exception occurs during the stop of service
+ LOGGER.warn(
+ "Failed to stop External Service %s because %s. It will be
drop forcedly",
+ serviceName, e.getMessage());
+ }
+ } else {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is
RUNNING!", serviceName));
+ }
+ }
+
+ // 3. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.dropExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfos.remove(serviceName);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void restoreRunningServices() {
Review Comment:
This method does not appear to be called anywhere?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new
ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(serviceName, true,
classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ String errorMessage =
+ String.format(
+ "Failed to start External Service %s, because its instance can
not be constructed successfully. Exception: %s",
+ serviceName, e);
+ LOGGER.warn(errorMessage, e);
+ throw new ExternalServiceManagementException(errorMessage);
+ }
+ }
+
+ public void stopService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call stop method of ServiceInstance
+ if (serviceInfo.getState() == STOPPED) {
+ return;
+ } else {
+ // The state is RUNNING
+ stopService(serviceInfo);
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.stopExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().start();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(STOPPED);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void stopService(ServiceInfo serviceInfo) {
+ checkState(
+ serviceInfo.getServiceInstance() != null,
+ "External Service instance is null when state is RUNNING!",
+ serviceInfo.getServiceName());
+ serviceInfo.getServiceInstance().stop();
+ }
+
+ public void dropService(String serviceName, boolean forcedly)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is not
existed!", serviceName));
+ }
+ if (serviceInfo.getServiceType() == ServiceInfo.ServiceType.BUILTIN) {
+ throw new ExternalServiceManagementException(
Review Comment:
Add a TSStatusCode to this exception.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new
ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(serviceName, true,
classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ String errorMessage =
+ String.format(
+ "Failed to start External Service %s, because its instance can
not be constructed successfully. Exception: %s",
+ serviceName, e);
+ LOGGER.warn(errorMessage, e);
+ throw new ExternalServiceManagementException(errorMessage);
+ }
+ }
+
+ public void stopService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call stop method of ServiceInstance
+ if (serviceInfo.getState() == STOPPED) {
+ return;
+ } else {
+ // The state is RUNNING
+ stopService(serviceInfo);
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.stopExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().start();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(STOPPED);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void stopService(ServiceInfo serviceInfo) {
+ checkState(
+ serviceInfo.getServiceInstance() != null,
+ "External Service instance is null when state is RUNNING!",
+ serviceInfo.getServiceName());
+ serviceInfo.getServiceInstance().stop();
+ }
+
+ public void dropService(String serviceName, boolean forcedly)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is not
existed!", serviceName));
+ }
+ if (serviceInfo.getServiceType() == ServiceInfo.ServiceType.BUILTIN) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is BUILT-IN!",
serviceName));
+ }
+
+ // 2. stop or fail when service are not stopped
+ if (serviceInfo.getState() == STOPPED) {
+ // do nothing
+ } else {
+ // The state is RUNNING
+ if (forcedly) {
+ try {
+ stopService(serviceInfo);
+ } catch (Exception e) {
+ // record errMsg if exception occurs during the stop of service
+ LOGGER.warn(
+ "Failed to stop External Service %s because %s. It will be
drop forcedly",
+ serviceName, e.getMessage());
+ }
+ } else {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to drop External Service %s, because it is
RUNNING!", serviceName));
+ }
+ }
+
+ // 3. persist on CN
+ ConfigNodeClient client =
Review Comment:
Close the borrowed ConfigNodeClient after use.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
+ String.format("Failed to create External Service %s, it already
exists!", serviceName));
+ }
+
+ // 2. persist on CN
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(),
serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className,
ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not
created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() != null) {
+ serviceInfo.getServiceInstance().start();
+ } else {
+ // lazy create Instance
+
serviceInfo.setServiceInstance(createExternalServiceInstance(serviceName));
+ }
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(),
serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new
ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(serviceName, true,
classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ String errorMessage =
+ String.format(
+ "Failed to start External Service %s, because its instance can
not be constructed successfully. Exception: %s",
+ serviceName, e);
+ LOGGER.warn(errorMessage, e);
+ throw new ExternalServiceManagementException(errorMessage);
+ }
+ }
+
+ public void stopService(String serviceName) throws ClientManagerException,
TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ throw new ExternalServiceManagementException(
+ String.format(
+ "Failed to start External Service %s, because it is not
existed!", serviceName));
+ }
+
+ // 2. call stop method of ServiceInstance
+ if (serviceInfo.getState() == STOPPED) {
+ return;
+ } else {
+ // The state is RUNNING
+ stopService(serviceInfo);
+ }
+
+ // 3. persist on CN, rollback if failed
+ ConfigNodeClient client =
Review Comment:
Close the borrowed ConfigNodeClient after use.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -4839,4 +4849,82 @@ public void handlePipeConfigClientExit(final String
clientId) {
LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
}
}
+
+ @Override
+ public SettableFuture<ConfigTaskResult> createExternalService(
+ String serviceName, String className) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+
ExternalServiceManagementService.getInstance().createService(serviceName,
className);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> startExternalService(String
serviceName) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().startService(serviceName);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
Review Comment:
same as above
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -4839,4 +4849,82 @@ public void handlePipeConfigClientExit(final String
clientId) {
LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
}
}
+
+ @Override
+ public SettableFuture<ConfigTaskResult> createExternalService(
+ String serviceName, String className) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+
ExternalServiceManagementService.getInstance().createService(serviceName,
className);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
Review Comment:
```suggestion
} catch (IoTDBRuntimeException e) {
future.setException(e);
} catch (Exception e) {
future.setException(new ExternalServiceManagementException(301?,
e.getMessage()));
}
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/ExternalServiceManagementService.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static
org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map<String, ServiceInfo> serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator<TExternalServiceEntry> showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ ConfigNodeClient client =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message,
resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ throw new ExternalServiceManagementException(
Review Comment:
let ExternalServiceManagementException extend IoTDBRuntimeException
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/exernalservice/BuiltinExternalServices.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.exernalservice;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
+
+import java.util.function.Supplier;
+
+public enum BuiltinExternalServices {
+ MQTT(
+ "MQTT",
+ "org.apache.iotdb.externalservice.Mqtt",
+ IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService),
+ REST(
+ "REST",
+ "org.apache.iotdb.externalservice.Rest",
+
IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService);
Review Comment:
```suggestion
MQTT(
"MQTT",
"org.apache.iotdb.externalservice.Mqtt",
// IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService),
() -> false),
REST(
"REST",
"org.apache.iotdb.externalservice.Rest",
//
IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService);
() -> false);
```
It is better to always return false until the REST and MQTT services are
actually separated out. Otherwise, the MQTT and REST ITs will fail because
`org.apache.iotdb.externalservice.Mqtt` and
`org.apache.iotdb.externalservice.Rest` are not found.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -4839,4 +4849,82 @@ public void handlePipeConfigClientExit(final String
clientId) {
LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
}
}
+
+ @Override
+ public SettableFuture<ConfigTaskResult> createExternalService(
+ String serviceName, String className) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+
ExternalServiceManagementService.getInstance().createService(serviceName,
className);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> startExternalService(String
serviceName) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().startService(serviceName);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> stopExternalService(String
serviceName) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ ExternalServiceManagementService.getInstance().stopService(serviceName);
+ future.set(new ConfigTaskResult(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ } catch (Exception e) {
+ future.setException(new
ExternalServiceManagementException(e.getMessage()));
Review Comment:
same as above
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]