CRZbulabula commented on code in PR #16079:
URL: https://github.com/apache/iotdb/pull/16079#discussion_r2249501745


##########
iotdb-core/ainode/ainode/core/manager/inference_manager.py:
##########
@@ -135,49 +141,167 @@ def infer(self, full_data, window_interval=None, 
window_step=None, **_):
 
 
 class InferenceManager:
-    ACCELERATE_MODEL_ID = "sundial"
-    # DEFAULT_DEVICE = "cpu"
-    DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
-    DEFAULT_POOL_SIZE = (
-        0  # TODO: Remove these parameter by sampling model inference 
consumption
-    )
+    ACCELERATE_MODEL_ID = ["sundial", "timer_xl"]
+    DEFAULT_DEVICE = "cpu"
+    # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
     WAITING_INTERVAL_IN_MS = (
         
AINodeDescriptor().get_config().get_ain_inference_batch_interval_in_ms()
     )  # How often to check for requests in the result queue
+    MODEL_MEM_USAGE_MAP = (
+        AINodeDescriptor().get_config().get_ain_inference_model_mem_usage_map()
+    )  # the memory usage of each model in bytes
+    INFERENCE_MEMORY_USAGE_RATIO = (
+        AINodeDescriptor().get_config().get_ain_inference_memory_usage_ratio()
+    )  # the device space allocated for inference
+    INFERENCE_WEIGHT_OVERHEAD_RATIO = (
+        
AINodeDescriptor().get_config().get_ain_inference_weight_overhead_ratio()
+    )  # the weight overhead ratio for inference, used to estimate the pool 
size
 
     def __init__(self):
         self._model_manager = ModelManager()
         self._result_queue = mp.Queue()
         self._result_wrapper_map = {}
         self._result_wrapper_lock = threading.RLock()
+        self._pool_init_lock = (
+            threading.Lock()
+        )  # used for lazy initialization of inference request pools
         # structure: {model_id: [(InferenceRequestPool, request_queue), ...]}
         self._request_pool_map: Dict[str, List[(InferenceRequestPool, 
mp.Queue)]] = {}
         self._stop_event = mp.Event()
-        self._init_inference_request_pool()
         self._result_handler_thread = threading.Thread(
             target=self._handle_results, daemon=True
         )
         self._result_handler_thread.start()
+        self._model_mem_usage_map: Dict[str, int] = (
+            {}
+        )  # store model memory usage for each model
+        # self._preload_model_benchmarks()
+
+    def _preload_model_benchmarks(self):
+        if "cuda" in str(self.DEFAULT_DEVICE):
+            model_id = self.ACCELERATE_MODEL_ID
+            mem_usage = self._measure_model_memory(model_id)
+            self._model_mem_usage_map[model_id] = mem_usage
+            logger.info(
+                f"[Inference] Preloaded benchmark for {model_id}, 
mem_usage={mem_usage/1024**2:.2f} MB"
+            )
+        else:
+            logger.warning(
+                f"[Inference] Skipped preloading benchmarks for 
{self.DEFAULT_DEVICE}, only supports CUDA currently"
+            )
+
+    def _measure_model_memory(self, model_id: str) -> int:
+        # TODO: support CPU in the future
+        # TODO: we can estimate the memory usage by running a dummy inference
+        device = self.DEFAULT_DEVICE
+        torch.cuda.empty_cache()
+        torch.cuda.synchronize(device)
+        start = torch.cuda.memory_reserved(device)
+
+        model = ModelManager().load_model(model_id, {}).to(device)
+        torch.cuda.synchronize(device)
+        end = torch.cuda.memory_reserved(device)
+        usage = end - start
+
+        # delete model to free memory
+        del model
+        torch.cuda.empty_cache()
+        gc.collect()
+
+        # add inference factor and cuda context overhead
+        inference_factor = 1.2
+        overhead = 500 * 1024 * 1024
+        final = int(max(usage, 1) * inference_factor + overhead)
+        return final
+
+    def _evaluate_system_resources(self):
+        if torch.cuda.is_available():
+            free_mem, total_mem = torch.cuda.mem_get_info()
+            logger.info(
+                f"[Inference][Device-{self.DEFAULT_DEVICE}] CUDA device 
memory: free={free_mem/1024**2:.2f} MB, total={total_mem/1024**2:.2f} MB"
+            )
+            return {"device": "cuda", "free_mem": free_mem, "total_mem": 
total_mem}
+        else:
+            # TODO: test cpu in the future
+            free_mem = psutil.virtual_memory().available
+            total_mem = psutil.virtual_memory().total
+            logger.info(
+                f"[Inference][Device-{self.DEFAULT_DEVICE}] CPU memory: 
free={free_mem/1024**2:.2f} MB, total={total_mem/1024**2:.2f} MB"
+            )
+            return {"device": "cpu", "free_mem": free_mem, "total_mem": 
total_mem}
+
+    def _estimate_pool_size(self, model_id: str) -> int:
+        if model_id not in self.MODEL_MEM_USAGE_MAP:
+            logger.error(
+                f"[Inference][Device-{self.DEFAULT_DEVICE}] Model {model_id} 
not supported now"
+            )
+            return 0
 
