liyuheng55555 commented on code in PR #13660:
URL: https://github.com/apache/iotdb/pull/13660#discussion_r1797561664
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java:
##########
@@ -93,299 +92,314 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
/** Asynchronously send RPC requests to DataNodes. See queryengine.thrift for
more details. */
public class CnToDnInternalServiceAsyncRequestManager
- extends DataNodeInternalServiceRequestManager<CnToDnRequestType> {
+ extends DataNodeInternalServiceRequestManager<CnToDnAsyncRequestType> {
private static final Logger LOGGER =
LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class);
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
- CnToDnRequestType.SET_TTL,
+ CnToDnAsyncRequestType.SET_TTL,
(req, client, handler) ->
client.setTTL((TSetTTLReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.CREATE_DATA_REGION,
+ CnToDnAsyncRequestType.CREATE_DATA_REGION,
(req, client, handler) ->
client.createDataRegion(
(TCreateDataRegionReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.DELETE_REGION,
+ CnToDnAsyncRequestType.DELETE_REGION,
(req, client, handler) ->
client.deleteRegion((TConsensusGroupId) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.CREATE_SCHEMA_REGION,
+ CnToDnAsyncRequestType.CREATE_SCHEMA_REGION,
(req, client, handler) ->
client.createSchemaRegion(
(TCreateSchemaRegionReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.CREATE_FUNCTION,
+ CnToDnAsyncRequestType.CREATE_FUNCTION,
(req, client, handler) ->
client.createFunction(
(TCreateFunctionInstanceReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.DROP_FUNCTION,
+ CnToDnAsyncRequestType.DROP_FUNCTION,
(req, client, handler) ->
client.dropFunction(
(TDropFunctionInstanceReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.CREATE_TRIGGER_INSTANCE,
+ CnToDnAsyncRequestType.CREATE_TRIGGER_INSTANCE,
(req, client, handler) ->
client.createTriggerInstance(
(TCreateTriggerInstanceReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.DROP_TRIGGER_INSTANCE,
+ CnToDnAsyncRequestType.DROP_TRIGGER_INSTANCE,
(req, client, handler) ->
client.dropTriggerInstance(
(TDropTriggerInstanceReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.ACTIVE_TRIGGER_INSTANCE,
+ CnToDnAsyncRequestType.ACTIVE_TRIGGER_INSTANCE,
(req, client, handler) ->
client.activeTriggerInstance(
(TActiveTriggerInstanceReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.INACTIVE_TRIGGER_INSTANCE,
+ CnToDnAsyncRequestType.INACTIVE_TRIGGER_INSTANCE,
(req, client, handler) ->
client.inactiveTriggerInstance(
(TInactiveTriggerInstanceReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.UPDATE_TRIGGER_LOCATION,
+ CnToDnAsyncRequestType.UPDATE_TRIGGER_LOCATION,
(req, client, handler) ->
client.updateTriggerLocation(
(TUpdateTriggerLocationReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.CREATE_PIPE_PLUGIN,
+ CnToDnAsyncRequestType.CREATE_PIPE_PLUGIN,
(req, client, handler) ->
client.createPipePlugin(
(TCreatePipePluginInstanceReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.DROP_PIPE_PLUGIN,
+ CnToDnAsyncRequestType.DROP_PIPE_PLUGIN,
(req, client, handler) ->
client.dropPipePlugin(
(TDropPipePluginInstanceReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.PIPE_PUSH_ALL_META,
+ CnToDnAsyncRequestType.PIPE_PUSH_ALL_META,
(req, client, handler) ->
client.pushPipeMeta((TPushPipeMetaReq) req,
(PipePushMetaRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.PIPE_PUSH_SINGLE_META,
+ CnToDnAsyncRequestType.PIPE_PUSH_SINGLE_META,
(req, client, handler) ->
client.pushSinglePipeMeta(
(TPushSinglePipeMetaReq) req, (PipePushMetaRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.PIPE_PUSH_MULTI_META,
+ CnToDnAsyncRequestType.PIPE_PUSH_MULTI_META,
(req, client, handler) ->
client.pushMultiPipeMeta(
(TPushMultiPipeMetaReq) req, (PipePushMetaRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.TOPIC_PUSH_ALL_META,
+ CnToDnAsyncRequestType.TOPIC_PUSH_ALL_META,
(req, client, handler) ->
client.pushTopicMeta((TPushTopicMetaReq) req,
(TopicPushMetaRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.TOPIC_PUSH_SINGLE_META,
+ CnToDnAsyncRequestType.TOPIC_PUSH_SINGLE_META,
(req, client, handler) ->
client.pushSingleTopicMeta(
(TPushSingleTopicMetaReq) req, (TopicPushMetaRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.TOPIC_PUSH_MULTI_META,
+ CnToDnAsyncRequestType.TOPIC_PUSH_MULTI_META,
(req, client, handler) ->
client.pushMultiTopicMeta(
(TPushMultiTopicMetaReq) req, (TopicPushMetaRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.CONSUMER_GROUP_PUSH_ALL_META,
+ CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_ALL_META,
(req, client, handler) ->
client.pushConsumerGroupMeta(
(TPushConsumerGroupMetaReq) req,
(ConsumerGroupPushMetaRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
+ CnToDnAsyncRequestType.CONSUMER_GROUP_PUSH_SINGLE_META,
(req, client, handler) ->
client.pushSingleConsumerGroupMeta(
(TPushSingleConsumerGroupMetaReq) req,
(ConsumerGroupPushMetaRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.PIPE_HEARTBEAT,
+ CnToDnAsyncRequestType.PIPE_HEARTBEAT,
(req, client, handler) ->
client.pipeHeartbeat((TPipeHeartbeatReq) req,
(PipeHeartbeatRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.MERGE,
+ CnToDnAsyncRequestType.MERGE,
(req, client, handler) -> client.merge((DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.FULL_MERGE,
+ CnToDnAsyncRequestType.FULL_MERGE,
(req, client, handler) -> client.merge((DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.FLUSH,
+ CnToDnAsyncRequestType.FLUSH,
(req, client, handler) ->
client.flush((TFlushReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.CLEAR_CACHE,
+ CnToDnAsyncRequestType.CLEAR_CACHE,
(req, client, handler) ->
client.clearCache((DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.START_REPAIR_DATA,
+ CnToDnAsyncRequestType.START_REPAIR_DATA,
(req, client, handler) ->
client.startRepairData((DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.STOP_REPAIR_DATA,
+ CnToDnAsyncRequestType.STOP_REPAIR_DATA,
(req, client, handler) ->
client.stopRepairData((DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.LOAD_CONFIGURATION,
+ CnToDnAsyncRequestType.LOAD_CONFIGURATION,
(req, client, handler) ->
client.loadConfiguration((DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.SET_SYSTEM_STATUS,
+ CnToDnAsyncRequestType.SET_SYSTEM_STATUS,
(req, client, handler) ->
client.setSystemStatus((String) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.SET_CONFIGURATION,
+ CnToDnAsyncRequestType.SET_CONFIGURATION,
(req, client, handler) ->
client.setConfiguration(
(TSetConfigurationReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.UPDATE_REGION_ROUTE_MAP,
+ CnToDnAsyncRequestType.UPDATE_REGION_ROUTE_MAP,
(req, client, handler) ->
client.updateRegionCache((TRegionRouteReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.CHANGE_REGION_LEADER,
+ CnToDnAsyncRequestType.CHANGE_REGION_LEADER,
(req, client, handler) ->
client.changeRegionLeader(
(TRegionLeaderChangeReq) req, (TransferLeaderRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.CONSTRUCT_SCHEMA_BLACK_LIST,
+ CnToDnAsyncRequestType.CONSTRUCT_SCHEMA_BLACK_LIST,
(req, client, handler) ->
client.constructSchemaBlackList(
(TConstructSchemaBlackListReq) req, (SchemaUpdateRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.ROLLBACK_SCHEMA_BLACK_LIST,
+ CnToDnAsyncRequestType.ROLLBACK_SCHEMA_BLACK_LIST,
(req, client, handler) ->
client.rollbackSchemaBlackList(
(TRollbackSchemaBlackListReq) req, (SchemaUpdateRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.FETCH_SCHEMA_BLACK_LIST,
+ CnToDnAsyncRequestType.FETCH_SCHEMA_BLACK_LIST,
(req, client, handler) ->
client.fetchSchemaBlackList(
(TFetchSchemaBlackListReq) req,
(FetchSchemaBlackListRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.INVALIDATE_SCHEMA_CACHE,
+ CnToDnAsyncRequestType.INVALIDATE_SCHEMA_CACHE,
(req, client, handler) ->
client.invalidateSchemaCache(
(TInvalidateCacheReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE,
+ CnToDnAsyncRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE,
(req, client, handler) ->
client.invalidateMatchedSchemaCache(
(TInvalidateMatchedSchemaCacheReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.DELETE_DATA_FOR_DELETE_SCHEMA,
+ CnToDnAsyncRequestType.DELETE_DATA_FOR_DELETE_SCHEMA,
(req, client, handler) ->
client.deleteDataForDeleteSchema(
(TDeleteDataForDeleteSchemaReq) req, (SchemaUpdateRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.DELETE_TIMESERIES,
+ CnToDnAsyncRequestType.DELETE_TIMESERIES,
(req, client, handler) ->
client.deleteTimeSeries((TDeleteTimeSeriesReq) req,
(SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
+ CnToDnAsyncRequestType.CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
(req, client, handler) ->
client.constructSchemaBlackListWithTemplate(
(TConstructSchemaBlackListWithTemplateReq) req,
(SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
+ CnToDnAsyncRequestType.ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
(req, client, handler) ->
client.rollbackSchemaBlackListWithTemplate(
(TRollbackSchemaBlackListWithTemplateReq) req,
(SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.DEACTIVATE_TEMPLATE,
+ CnToDnAsyncRequestType.DEACTIVATE_TEMPLATE,
(req, client, handler) ->
client.deactivateTemplate(
(TDeactivateTemplateReq) req, (SchemaUpdateRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.UPDATE_TEMPLATE,
+ CnToDnAsyncRequestType.UPDATE_TEMPLATE,
(req, client, handler) ->
client.updateTemplate((TUpdateTemplateReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.COUNT_PATHS_USING_TEMPLATE,
+ CnToDnAsyncRequestType.COUNT_PATHS_USING_TEMPLATE,
(req, client, handler) ->
client.countPathsUsingTemplate(
(TCountPathsUsingTemplateReq) req,
(CountPathsUsingTemplateRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.CHECK_SCHEMA_REGION_USING_TEMPLATE,
+ CnToDnAsyncRequestType.CHECK_SCHEMA_REGION_USING_TEMPLATE,
(req, client, handler) ->
client.checkSchemaRegionUsingTemplate(
(TCheckSchemaRegionUsingTemplateReq) req,
(CheckSchemaRegionUsingTemplateRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.CHECK_TIMESERIES_EXISTENCE,
+ CnToDnAsyncRequestType.CHECK_TIMESERIES_EXISTENCE,
(req, client, handler) ->
client.checkTimeSeriesExistence(
(TCheckTimeSeriesExistenceReq) req,
(CheckTimeSeriesExistenceRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.CONSTRUCT_VIEW_SCHEMA_BLACK_LIST,
+ CnToDnAsyncRequestType.CONSTRUCT_VIEW_SCHEMA_BLACK_LIST,
(req, client, handler) ->
client.constructViewSchemaBlackList(
(TConstructViewSchemaBlackListReq) req,
(SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.ROLLBACK_VIEW_SCHEMA_BLACK_LIST,
+ CnToDnAsyncRequestType.ROLLBACK_VIEW_SCHEMA_BLACK_LIST,
(req, client, handler) ->
client.rollbackViewSchemaBlackList(
(TRollbackViewSchemaBlackListReq) req,
(SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.DELETE_VIEW,
+ CnToDnAsyncRequestType.DELETE_VIEW,
(req, client, handler) ->
client.deleteViewSchema((TDeleteViewSchemaReq) req,
(SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.ALTER_VIEW,
+ CnToDnAsyncRequestType.ALTER_VIEW,
(req, client, handler) ->
client.alterView((TAlterViewReq) req, (SchemaUpdateRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.KILL_QUERY_INSTANCE,
+ CnToDnAsyncRequestType.KILL_QUERY_INSTANCE,
(req, client, handler) ->
client.killQueryInstance((String) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.SET_SPACE_QUOTA,
+ CnToDnAsyncRequestType.SET_SPACE_QUOTA,
(req, client, handler) ->
client.setSpaceQuota((TSetSpaceQuotaReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.SET_THROTTLE_QUOTA,
+ CnToDnAsyncRequestType.SET_THROTTLE_QUOTA,
(req, client, handler) ->
client.setThrottleQuota(
(TSetThrottleQuotaReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.RESET_PEER_LIST,
+ CnToDnAsyncRequestType.RESET_PEER_LIST,
(req, client, handler) ->
client.resetPeerList((TResetPeerListReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.SUBMIT_TEST_CONNECTION_TASK,
+ CnToDnAsyncRequestType.SUBMIT_TEST_CONNECTION_TASK,
(req, client, handler) ->
client.submitTestConnectionTask(
(TNodeLocations) req, (SubmitTestConnectionTaskRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.TEST_CONNECTION,
+ CnToDnAsyncRequestType.TEST_CONNECTION,
(req, client, handler) ->
client.testConnectionEmptyRPC((DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.UPDATE_TABLE,
+ CnToDnAsyncRequestType.UPDATE_TABLE,
(req, client, handler) ->
client.updateTable((TUpdateTableReq) req,
(DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
- CnToDnRequestType.CLEAN_DATA_NODE_CACHE,
+ CnToDnAsyncRequestType.CLEAN_DATA_NODE_CACHE,
(req, client, handler) ->
client.cleanDataNodeCache(
(TCleanDataNodeCacheReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
- CnToDnRequestType.STOP_DATA_NODE,
+ CnToDnAsyncRequestType.STOP_DATA_NODE,
(req, client, handler) ->
client.stopDataNode((DataNodeTSStatusRPCHandler) handler));
}
@Override
- protected AsyncRequestRPCHandler<?, CnToDnRequestType, TDataNodeLocation>
buildHandler(
- AsyncRequestContext<?, ?, CnToDnRequestType, TDataNodeLocation>
requestContext,
+ protected void checkActionMapCompleteness() {
+ List<CnToDnAsyncRequestType> lackList =
+ Arrays.stream(CnToDnAsyncRequestType.values())
+ .filter(type -> !actionMap.containsKey(type))
+ .collect(Collectors.toList());
+ if (!lackList.isEmpty()) {
+ LOGGER.error("These request types should be added to actionMap: {}",
lackList);
Review Comment:
Throw an UncheckedStartupException
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]