Copilot commented on code in PR #16978:
URL: https://github.com/apache/iotdb/pull/16978#discussion_r2667156792


##########
iotdb-core/ainode/iotdb/ainode/core/model/timer_xl/pipeline_timer.py:
##########
@@ -20,37 +20,83 @@
 
 from iotdb.ainode.core.exception import InferenceModelInternalException
 from iotdb.ainode.core.inference.pipeline.basic_pipeline import 
ForecastPipeline
+from iotdb.ainode.core.log import Logger
+from iotdb.ainode.core.model.model_info import ModelInfo
+
+logger = Logger()
 
 
 class TimerPipeline(ForecastPipeline):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         super().__init__(model_info, model_kwargs=model_kwargs)
 
-    def preprocess(self, inputs):
+    def preprocess(self, inputs, **infer_kwargs) -> torch.Tensor:
         """
-        The inputs shape should be 3D, but Timer-XL only supports 2D tensor: 
[batch_size, sequence_length],
-        we need to squeeze the target_count dimension.
+        Preprocess the input data by converting it to a 2D tensor (Timer-XL 
only supports 2D inputs).
+
+        Parameters:
+            inputs (list): A list of dictionaries containing input data,
+                           where each dictionary should include a "targets" 
key.
+            **infer_kwargs: Additional keyword arguments passed to the method.
+
+        Returns:
+            torch.Tensor: A 2D tensor of shape [batch_size, input_length] 
after squeezing
+                          the target_count dimension.
+
+        Raises:
+            InferenceModelInternalException: If the model receives more than 
one target variable
+                                             (i.e., when inputs.shape[1] != 1).
         """
