Copilot commented on code in PR #16938:
URL: https://github.com/apache/iotdb/pull/16938#discussion_r2638503121
##########
iotdb-core/ainode/iotdb/ainode/core/model/model_storage.py:
##########
@@ -287,29 +293,34 @@ def register_model(self, model_id: str, uri: str):
state=ModelStates.ACTIVE,
pipeline_cls=pipeline_cls,
auto_map=auto_map,
- _transformers_registered=False, # Register later
+ transformers_registered=False, # Register later
)
self._models[ModelCategory.USER_DEFINED.value][model_id] =
model_info
- if auto_map:
- # Transformers model: immediately register to Transformers
autoloading mechanism
- success = self._register_transformers_model(model_info)
- if success:
- with self._lock_pool.get_lock(model_id).write_lock():
- model_info._transformers_registered = True
- else:
- with self._lock_pool.get_lock(model_id).write_lock():
+ if auto_map:
+ # Transformers model: immediately register to Transformers
autoloading mechanism
+ try:
+ if self._register_transformers_model(model_info):
+ model_info.transformers_registered = True
+ except Exception as e:
model_info.state = ModelStates.INACTIVE
- logger.error(f"Failed to register Transformers model
{model_id}")
- else:
- # Other type models: only log
- self._register_other_model(model_info)
+ logger.error(
+ f"Failed to register Transformers model {model_id},
because {e}"
+ )
+ raise e
Review Comment:
The model registration logic modifies model_info.state and
model_info.transformers_registered (lines 304, 306) while inside the write lock
(acquired at line 288). The lock scope ends at line 313, so the final
logger.info at line 315 runs outside the lock. Separately, if an exception
occurs in _register_transformers_model (line 303), it is re-raised at line 310
after the model_info has already been stored in the dictionary (line 298),
which leaves an entry with an INACTIVE state behind. Consider whether the model
should be removed from the dictionary on failure, or whether this behavior is
intentional.
```suggestion
# Remove the model from the registry on registration
failure
self._models[ModelCategory.USER_DEFINED.value].pop(model_id, None)
logger.error(
f"Failed to register Transformers model {model_id},
because {e}"
)
raise
```
##########
integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeModelManageIT.java:
##########
@@ -54,54 +58,68 @@ public class AINodeModelManageIT {
public static void setUp() throws Exception {
// Init 1C1D1A cluster environment
EnvFactory.getEnv().initClusterEnvironment(1, 1);
+ prepareDataInTree();
+ prepareDataInTable();
}
@AfterClass
public static void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}
- // @Test
+ @Test
public void userDefinedModelManagementTestInTree() throws SQLException,
InterruptedException {
try (Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
- userDefinedModelManagementTest(statement);
+ registerUserDefinedModel(statement);
+ callInferenceTest(
+ statement, new FakeModelInfo("user_chronos", "custom_t5",
"user_defined", "active"));
+ dropUserDefinedModel(statement);
+ errorTest(
+ statement,
+ "create model origin_chronos using uri
\"file:///data/chronos2_origin\"",
+ "1505: 't5' is already used by a Transformers config, pick another
name.");
}
}
- // @Test
+ @Test
public void userDefinedModelManagementTestInTable() throws SQLException,
InterruptedException {
try (Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
- userDefinedModelManagementTest(statement);
+ registerUserDefinedModel(statement);
+ forecastTableFunctionTest(
+ statement, new FakeModelInfo("user_chronos", "custom_t5",
"user_defined", "active"));
+ dropUserDefinedModel(statement);
+ errorTest(
+ statement,
+ "create model origin_chronos using uri
\"file:///data/chronos2_origin\"",
+ "1505: 't5' is already used by a Transformers config, pick another
name.");
}
}
- private void userDefinedModelManagementTest(Statement statement)
+ private void registerUserDefinedModel(Statement statement)
throws SQLException, InterruptedException {
final String alterConfigSQL = "set configuration
\"trusted_uri_pattern\"='.*'";
- final String registerSql = "create model operationTest using uri \"" +
"\"";
- final String showSql = "SHOW MODELS operationTest";
- final String dropSql = "DROP MODEL operationTest";
-
+ final String registerSql = "create model user_chronos using uri
\"file:///data/chronos2\"";
+ final String showSql = "SHOW MODELS user_chronos";
statement.execute(alterConfigSQL);
statement.execute(registerSql);
boolean loading = true;
- int count = 0;
for (int retryCnt = 0; retryCnt < 100; retryCnt++) {
try (ResultSet resultSet = statement.executeQuery(showSql)) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
checkHeader(resultSetMetaData, "ModelId,ModelType,Category,State");
while (resultSet.next()) {
String modelId = resultSet.getString(1);
+ String modelType = resultSet.getString(2);
String category = resultSet.getString(3);
String state = resultSet.getString(4);
- assertEquals("operationTest", modelId);
- assertEquals("USER-DEFINED", category);
- if (state.equals("ACTIVE")) {
+ assertEquals("user_chronos", modelId);
+ assertEquals("user_defined", category);
+ assertEquals("custom_t5", modelType);
Review Comment:
The category assertion at line 118 is executed before the modelType
assertion at line 119, but the result set column order is ModelId(1),
ModelType(2), Category(3), State(4). The assertions should follow the same
order as the columns. Consider reordering them to modelId, modelType, category,
state for better readability and logical flow.
```suggestion
assertEquals("custom_t5", modelType);
assertEquals("user_defined", category);
```
##########
iotdb-core/ainode/iotdb/ainode/core/model/model_storage.py:
##########
@@ -227,17 +227,22 @@ def _process_user_defined_model_directory(self,
model_dir: str, model_id: str):
model_type = config.get("model_type", "")
auto_map = config.get("auto_map", None)
pipeline_cls = config.get("pipeline_cls", "")
-
+ model_info = ModelInfo(
+ model_id=model_id,
+ model_type=model_type,
+ category=ModelCategory.USER_DEFINED,
+ state=ModelStates.ACTIVE,
+ pipeline_cls=pipeline_cls,
+ auto_map=auto_map,
+ transformers_registered=False, # Lazy registration
+ )
+ with self._lock_pool.get_lock(model_id).write_lock():
+ self._models[ModelCategory.USER_DEFINED.value][model_id] =
model_info
+ if self.ensure_transformers_registered(model_id) is None:
+ model_info.state = ModelStates.INACTIVE
+ else:
+ model_info.transformers_registered = True
with self._lock_pool.get_lock(model_id).write_lock():
- model_info = ModelInfo(
- model_id=model_id,
- model_type=model_type,
- category=ModelCategory.USER_DEFINED,
- state=ModelStates.ACTIVE,
- pipeline_cls=pipeline_cls,
- auto_map=auto_map,
- _transformers_registered=False, # Lazy registration
- )
self._models[ModelCategory.USER_DEFINED.value][model_id] =
model_info
Review Comment:
The model_info is stored in the dictionary twice, each time under a write lock,
and its state is modified outside the lock. This creates a potential race
condition. The model_info should be added to the dictionary only once, after
all modifications are complete. Consider restructuring to: (1) create the
ModelInfo, (2) call ensure_transformers_registered and update
state/transformers_registered accordingly, (3) acquire the lock once and store
the final model_info.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]