Copilot commented on code in PR #17299:
URL: https://github.com/apache/iotdb/pull/17299#discussion_r2943975289
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -196,60 +200,92 @@ def _worker_loop(self):
finally:
self._task_queue.task_done()
- def _load_model_task(self, model_id: str, device_id_list:
list[torch.device]):
- def _load_model_on_device_task(device_id: torch.device):
- if not self.has_request_pools(model_id, device_id):
- actions = self._pool_scheduler.schedule_load_model_to_device(
- self._model_manager.get_model_info(model_id), device_id
- )
- for action in actions:
- if action.action == ScaleActionType.SCALE_UP:
- self._expand_pools_on_device(
- action.model_id, device_id, action.amount
- )
- elif action.action == ScaleActionType.SCALE_DOWN:
- self._shrink_pools_on_device(
- action.model_id, device_id, action.amount
- )
+ def _load_one_model_task(self, model_id: str, device_id_list:
list[torch.device]):
+ def _load_one_model_on_device_task(device_id: torch.device):
+ if not self.has_pool_on_device(device_id):
+ self._expand_pools_on_device(model_id, device_id, 1)
else:
logger.info(
- f"[Inference][{device_id}] Model {model_id} is already
installed."
+ f"[Inference][{device_id}] There are already pools on this
device."
)
load_model_futures = self._executor.submit_batch(
- device_id_list, _load_model_on_device_task
+ device_id_list, _load_one_model_on_device_task
)
concurrent.futures.wait(
load_model_futures, return_when=concurrent.futures.ALL_COMPLETED
)
- def _unload_model_task(self, model_id: str, device_id_list:
list[torch.device]):
- def _unload_model_on_device_task(device_id: torch.device):
+ def _unload_one_model_task(self, model_id: str, device_id_list:
list[torch.device]):
+ def _unload_one_model_on_device_task(device_id: torch.device):
if self.has_request_pools(model_id, device_id):
- actions =
self._pool_scheduler.schedule_unload_model_from_device(
- self._model_manager.get_model_info(model_id), device_id
- )
- for action in actions:
- if action.action == ScaleActionType.SCALE_DOWN:
- self._shrink_pools_on_device(
- action.model_id, device_id, action.amount
- )
- elif action.action == ScaleActionType.SCALE_UP:
- self._expand_pools_on_device(
- action.model_id, device_id, action.amount
- )
+ self._shrink_pools_on_device(model_id, device_id, 1)
else:
logger.info(
f"[Inference][{device_id}] Model {model_id} is not
installed."
)
unload_model_futures = self._executor.submit_batch(
- device_id_list, _unload_model_on_device_task
+ device_id_list, _unload_one_model_on_device_task
)
concurrent.futures.wait(
unload_model_futures, return_when=concurrent.futures.ALL_COMPLETED
)
+ # def _load_model_task(self, model_id: str, device_id_list:
list[torch.device]):
+ # def _load_model_on_device_task(device_id: torch.device):
+ # if not self.has_request_pools(model_id, device_id):
+ # actions = self._pool_scheduler.schedule_load_model_to_device(
+ # self._model_manager.get_model_info(model_id), device_id
+ # )
+ # for action in actions:
+ # if action.action == ScaleActionType.SCALE_UP:
+ # self._expand_pools_on_device(
+ # action.model_id, device_id, action.amount
+ # )
+ # elif action.action == ScaleActionType.SCALE_DOWN:
+ # self._shrink_pools_on_device(
+ # action.model_id, device_id, action.amount
+ # )
+ # else:
+ # logger.info(
+ # f"[Inference][{device_id}] Model {model_id} is already
installed."
+ # )
+ #
+ # load_model_futures = self._executor.submit_batch(
+ # device_id_list, _load_model_on_device_task
Review Comment:
The old implementations of `_load_model_task` / `_unload_model_task` are
left in the file as a large commented-out block. This makes the module harder
to read and maintain; git history already preserves prior implementations.
Please remove the commented-out code (or move it behind a real feature
flag/config if it still needs to be selectable at runtime).
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -196,60 +200,92 @@ def _worker_loop(self):
finally:
self._task_queue.task_done()
- def _load_model_task(self, model_id: str, device_id_list:
list[torch.device]):
- def _load_model_on_device_task(device_id: torch.device):
- if not self.has_request_pools(model_id, device_id):
- actions = self._pool_scheduler.schedule_load_model_to_device(
- self._model_manager.get_model_info(model_id), device_id
- )
- for action in actions:
- if action.action == ScaleActionType.SCALE_UP:
- self._expand_pools_on_device(
- action.model_id, device_id, action.amount
- )
- elif action.action == ScaleActionType.SCALE_DOWN:
- self._shrink_pools_on_device(
- action.model_id, device_id, action.amount
- )
+ def _load_one_model_task(self, model_id: str, device_id_list:
list[torch.device]):
+ def _load_one_model_on_device_task(device_id: torch.device):
+ if not self.has_pool_on_device(device_id):
+ self._expand_pools_on_device(model_id, device_id, 1)
else:
logger.info(
- f"[Inference][{device_id}] Model {model_id} is already
installed."
+ f"[Inference][{device_id}] There are already pools on this
device."
)
Review Comment:
`_load_one_model_task` prevents loading a model onto a device if *any* pool
already exists on that device (`has_pool_on_device`). This means a `LOAD MODEL
<new>` request can become a silent no-op when a different model is already
loaded on the target device(s), while upper layers (e.g., InferenceManager)
still treat the submission as successful. If the intent is to allow multiple
models per device, this should check `has_request_pools(model_id, device_id)`
instead. If the intent is to enforce single-model-per-device, this path should
surface a failure (or explicitly unload/replace the existing model) rather than
only logging.
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -462,6 +498,12 @@ def has_running_pools(self, model_id: str) -> bool:
return True
return False
+ def has_pool_on_device(self, device_id: torch.device) -> bool:
+ """
+ Check if there are pools on the given device_id.
+ """
+ return any(device_id in pools for pools in
self._request_pool_map.values())
Review Comment:
    `has_pool_on_device` returns true if *any* model has a pool on the device,
but the method name suggests it might be scoped to a specific model or to a
single pool. Consider renaming it to something like `has_any_pool_on_device` /
`has_any_pools_on_device` (and/or clarifying the docstring) so callers don't
misinterpret its semantics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]