-        inputs = super().preprocess(inputs)
+        model_id = self.model_info.model_id
+        inputs = super().preprocess(inputs, **infer_kwargs)
+        # Here, we assume element in list has same history_length,
+        # otherwise, the model cannot proceed
+        if inputs[0].get("past_covariates", None) or inputs[0].get(
+            "future_covariates", None

Review Comment:
   The condition is true when either `past_covariates` or `future_covariates` 
is present, because the two checks are combined with `or`. This means that if 
only `past_covariates` is provided (which should be supported), the warning 
will still be logged. Consider checking each covariate type separately if they 
have different support levels, or clarify whether both are truly unsupported.



##########
iotdb-core/ainode/iotdb/ainode/core/model/chronos2/pipeline_chronos2.py:
##########
@@ -387,5 +449,28 @@ def _predict_step(
 
         return prediction
 
-    def postprocess(self, output: torch.Tensor):
-        return output[0].mean(dim=1, keepdim=True)
+    def postprocess(
+        self, outputs: list[torch.Tensor], **infer_kwargs
+    ) -> list[torch.Tensor]:
+        """
+        Postprocesses the model's forecast outputs by selecting the 0.5 
quantile or averaging over quantiles.
+
+        Args:
+            outputs (list[torch.Tensor]): List of forecast outputs, where each 
output is a 3D-tensor with shape [target_count, quantile_count, output_length].
+
+        Returns:
+            list[torch.Tensor]: Processed list of forecast outputs, each is a 
2D-tensor with shape [target_count, output_length].
+        """
+        outputs_list = []
+        for output in outputs:
+            # Check if 0.5 quantile is available
+            if 0.5 in self.quantiles:
+                idx = self.quantiles.index(0.5)
+                # Get the 0.5 quantile value
+                outputs_list.append(output[:, idx, :])
+            else:
+                # If 0.5 quantile is not provided,
+                # get the mean of all quantiles
+                outputs_list.append(output.mean(dim=1))
+        super().postprocess(outputs_list, **infer_kwargs)
+        return outputs_list

Review Comment:
   The method calls `super().postprocess()` but discards its return value; 
`outputs_list` is returned directly instead. Either remove the `super()` call 
if it is not needed, or return its result instead of returning `outputs_list`.
   ```suggestion
           processed_outputs = super().postprocess(outputs_list, **infer_kwargs)
           return processed_outputs if processed_outputs is not None else 
outputs_list
   ```



##########
iotdb-core/ainode/iotdb/ainode/core/model/sundial/pipeline_sundial.py:
##########
@@ -20,42 +20,92 @@
 
 from iotdb.ainode.core.exception import InferenceModelInternalException
 from iotdb.ainode.core.inference.pipeline.basic_pipeline import 
ForecastPipeline
+from iotdb.ainode.core.log import Logger
+from iotdb.ainode.core.model.model_info import ModelInfo
+
+logger = Logger()
 
 
 class SundialPipeline(ForecastPipeline):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         super().__init__(model_info, model_kwargs=model_kwargs)
 
-    def preprocess(self, inputs):
+    def preprocess(self, inputs, **infer_kwargs) -> torch.Tensor:
         """
-        The inputs shape should be 3D, but Sundial only supports 2D tensor: 
[batch_size, sequence_length],
-        we need to squeeze the target_count dimension.
+        Preprocess the input data by converting it to a 2D tensor (Sundial 
only supports 2D inputs).
+
+        Parameters:
+            inputs (list): A list of dictionaries containing input data,
+                           where each dictionary includes a "targets" key.
+            **infer_kwargs: Additional keyword arguments passed to the method.
+
+        Returns:
+            torch.Tensor: A 2D tensor with shape [batch_size, input_length] 
after squeezing
+                          the target_count dimension.
+
+        Raises:
+            InferenceModelInternalException: If the model receives more than 
one target variable
+                                             (i.e., when inputs.shape[1] != 1).
         """
-        inputs = super().preprocess(inputs)
+        model_id = self.model_info.model_id
+        inputs = super().preprocess(inputs, **infer_kwargs)
+        # Here, we assume element in list has same history_length,
+        # otherwise, the model cannot proceed
+        if inputs[0].get("past_covariates", None) or inputs[0].get(
+            "future_covariates", None
+        ):
+            logger.warning(
+                f"[Inference] Past_covariates and future_covariates will be 
ignored, as they are not supported for model {model_id}."
+            )

Review Comment:
   The condition is true when either `past_covariates` or `future_covariates` 
is present, because the two checks are combined with `or`. This means that if 
only `past_covariates` is provided (which should be supported), the warning 
will still be logged. Consider checking each covariate type separately if they 
have different support levels, or clarify whether both are truly unsupported.
   ```suggestion
           has_past_covariates = inputs[0].get("past_covariates") is not None
           has_future_covariates = inputs[0].get("future_covariates") is not 
None
           if has_past_covariates or has_future_covariates:
               if has_past_covariates and has_future_covariates:
                   logger.warning(
                       f"[Inference] Past_covariates and future_covariates will 
be ignored, as they are not supported for model {model_id}."
                   )
               elif has_past_covariates:
                   logger.warning(
                       f"[Inference] Past_covariates will be ignored, as they 
are not supported for model {model_id}."
                   )
               else:
                   logger.warning(
                       f"[Inference] Future_covariates will be ignored, as they 
are not supported for model {model_id}."
                   )
   ```



##########
iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py:
##########
@@ -21,84 +21,207 @@
 import torch
 
 from iotdb.ainode.core.exception import InferenceModelInternalException
+from iotdb.ainode.core.model.model_info import ModelInfo
 from iotdb.ainode.core.model.model_loader import load_model
 
 
 class BasicPipeline(ABC):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         self.model_info = model_info
         self.device = model_kwargs.get("device", "cpu")
         self.model = load_model(model_info, device_map=self.device, 
**model_kwargs)
 
     @abstractmethod
-    def preprocess(self, inputs):
+    def preprocess(self, inputs, **infer_kwargs):
         """
         Preprocess the input before inference, including shape validation and 
value transformation.
         """
         raise NotImplementedError("preprocess not implemented")
 
     @abstractmethod
-    def postprocess(self, outputs: torch.Tensor):
+    def postprocess(self, outputs, **infer_kwargs):
         """
         Post-process the outputs after the entire inference task.
         """
         raise NotImplementedError("postprocess not implemented")
 
 
 class ForecastPipeline(BasicPipeline):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         super().__init__(model_info, model_kwargs=model_kwargs)
 
-    def preprocess(self, inputs):
+    def preprocess(
+        self,
+        inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]],
+        **infer_kwargs,
+    ):
         """
-        The inputs should be 3D tensor: [batch_size, target_count, 
sequence_length].
+        Preprocess the input data before passing it to the model for 
inference, validating the shape and type of the input data.
+
+        Args:
+            inputs (list[dict]):
+                The input data, a list of dictionaries, where each dictionary 
contains:
+                    - 'targets': A tensor (1D or 2D) of shape (input_length,) 
or (target_count, input_length).
+                    - 'past_covariates': A dictionary of tensors (optional), 
where each tensor has shape (input_length,).
+                    - 'future_covariates': A dictionary of tensors (optional), 
where each tensor has shape (input_length,).
+
+            infer_kwargs (dict, optional): Additional keyword arguments for 
inference, such as:
+                - `output_length`(int): Used to check validation of 
'future_covariates' if provided.
+
+        Raises:
+            ValueError: If the input format is incorrect (e.g., missing keys, 
invalid tensor shapes).
+
+        Returns:
+            The preprocessed inputs, validated and ready for model inference.
         """
