CRZbulabula commented on code in PR #16794:
URL: https://github.com/apache/iotdb/pull/16794#discussion_r2563307866


##########
iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py:
##########
@@ -123,66 +116,36 @@ def _step(self):
 
         for requests in grouped_requests:
             batch_inputs = self._batcher.batch_request(requests).to(self.device)
-            if self.model_info.model_type == BuiltInModelType.SUNDIAL.value:
-                batch_output = self._model.generate(
-                    batch_inputs,
-                    max_new_tokens=requests[0].max_new_tokens,
-                    num_samples=10,
-                    revin=True,
-                )
-
-                offset = 0
-                for request in requests:
-                    request.output_tensor = request.output_tensor.to(self.device)
-                    cur_batch_size = request.batch_size
-                    cur_output = batch_output[offset : offset + cur_batch_size]
-                    offset += cur_batch_size
-                    request.write_step_output(cur_output.mean(dim=1))
-
-                    request.inference_pipeline.post_decode()
-                    if request.is_finished():
-                        request.inference_pipeline.post_inference()
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished"
-                        )
-                        # ensure the output tensor is on CPU before sending to result queue
-                        request.output_tensor = request.output_tensor.cpu()
-                        self._finished_queue.put(request)
-                    else:
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing"
-                        )
-                        self._waiting_queue.put(request)
-
-            elif self.model_info.model_type == BuiltInModelType.TIMER_XL.value:
-                batch_output = self._model.generate(
-                    batch_inputs,
-                    max_new_tokens=requests[0].max_new_tokens,
-                    revin=True,
-                )
-
-                offset = 0
-                for request in requests:
-                    request.output_tensor = request.output_tensor.to(self.device)
-                    cur_batch_size = request.batch_size
-                    cur_output = batch_output[offset : offset + cur_batch_size]
-                    offset += cur_batch_size
-                    request.write_step_output(cur_output)
-
-                    request.inference_pipeline.post_decode()
-                    if request.is_finished():
-                        request.inference_pipeline.post_inference()
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished"
-                        )
-                        # ensure the output tensor is on CPU before sending to result queue
-                        request.output_tensor = request.output_tensor.cpu()
-                        self._finished_queue.put(request)
-                    else:
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing"
-                        )
-                        self._waiting_queue.put(request)
+            batch_output = self._inference_pipeline.infer(
+                batch_inputs,
+                predict_length=requests[0].max_new_tokens,
+                # num_samples=10,
+                revin=True,
+            )
+            offset = 0
+            for request in requests:
+                request.output_tensor = request.output_tensor.to(self.device)
+                cur_batch_size = request.batch_size
+                cur_output = batch_output[offset : offset + cur_batch_size]
+                offset += cur_batch_size
+                # request.write_step_output(cur_output.mean(dim=1))

Review Comment:
   Remove this.



##########
iotdb-core/ainode/iotdb/ainode/core/constant.py:
##########
@@ -63,8 +62,15 @@
 # AINode folder structure
 AINODE_MODELS_DIR = os.path.join(IOTDB_AINODE_HOME, "data/ainode/models")
 AINODE_BUILTIN_MODELS_DIR = os.path.join(
-    IOTDB_AINODE_HOME, "data/ainode/models/weights"
+    IOTDB_AINODE_HOME, "data/ainode/models/builtin"
 )  # For built-in models, we only need to store their weights and config.
+AINODE_FINETUNE_MODELS_DIR = os.path.join(
+    IOTDB_AINODE_HOME, "data/ainode/models/finetune"
+)
+AINODE_USER_DEFINED_MODELS_DIR = os.path.join(
+    IOTDB_AINODE_HOME, "data/ainode/models/user_defined"
+)
+AINODE_CACHE_DIR = os.path.expanduser("~/.cache/ainode")

Review Comment:
   What is this constant for? Remove it if useless.



##########
iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py:
##########
@@ -123,66 +116,36 @@ def _step(self):
 
         for requests in grouped_requests:
             batch_inputs = self._batcher.batch_request(requests).to(self.device)
-            if self.model_info.model_type == BuiltInModelType.SUNDIAL.value:
-                batch_output = self._model.generate(
-                    batch_inputs,
-                    max_new_tokens=requests[0].max_new_tokens,
-                    num_samples=10,
-                    revin=True,
-                )
-
-                offset = 0
-                for request in requests:
-                    request.output_tensor = request.output_tensor.to(self.device)
-                    cur_batch_size = request.batch_size
-                    cur_output = batch_output[offset : offset + cur_batch_size]
-                    offset += cur_batch_size
-                    request.write_step_output(cur_output.mean(dim=1))
-
-                    request.inference_pipeline.post_decode()
-                    if request.is_finished():
-                        request.inference_pipeline.post_inference()
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished"
-                        )
-                        # ensure the output tensor is on CPU before sending to result queue
-                        request.output_tensor = request.output_tensor.cpu()
-                        self._finished_queue.put(request)
-                    else:
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing"
-                        )
-                        self._waiting_queue.put(request)
-
-            elif self.model_info.model_type == BuiltInModelType.TIMER_XL.value:
-                batch_output = self._model.generate(
-                    batch_inputs,
-                    max_new_tokens=requests[0].max_new_tokens,
-                    revin=True,
-                )
-
-                offset = 0
-                for request in requests:
-                    request.output_tensor = request.output_tensor.to(self.device)
-                    cur_batch_size = request.batch_size
-                    cur_output = batch_output[offset : offset + cur_batch_size]
-                    offset += cur_batch_size
-                    request.write_step_output(cur_output)
-
-                    request.inference_pipeline.post_decode()
-                    if request.is_finished():
-                        request.inference_pipeline.post_inference()
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished"
-                        )
-                        # ensure the output tensor is on CPU before sending to result queue
-                        request.output_tensor = request.output_tensor.cpu()
-                        self._finished_queue.put(request)
-                    else:
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing"
-                        )
-                        self._waiting_queue.put(request)
+            batch_output = self._inference_pipeline.infer(
+                batch_inputs,
+                predict_length=requests[0].max_new_tokens,
+                # num_samples=10,
+                revin=True,
+            )

Review Comment:
   In your current implementation, no parameters can be passed into the inference_pipeline; we should discuss this further.
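   One possible direction (a rough sketch only; `inference_kwargs` and its contents are hypothetical names, not an existing field): let each request carry an optional dict of model-specific options and forward it to the pipeline as keyword arguments, so e.g. Sundial's sampling settings still have a way in:

   ```python
   # Sketch, assuming infer() is extended to accept extra keyword arguments.
   # `inference_kwargs` is a hypothetical per-request dict, e.g. {"num_samples": 10}.
   extra_kwargs = getattr(requests[0], "inference_kwargs", None) or {}
   batch_output = self._inference_pipeline.infer(
       batch_inputs,
       predict_length=requests[0].max_new_tokens,
       revin=True,
       **extra_kwargs,
   )
   ```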



##########
iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py:
##########
@@ -123,66 +116,36 @@ def _step(self):
 
         for requests in grouped_requests:
             batch_inputs = self._batcher.batch_request(requests).to(self.device)
-            if self.model_info.model_type == BuiltInModelType.SUNDIAL.value:
-                batch_output = self._model.generate(
-                    batch_inputs,
-                    max_new_tokens=requests[0].max_new_tokens,
-                    num_samples=10,
-                    revin=True,
-                )
-
-                offset = 0
-                for request in requests:
-                    request.output_tensor = request.output_tensor.to(self.device)
-                    cur_batch_size = request.batch_size
-                    cur_output = batch_output[offset : offset + cur_batch_size]
-                    offset += cur_batch_size
-                    request.write_step_output(cur_output.mean(dim=1))
-
-                    request.inference_pipeline.post_decode()
-                    if request.is_finished():
-                        request.inference_pipeline.post_inference()
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished"
-                        )
-                        # ensure the output tensor is on CPU before sending to result queue
-                        request.output_tensor = request.output_tensor.cpu()
-                        self._finished_queue.put(request)
-                    else:
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing"
-                        )
-                        self._waiting_queue.put(request)
-
-            elif self.model_info.model_type == BuiltInModelType.TIMER_XL.value:
-                batch_output = self._model.generate(
-                    batch_inputs,
-                    max_new_tokens=requests[0].max_new_tokens,
-                    revin=True,
-                )
-
-                offset = 0
-                for request in requests:
-                    request.output_tensor = request.output_tensor.to(self.device)
-                    cur_batch_size = request.batch_size
-                    cur_output = batch_output[offset : offset + cur_batch_size]
-                    offset += cur_batch_size
-                    request.write_step_output(cur_output)
-
-                    request.inference_pipeline.post_decode()
-                    if request.is_finished():
-                        request.inference_pipeline.post_inference()
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished"
-                        )
-                        # ensure the output tensor is on CPU before sending to result queue
-                        request.output_tensor = request.output_tensor.cpu()
-                        self._finished_queue.put(request)
-                    else:
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing"
-                        )
-                        self._waiting_queue.put(request)
+            batch_output = self._inference_pipeline.infer(
+                batch_inputs,
+                predict_length=requests[0].max_new_tokens,
+                # num_samples=10,
+                revin=True,
+            )
+            offset = 0
+            for request in requests:
+                request.output_tensor = request.output_tensor.to(self.device)
+                cur_batch_size = request.batch_size
+                cur_output = batch_output[offset : offset + cur_batch_size]
+                offset += cur_batch_size
+                # request.write_step_output(cur_output.mean(dim=1))
+                request.write_step_output(cur_output)
+
+                # self._inference_pipeline.post_decode()
+                if request.is_finished():
+                    # self._inference_pipeline.post_inference()

Review Comment:
   Remove this.



##########
iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py:
##########
@@ -123,66 +116,36 @@ def _step(self):
 
         for requests in grouped_requests:
             batch_inputs = self._batcher.batch_request(requests).to(self.device)
-            if self.model_info.model_type == BuiltInModelType.SUNDIAL.value:
-                batch_output = self._model.generate(
-                    batch_inputs,
-                    max_new_tokens=requests[0].max_new_tokens,
-                    num_samples=10,
-                    revin=True,
-                )
-
-                offset = 0
-                for request in requests:
-                    request.output_tensor = request.output_tensor.to(self.device)
-                    cur_batch_size = request.batch_size
-                    cur_output = batch_output[offset : offset + cur_batch_size]
-                    offset += cur_batch_size
-                    request.write_step_output(cur_output.mean(dim=1))
-
-                    request.inference_pipeline.post_decode()
-                    if request.is_finished():
-                        request.inference_pipeline.post_inference()
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished"
-                        )
-                        # ensure the output tensor is on CPU before sending to result queue
-                        request.output_tensor = request.output_tensor.cpu()
-                        self._finished_queue.put(request)
-                    else:
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing"
-                        )
-                        self._waiting_queue.put(request)
-
-            elif self.model_info.model_type == BuiltInModelType.TIMER_XL.value:
-                batch_output = self._model.generate(
-                    batch_inputs,
-                    max_new_tokens=requests[0].max_new_tokens,
-                    revin=True,
-                )
-
-                offset = 0
-                for request in requests:
-                    request.output_tensor = request.output_tensor.to(self.device)
-                    cur_batch_size = request.batch_size
-                    cur_output = batch_output[offset : offset + cur_batch_size]
-                    offset += cur_batch_size
-                    request.write_step_output(cur_output)
-
-                    request.inference_pipeline.post_decode()
-                    if request.is_finished():
-                        request.inference_pipeline.post_inference()
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished"
-                        )
-                        # ensure the output tensor is on CPU before sending to result queue
-                        request.output_tensor = request.output_tensor.cpu()
-                        self._finished_queue.put(request)
-                    else:
-                        self._logger.debug(
-                            f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing"
-                        )
-                        self._waiting_queue.put(request)
+            batch_output = self._inference_pipeline.infer(
+                batch_inputs,
+                predict_length=requests[0].max_new_tokens,
+                # num_samples=10,
+                revin=True,
+            )
+            offset = 0
+            for request in requests:
+                request.output_tensor = request.output_tensor.to(self.device)
+                cur_batch_size = request.batch_size
+                cur_output = batch_output[offset : offset + cur_batch_size]
+                offset += cur_batch_size
+                # request.write_step_output(cur_output.mean(dim=1))
+                request.write_step_output(cur_output)
+
+                # self._inference_pipeline.post_decode()

Review Comment:
   Remove this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
