CRZbulabula commented on code in PR #16005:
URL: https://github.com/apache/iotdb/pull/16005#discussion_r2224182400


##########
iotdb-core/ainode/ainode/core/inference/scheduler/basic_scheduler.py:
##########
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import torch
+
+from ainode.core.inference.inference_request import InferenceRequest
+from ainode.core.inference.scheduler.abstract_scheduler import 
AbstractScheduler
+from ainode.core.log import Logger
+
+logger = Logger()
+
+
+class BasicScheduler(AbstractScheduler):
+
+    def __init__(
+        self,
+        waiting_queue,
+        running_queue,
+        finished_queue,
+        max_memory_bytes=1 << 30,
+        max_activate_size=10,
+        max_step_size=10,
+    ):
+        super().__init__(waiting_queue, running_queue, finished_queue)
+        self.max_memory_bytes = max_memory_bytes
+        self.max_activate_size = max_activate_size
+        self.max_step_size = max_step_size
+
+    def memory_is_available(self):
+        used = torch.cuda.memory_allocated()  # memory allocated to tensors

Review Comment:
   Which of the follows is the return value of `torch.cuda.memory_allocated()`?
   1. The allocated memory of this process
   2. The allocated memory of the device where this process resides
   



##########
iotdb-core/ainode/ainode/core/inference/inference_request_pool.py:
##########
@@ -65,65 +67,88 @@ def __init__(
         self._waiting_queue = request_queue  # Requests that are waiting to be 
processed
         self._running_queue = mp.Queue()  # Requests that are currently being 
processed
         self._finished_queue = result_queue  # Requests that are finished
+        self._scheduler = BasicScheduler(
+            self._waiting_queue, self._running_queue, self._finished_queue
+        )
         self._stop_event = mp.Event()
 
         # Fix inference seed
         random.seed(self.FIX_SEED)
         torch.manual_seed(self.FIX_SEED)
         np.random.seed(self.FIX_SEED)
 
-    def memory_is_available(self, request):
-        # need test with several rounds of dummy data
-        pass
+    def _warm_up_and_estimate_memory(self):
+        # TODO: Test per token memory usage
+        torch.cuda.empty_cache()
+        gc.collect()
+        dummy_input = torch.zeros(
+            (1, self.config.input_token_len), dtype=torch.float32
+        ).to(self.device)
+
+        # force cuda synchronization to avoid any asynchronous memory 
allocation issues
+        torch.cuda.reset_peak_memory_stats(self.device)
+        torch.cuda.synchronize(self.device)
+        memory_before_warmup = torch.cuda.memory_allocated(self.device)
+        logger.info(
+            f"Before warm-up, peak memory usage: {memory_before_warmup:.2f} 
bytes"
+        )
 
-    def _activate_requests(self):
-        if self._waiting_queue.empty():
-            return
-        request: InferenceRequest = self._waiting_queue.get()
-        # TODO: Check memory size before activating requests
-        request.inputs = 
request.inference_pipeline.preprocess_inputs(request.inputs)
-        request.mark_running()
-        logger.debug(
-            
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is activated with inputs shape {request.inputs.shape}"
+        # warm-up
+        with torch.no_grad():
+            self.model.generate(dummy_input, max_new_tokens=1)
+        torch.cuda.synchronize(self.device)
+        peak_memory_1_token = torch.cuda.max_memory_allocated(self.device)
+        logger.info(
+            f"Baseline memory usage for 1 token: {peak_memory_1_token:.2f} 
bytes"
+        )
+        logger.info(
+            f"Differentiation : {peak_memory_1_token-memory_before_warmup:.2f} 
bytes"
         )
-        self._running_queue.put(request)
+
+    def _activate_requests(self):
+        requests = self._scheduler.schedule_activate()
+        for request in requests:
+            request.inputs = request.inference_pipeline.preprocess_inputs(
+                request.inputs
+            )
+            logger.debug(
+                
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is activated with inputs shape {request.inputs.shape}"
+            )
 
     def _requests_activate_loop(self):
         while not self._stop_event.is_set():
             time.sleep(self.WAITING_INTERVAL_IN_MS / 1000)
             self._activate_requests()
 
     def _step(self):
-        if self._running_queue.empty():
-            return
+        requests = self._scheduler.schedule_step()
         # TODO: We need a batcher to accelerate the concurrent inference
-        # TODO: Check memory size before executing requests
-        request: InferenceRequest = self._running_queue.get()
-        inputs = request.inputs.to(self.device)
-        output = self.model.generate(
-            inputs,
-            max_new_tokens=request.max_new_tokens,
-            num_samples=10,
-            revin=True,
-        )
-        request.output_tensor = request.output_tensor.to(
-            self.device
-        )  # Ensure output tensor is on the same device
-        request.write_step_output(output[0].mean(dim=0))
-        request.inference_pipeline.post_decode()
-        if request.is_finished():
-            request.inference_pipeline.post_inference()
-            logger.debug(
-                
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is finished"
+        for request in requests:
+            inputs = request.inputs.to(self.device)

Review Comment:
   Be cautious to this line, seems it is redundant when the `step` is executed 
for multiple times.



##########
iotdb-core/ainode/ainode/core/inference/scheduler/abstract_scheduler.py:
##########
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from abc import ABC, abstractmethod
+
+
+class AbstractScheduler(ABC):
+    """
+    This is the abstract scheduler to schedule inference.
+    """
+
+    def __init__(self, waiting_queue, running_queue, finished_queue):
+        self.waiting_queue = waiting_queue
+        self.running_queue = running_queue
+        self.finished_queue = finished_queue

Review Comment:
   Append more annotations for this abstract class, including:
   1. What is the function of `scheduler`
   2. The usage of these three queues
   3. The behavior of following interfaces, args... returns....
   U can refer to `AbstractInferencePipeline`



##########
iotdb-core/ainode/ainode/core/inference/scheduler/basic_scheduler.py:
##########
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import torch
+
+from ainode.core.inference.inference_request import InferenceRequest
+from ainode.core.inference.scheduler.abstract_scheduler import 
AbstractScheduler
+from ainode.core.log import Logger
+
+logger = Logger()
+
+
+class BasicScheduler(AbstractScheduler):
+
+    def __init__(
+        self,
+        waiting_queue,
+        running_queue,
+        finished_queue,
+        max_memory_bytes=1 << 30,
+        max_activate_size=10,
+        max_step_size=10,
+    ):
+        super().__init__(waiting_queue, running_queue, finished_queue)
+        self.max_memory_bytes = max_memory_bytes
+        self.max_activate_size = max_activate_size
+        self.max_step_size = max_step_size
+
+    def memory_is_available(self):
+        used = torch.cuda.memory_allocated()  # memory allocated to tensors
+        reserved = (
+            torch.cuda.memory_reserved()
+        )  # memory reserved by the caching allocator
+        # logger.debug(f"Memory used: {used} bytes, Max memory: 
{self.max_memory_bytes} bytes")

Review Comment:
   U can uncomment this log since it is already in `debug` level. BTW, it's 
better to follow the log format in `InferenceRequestPool`, which is better for 
debugging a complex system.



##########
iotdb-core/ainode/ainode/core/inference/inference_request_pool.py:
##########
@@ -65,65 +67,88 @@ def __init__(
         self._waiting_queue = request_queue  # Requests that are waiting to be 
processed
         self._running_queue = mp.Queue()  # Requests that are currently being 
processed
         self._finished_queue = result_queue  # Requests that are finished
+        self._scheduler = BasicScheduler(
+            self._waiting_queue, self._running_queue, self._finished_queue
+        )
         self._stop_event = mp.Event()
 
         # Fix inference seed
         random.seed(self.FIX_SEED)
         torch.manual_seed(self.FIX_SEED)
         np.random.seed(self.FIX_SEED)
 
-    def memory_is_available(self, request):
-        # need test with several rounds of dummy data
-        pass
+    def _warm_up_and_estimate_memory(self):
+        # TODO: Test per token memory usage
+        torch.cuda.empty_cache()
+        gc.collect()
+        dummy_input = torch.zeros(
+            (1, self.config.input_token_len), dtype=torch.float32
+        ).to(self.device)
+
+        # force cuda synchronization to avoid any asynchronous memory 
allocation issues
+        torch.cuda.reset_peak_memory_stats(self.device)
+        torch.cuda.synchronize(self.device)
+        memory_before_warmup = torch.cuda.memory_allocated(self.device)
+        logger.info(
+            f"Before warm-up, peak memory usage: {memory_before_warmup:.2f} 
bytes"
+        )
 
-    def _activate_requests(self):
-        if self._waiting_queue.empty():
-            return
-        request: InferenceRequest = self._waiting_queue.get()
-        # TODO: Check memory size before activating requests
-        request.inputs = 
request.inference_pipeline.preprocess_inputs(request.inputs)
-        request.mark_running()
-        logger.debug(
-            
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is activated with inputs shape {request.inputs.shape}"
+        # warm-up
+        with torch.no_grad():
+            self.model.generate(dummy_input, max_new_tokens=1)
+        torch.cuda.synchronize(self.device)
+        peak_memory_1_token = torch.cuda.max_memory_allocated(self.device)
+        logger.info(
+            f"Baseline memory usage for 1 token: {peak_memory_1_token:.2f} 
bytes"
+        )
+        logger.info(
+            f"Differentiation : {peak_memory_1_token-memory_before_warmup:.2f} 
bytes"
         )
-        self._running_queue.put(request)
+
+    def _activate_requests(self):
+        requests = self._scheduler.schedule_activate()
+        for request in requests:
+            request.inputs = request.inference_pipeline.preprocess_inputs(
+                request.inputs
+            )

Review Comment:
   U should deliver the scheduled requests to running_queue here rather than 
within the scheduler. Otherwise, some requests would be `step` without 
preprocess their inputs.



##########
iotdb-core/ainode/ainode/core/inference/inference_request_pool.py:
##########
@@ -150,3 +177,10 @@ def run(self):
 
     def stop(self):
         self._stop_event.set()
+        logger.info(f"[Pool-{self.pool_id}] Stopping and releasing GPU 
memory.")

Review Comment:
   Add `Device` tag for this log, the same as below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to