-        if len(inputs.shape) != 3:
-            raise InferenceModelInternalException(
-                f"[Inference] Input must be: [batch_size, target_count, 
sequence_length], but receives {inputs.shape}"
+
+        if isinstance(inputs, list):
+            output_length = infer_kwargs.get("output_length", 96)
+            for idx, input_dict in enumerate(inputs):
+                # Check if the dictionary contains the expected keys
+                if not isinstance(input_dict, dict):
+                    raise ValueError(f"Input at index {idx} is not a 
dictionary.")
+
+                required_keys = ["targets"]
+                for key in required_keys:
+                    if key not in input_dict:
+                        raise ValueError(
+                            f"Key '{key}' is missing in input at index {idx}."
+                        )
+
+                # Check 'targets' is torch.Tensor and has the correct shape
+                targets = input_dict["targets"]
+                if not isinstance(targets, torch.Tensor):
+                    raise ValueError(
+                        f"'targets' must be torch.Tensor, but got 
{type(targets)} at index {idx}."
+                    )
+                if targets.ndim not in [1, 2]:
+                    raise ValueError(
+                        f"'targets' must have 1 or 2 dimensions, but got 
{targets.ndim} dimensions at index {idx}."
+                    )
+                # If targets is 2-d, check if the second dimension is 
input_length
+                if targets.ndim == 2:
+                    n_variates, input_length = targets.shape

Review Comment:
   The variable 'n_variates' is assigned on line 104 but never used. If it's 
not needed for any validation or processing, consider removing this assignment 
or using it for additional validation logic.
   ```suggestion
                       _, input_length = targets.shape
   ```



##########
iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py:
##########
@@ -21,84 +21,207 @@
 import torch
 
 from iotdb.ainode.core.exception import InferenceModelInternalException
+from iotdb.ainode.core.model.model_info import ModelInfo
 from iotdb.ainode.core.model.model_loader import load_model
 
 
 class BasicPipeline(ABC):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         self.model_info = model_info
         self.device = model_kwargs.get("device", "cpu")
         self.model = load_model(model_info, device_map=self.device, 
**model_kwargs)
 
     @abstractmethod
-    def preprocess(self, inputs):
+    def preprocess(self, inputs, **infer_kwargs):
         """
         Preprocess the input before inference, including shape validation and 
value transformation.
         """
         raise NotImplementedError("preprocess not implemented")
 
     @abstractmethod
-    def postprocess(self, outputs: torch.Tensor):
+    def postprocess(self, outputs, **infer_kwargs):
         """
         Post-process the outputs after the entire inference task.
         """
         raise NotImplementedError("postprocess not implemented")
 
 
 class ForecastPipeline(BasicPipeline):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         super().__init__(model_info, model_kwargs=model_kwargs)
 
-    def preprocess(self, inputs):
+    def preprocess(
+        self,
+        inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]],
+        **infer_kwargs,
+    ):
         """
-        The inputs should be 3D tensor: [batch_size, target_count, 
sequence_length].
+        Preprocess the input data before passing it to the model for 
inference, validating the shape and type of the input data.
+
+        Args:
+            inputs (list[dict]):
+                The input data, a list of dictionaries, where each dictionary 
contains:
+                    - 'targets': A tensor (1D or 2D) of shape (input_length,) 
or (target_count, input_length).
+                    - 'past_covariates': A dictionary of tensors (optional), 
where each tensor has shape (input_length,).
+                    - 'future_covariates': A dictionary of tensors (optional), 
where each tensor has shape (input_length,).
+
+            infer_kwargs (dict, optional): Additional keyword arguments for 
inference, such as:
+                - `output_length`(int): Used to check validation of 
'future_covariates' if provided.
+
+        Raises:
+            ValueError: If the input format is incorrect (e.g., missing keys, 
invalid tensor shapes).
+
+        Returns:
+            The preprocessed inputs, validated and ready for model inference.
         """