-    def _init_inference_request_pool(self):
-        """
-        Initialize the inference request pool for each model.
-        TODO: This is a temporary solution, we need a automatic algorithm to 
adjust the pool size for different models
-        """
-        self._request_pool_map[self.ACCELERATE_MODEL_ID] = []
-        for idx in range(self.DEFAULT_POOL_SIZE):
-            sundial_config = SundialConfig()
-            request_queue = mp.Queue()
-            request_pool = InferenceRequestPool(
-                pool_id=idx,
-                model_id=self.ACCELERATE_MODEL_ID,
-                config=sundial_config,
-                request_queue=request_queue,
+        system_res = self._evaluate_system_resources()
+        free_mem = system_res["free_mem"]
+
+        mem_usage = (
+            self.MODEL_MEM_USAGE_MAP[model_id] * 
self.INFERENCE_WEIGHT_OVERHEAD_RATIO
+        )
+        size = int((free_mem * self.INFERENCE_MEMORY_USAGE_RATIO) // mem_usage)
+        if size <= 0:
+            logger.error(
+                f"[Inference][Device-{self.DEFAULT_DEVICE}] Not enough memory 
to run model {model_id}. free={free_mem/1024**2:.2f} MB, 
need>={mem_usage/1024**2:.2f} MB"
+            )
+            return 0
+
+        logger.info(
+            f"[Inference][Device-{self.DEFAULT_DEVICE}] "
+            f"model={model_id}, mem_usage={mem_usage/1024**2:.2f} MB, "
+            f"pool_num={size}"
+        )
+        return size
+
+    def _first_pool_init(self, model_id: str):
+        if model_id == "sundial":
+            config = SundialConfig()
+        elif model_id == "timer_xl":
+            config = TimerConfig()
+        first_queue = mp.Queue()
+        ready_event = mp.Event()
+        first_pool = InferenceRequestPool(
+            pool_id=0,
+            model_id=model_id,
+            config=config,
+            request_queue=first_queue,
+            result_queue=self._result_queue,
+            ready_event=ready_event,
+        )
+        first_pool.start()
+        if not ready_event.wait(timeout=30):
+            logger.error(
+                f"[Inference][Device-{self.DEFAULT_DEVICE}][Pool-0] First pool 
failed to be ready in time"
+            )
+        else:
+            self._request_pool_map[model_id] = [(first_pool, first_queue)]
+            logger.info(
+                f"[Inference][Device-{self.DEFAULT_DEVICE}][Pool-0] 
Initialized inference request pool for model {model_id}"
+            )
+
+    def _expand_pools(self, model_id, start_idx, count):
+        for idx in range(count):
+            queue = mp.Queue()
+            if model_id == "sundial":
+                config = SundialConfig()
+            elif model_id == "timer_xl":
+                config = TimerConfig()
+            pool = InferenceRequestPool(
+                pool_id=start_idx + idx,
+                model_id=model_id,
+                config=config,
+                request_queue=queue,
                 result_queue=self._result_queue,
+                ready_event=mp.Event(),
             )
-            request_pool.start()
-            self._request_pool_map[self.ACCELERATE_MODEL_ID].append(
-                (request_pool, request_queue)
+            pool.start()
+            self._request_pool_map[model_id].append((pool, queue))

Review Comment:
   The `ready_event` check should also be applied here. Otherwise, an 
inference request might be routed to a pool that is not ready yet



##########
iotdb-core/ainode/ainode/core/constant.py:
##########
@@ -51,6 +51,12 @@
 # AINode inference configuration
 AINODE_INFERENCE_BATCH_INTERVAL_IN_MS = 15
 AINODE_INFERENCE_MAX_PREDICT_LENGTH = 2880
+AINODE_INFERENCE_MODEL_MEM_USAGE_MAP = {
+    "sundial": 1036 * 1024**2,

Review Comment:
   Take a look at `model_info.py`; there is an enum there that can be used 
for the keys of this map



##########
iotdb-core/ainode/ainode/core/constant.py:
##########
@@ -51,6 +51,12 @@
 # AINode inference configuration
 AINODE_INFERENCE_BATCH_INTERVAL_IN_MS = 15
 AINODE_INFERENCE_MAX_PREDICT_LENGTH = 2880
+AINODE_INFERENCE_MODEL_MEM_USAGE_MAP = {
+    "sundial": 1036 * 1024**2,
+    "timer_xl": 856 * 1024**2,
+}
+AINODE_INFERENCE_MEMORY_USAGE_RATIO = 0.4
+AINODE_INFERENCE_WEIGHT_OVERHEAD_RATIO = 1.2

Review Comment:
   Rename this parameter, and add a comment explaining it



##########
iotdb-core/ainode/ainode/core/constant.py:
##########
@@ -51,6 +51,12 @@
 # AINode inference configuration
 AINODE_INFERENCE_BATCH_INTERVAL_IN_MS = 15
 AINODE_INFERENCE_MAX_PREDICT_LENGTH = 2880
+AINODE_INFERENCE_MODEL_MEM_USAGE_MAP = {
+    "sundial": 1036 * 1024**2,
+    "timer_xl": 856 * 1024**2,
+}

Review Comment:
   Add the unit for each model's memory usage value



##########
iotdb-core/ainode/ainode/core/manager/inference_manager.py:
##########
@@ -135,49 +141,167 @@ def infer(self, full_data, window_interval=None, 
window_step=None, **_):
 
 
 class InferenceManager:
-    ACCELERATE_MODEL_ID = "sundial"
-    # DEFAULT_DEVICE = "cpu"
-    DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
-    DEFAULT_POOL_SIZE = (
-        0  # TODO: Remove these parameter by sampling model inference 
consumption
-    )
+    ACCELERATE_MODEL_ID = ["sundial", "timer_xl"]
+    DEFAULT_DEVICE = "cpu"
+    # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
     WAITING_INTERVAL_IN_MS = (
         
AINodeDescriptor().get_config().get_ain_inference_batch_interval_in_ms()
     )  # How often to check for requests in the result queue
+    MODEL_MEM_USAGE_MAP = (
+        AINodeDescriptor().get_config().get_ain_inference_model_mem_usage_map()
+    )  # the memory usage of each model in bytes
+    INFERENCE_MEMORY_USAGE_RATIO = (
+        AINodeDescriptor().get_config().get_ain_inference_memory_usage_ratio()
+    )  # the device space allocated for inference
+    INFERENCE_WEIGHT_OVERHEAD_RATIO = (
+        
AINodeDescriptor().get_config().get_ain_inference_weight_overhead_ratio()
+    )  # the weight overhead ratio for inference, used to estimate the pool 
size
 
     def __init__(self):
         self._model_manager = ModelManager()
         self._result_queue = mp.Queue()
         self._result_wrapper_map = {}
         self._result_wrapper_lock = threading.RLock()
+        self._pool_init_lock = (
+            threading.Lock()
+        )  # used for lazy initialization of inference request pools
         # structure: {model_id: [(InferenceRequestPool, request_queue), ...]}
         self._request_pool_map: Dict[str, List[(InferenceRequestPool, 
mp.Queue)]] = {}
         self._stop_event = mp.Event()
-        self._init_inference_request_pool()
         self._result_handler_thread = threading.Thread(
             target=self._handle_results, daemon=True
         )
         self._result_handler_thread.start()
+        self._model_mem_usage_map: Dict[str, int] = (
+            {}
+        )  # store model memory usage for each model
+        # self._preload_model_benchmarks()
+
+    def _preload_model_benchmarks(self):
+        if "cuda" in str(self.DEFAULT_DEVICE):
+            model_id = self.ACCELERATE_MODEL_ID
+            mem_usage = self._measure_model_memory(model_id)
+            self._model_mem_usage_map[model_id] = mem_usage
+            logger.info(
+                f"[Inference] Preloaded benchmark for {model_id}, 
mem_usage={mem_usage/1024**2:.2f} MB"
+            )
+        else:
+            logger.warning(
+                f"[Inference] Skipped preloading benchmarks for 
{self.DEFAULT_DEVICE}, only supports CUDA currently"
+            )
+
+    def _measure_model_memory(self, model_id: str) -> int:

Review Comment:
   Consider creating a new util file and extracting these `measure, estimate, 
evaluate` methods into it, to keep `inference_manager.py` from becoming too 
complex to understand



##########
iotdb-core/ainode/ainode/core/config.py:
##########
@@ -294,6 +329,21 @@ def _load_config_from_file(self) -> None:
                     int(file_configs["ain_inference_batch_interval_in_ms"])
                 )
 
+            if "ain_inference_model_mem_usage_map" in config_keys:
+                self._config.set_ain_inference_model_mem_usage_map(
+                    eval(file_configs["ain_inference_model_mem_usage_map"])
+                )
+
+            if "ain_inference_memory_usage_ratio" in config_keys:
+                self._config.set_ain_inference_memory_usage_ratio(
+                    float(file_configs["ain_inference_memory_usage_ratio"])
+                )
+
+            if "ain_inference_weight_overhead_ratio" in config_keys:
+                self._config.set_ain_inference_weight_overhead_ratio(
+                    float(file_configs["ain_inference_weight_overhead_ratio"])
+                )

Review Comment:
   Update `iotdb-ainode.properties` as well



##########
iotdb-core/ainode/ainode/core/manager/inference_manager.py:
##########
@@ -229,18 +353,43 @@ def _run(
                     predict_length,
                 )
 
-            if model_id == self.ACCELERATE_MODEL_ID and self.DEFAULT_POOL_SIZE 
> 0:
-                # TODO: Logic in this branch shall handle all LTSM inferences
+            if model_id in self.ACCELERATE_MODEL_ID and "cuda" in str(
+                self.DEFAULT_DEVICE
+            ):
+                # lazy initialization for first request
+                if model_id not in self._request_pool_map:
+                    with self._pool_init_lock:

Review Comment:
   Currently, this lock seems useless. A useful lock would protect both the 
routing and the creation of request pools for a specific model type



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to