-        if len(inputs.shape) != 3:
-            raise InferenceModelInternalException(
-                f"[Inference] Input must be: [batch_size, target_count, 
sequence_length], but receives {inputs.shape}"
+
+        if isinstance(inputs, list):
+            output_length = infer_kwargs.get("output_length", 96)
+            for idx, input_dict in enumerate(inputs):
+                # Check if the dictionary contains the expected keys
+                if not isinstance(input_dict, dict):
+                    raise ValueError(f"Input at index {idx} is not a 
dictionary.")
+
+                required_keys = ["targets"]
+                for key in required_keys:
+                    if key not in input_dict:
+                        raise ValueError(
+                            f"Key '{key}' is missing in input at index {idx}."
+                        )
+
+                # Check 'targets' is torch.Tensor and has the correct shape
+                targets = input_dict["targets"]
+                if not isinstance(targets, torch.Tensor):
+                    raise ValueError(
+                        f"'targets' must be torch.Tensor, but got 
{type(targets)} at index {idx}."
+                    )
+                if targets.ndim not in [1, 2]:
+                    raise ValueError(
+                        f"'targets' must have 1 or 2 dimensions, but got 
{targets.ndim} dimensions at index {idx}."
+                    )
+                # If targets is 2-d, check if the second dimension is 
input_length
+                if targets.ndim == 2:
+                    n_variates, input_length = targets.shape
+                else:
+                    input_length = targets.shape[
+                        0
+                    ]  # for 1-d targets, shape should be (input_length,)

Review Comment:
   The comment states "for 1-d targets, shape should be (input_length,)", which 
reads like an enforced constraint, but nothing about target_count is validated 
for 1-D targets here; the code only reads `input_length` from the first 
dimension. The comment may therefore be misleading, since it implies a specific 
expected structure that isn't actually enforced.
   ```suggestion
                       ]  # for 1-d targets, infer input_length from the first 
(and only) dimension
   ```



##########
iotdb-core/ainode/iotdb/ainode/core/model/sktime/pipeline_sktime.py:
##########
@@ -22,54 +22,94 @@
 
 from iotdb.ainode.core.exception import InferenceModelInternalException
 from iotdb.ainode.core.inference.pipeline.basic_pipeline import 
ForecastPipeline
+from iotdb.ainode.core.log import Logger
+from iotdb.ainode.core.model.model_info import ModelInfo
+
+logger = Logger()
 
 
 class SktimePipeline(ForecastPipeline):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         model_kwargs.pop("device", None)  # sktime models run on CPU
         super().__init__(model_info, model_kwargs=model_kwargs)
 
-    def preprocess(self, inputs):
-        inputs = super().preprocess(inputs)
+    def preprocess(
+        self,
+        inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]],
+        **infer_kwargs,
+    ) -> list[pd.Series]:
+        """
+        Preprocess the input data for forecasting.
+
+        Parameters:
+            inputs (list): A list of dictionaries containing input data with 
key 'targets'.
+
+        Returns:
+            list of pd.Series: Processed inputs for the model with each of 
shape [input_length, ].
+        """
+        model_id = self.model_info.model_id
+
+        inputs = super().preprocess(inputs, **infer_kwargs)
+
+        # Here, we assume element in list has same history_length,
+        # otherwise, the model cannot proceed
+        if inputs[0].get("past_covariates", None) or inputs[0].get(
+            "future_covariates", None

Review Comment:
   The condition is true when either `past_covariates` or `future_covariates` 
is present, because the two checks are combined with `or`. This means that if 
only `past_covariates` is provided (which should be supported), the warning 
will still be logged. Consider checking each covariate type separately if they 
have different support levels, or clarify whether both are truly unsupported.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to