Copilot commented on code in PR #16978:
URL: https://github.com/apache/iotdb/pull/16978#discussion_r2667156792
##########
iotdb-core/ainode/iotdb/ainode/core/model/timer_xl/pipeline_timer.py:
##########
@@ -20,37 +20,83 @@
from iotdb.ainode.core.exception import InferenceModelInternalException
from iotdb.ainode.core.inference.pipeline.basic_pipeline import
ForecastPipeline
+from iotdb.ainode.core.log import Logger
+from iotdb.ainode.core.model.model_info import ModelInfo
+
+logger = Logger()
class TimerPipeline(ForecastPipeline):
- def __init__(self, model_info, **model_kwargs):
+ def __init__(self, model_info: ModelInfo, **model_kwargs):
super().__init__(model_info, model_kwargs=model_kwargs)
- def preprocess(self, inputs):
+ def preprocess(self, inputs, **infer_kwargs) -> torch.Tensor:
"""
- The inputs shape should be 3D, but Timer-XL only supports 2D tensor:
[batch_size, sequence_length],
- we need to squeeze the target_count dimension.
+ Preprocess the input data by converting it to a 2D tensor (Timer-XL
only supports 2D inputs).
+
+ Parameters:
+ inputs (list): A list of dictionaries containing input data,
+ where each dictionary should include a "targets"
key.
+ **infer_kwargs: Additional keyword arguments passed to the method.
+
+ Returns:
+ torch.Tensor: A 2D tensor of shape [batch_size, input_length]
after squeezing
+ the target_count dimension.
+
+ Raises:
+ InferenceModelInternalException: If the model receives more than
one target variable
+ (i.e., when inputs.shape[1] != 1).
"""
- inputs = super().preprocess(inputs)
+ model_id = self.model_info.model_id
+ inputs = super().preprocess(inputs, **infer_kwargs)
+ # Here, we assume element in list has same history_length,
+ # otherwise, the model cannot proceed
+ if inputs[0].get("past_covariates", None) or inputs[0].get(
+ "future_covariates", None
Review Comment:
The condition tests for past_covariates or future_covariates with `or` logic, so the warning is logged whenever either one is present — including when only `past_covariates` is provided (which should be supported). Consider checking each covariate type separately if they have different support levels, or clarify whether both are truly unsupported.
##########
iotdb-core/ainode/iotdb/ainode/core/model/chronos2/pipeline_chronos2.py:
##########
@@ -387,5 +449,28 @@ def _predict_step(
return prediction
- def postprocess(self, output: torch.Tensor):
- return output[0].mean(dim=1, keepdim=True)
+ def postprocess(
+ self, outputs: list[torch.Tensor], **infer_kwargs
+ ) -> list[torch.Tensor]:
+ """
+ Postprocesses the model's forecast outputs by selecting the 0.5
quantile or averaging over quantiles.
+
+ Args:
+ outputs (list[torch.Tensor]): List of forecast outputs, where each
output is a 3D-tensor with shape [target_count, quantile_count, output_length].
+
+ Returns:
+ list[torch.Tensor]: Processed list of forecast outputs, each is a
2D-tensor with shape [target_count, output_length].
+ """
+ outputs_list = []
+ for output in outputs:
+ # Check if 0.5 quantile is available
+ if 0.5 in self.quantiles:
+ idx = self.quantiles.index(0.5)
+ # Get the 0.5 quantile value
+ outputs_list.append(output[:, idx, :])
+ else:
+ # If 0.5 quantile is not provided,
+ # get the mean of all quantiles
+ outputs_list.append(output.mean(dim=1))
+ super().postprocess(outputs_list, **infer_kwargs)
+ return outputs_list
Review Comment:
The method calls super().postprocess() but discards its return value; what is returned is the locally built outputs_list. Either remove the super() call if it is not needed, or return its result instead of outputs_list.
```suggestion
processed_outputs = super().postprocess(outputs_list, **infer_kwargs)
return processed_outputs if processed_outputs is not None else
outputs_list
```
##########
iotdb-core/ainode/iotdb/ainode/core/model/sundial/pipeline_sundial.py:
##########
@@ -20,42 +20,92 @@
from iotdb.ainode.core.exception import InferenceModelInternalException
from iotdb.ainode.core.inference.pipeline.basic_pipeline import
ForecastPipeline
+from iotdb.ainode.core.log import Logger
+from iotdb.ainode.core.model.model_info import ModelInfo
+
+logger = Logger()
class SundialPipeline(ForecastPipeline):
- def __init__(self, model_info, **model_kwargs):
+ def __init__(self, model_info: ModelInfo, **model_kwargs):
super().__init__(model_info, model_kwargs=model_kwargs)
- def preprocess(self, inputs):
+ def preprocess(self, inputs, **infer_kwargs) -> torch.Tensor:
"""
- The inputs shape should be 3D, but Sundial only supports 2D tensor:
[batch_size, sequence_length],
- we need to squeeze the target_count dimension.
+ Preprocess the input data by converting it to a 2D tensor (Sundial
only supports 2D inputs).
+
+ Parameters:
+ inputs (list): A list of dictionaries containing input data,
+ where each dictionary includes a "targets" key.
+ **infer_kwargs: Additional keyword arguments passed to the method.
+
+ Returns:
+ torch.Tensor: A 2D tensor with shape [batch_size, input_length]
after squeezing
+ the target_count dimension.
+
+ Raises:
+ InferenceModelInternalException: If the model receives more than
one target variable
+ (i.e., when inputs.shape[1] != 1).
"""
- inputs = super().preprocess(inputs)
+ model_id = self.model_info.model_id
+ inputs = super().preprocess(inputs, **infer_kwargs)
+ # Here, we assume element in list has same history_length,
+ # otherwise, the model cannot proceed
+ if inputs[0].get("past_covariates", None) or inputs[0].get(
+ "future_covariates", None
+ ):
+ logger.warning(
+ f"[Inference] Past_covariates and future_covariates will be
ignored, as they are not supported for model {model_id}."
+ )
Review Comment:
The condition tests for past_covariates or future_covariates with `or` logic, so the warning is logged whenever either one is present — including when only `past_covariates` is provided (which should be supported). Consider checking each covariate type separately if they have different support levels, or clarify whether both are truly unsupported.
```suggestion
has_past_covariates = inputs[0].get("past_covariates") is not None
has_future_covariates = inputs[0].get("future_covariates") is not
None
if has_past_covariates or has_future_covariates:
if has_past_covariates and has_future_covariates:
logger.warning(
f"[Inference] Past_covariates and future_covariates will
be ignored, as they are not supported for model {model_id}."
)
elif has_past_covariates:
logger.warning(
f"[Inference] Past_covariates will be ignored, as they
are not supported for model {model_id}."
)
else:
logger.warning(
f"[Inference] Future_covariates will be ignored, as they
are not supported for model {model_id}."
)
```
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py:
##########
@@ -21,84 +21,207 @@
import torch
from iotdb.ainode.core.exception import InferenceModelInternalException
+from iotdb.ainode.core.model.model_info import ModelInfo
from iotdb.ainode.core.model.model_loader import load_model
class BasicPipeline(ABC):
- def __init__(self, model_info, **model_kwargs):
+ def __init__(self, model_info: ModelInfo, **model_kwargs):
self.model_info = model_info
self.device = model_kwargs.get("device", "cpu")
self.model = load_model(model_info, device_map=self.device,
**model_kwargs)
@abstractmethod
- def preprocess(self, inputs):
+ def preprocess(self, inputs, **infer_kwargs):
"""
Preprocess the input before inference, including shape validation and
value transformation.
"""
raise NotImplementedError("preprocess not implemented")
@abstractmethod
- def postprocess(self, outputs: torch.Tensor):
+ def postprocess(self, outputs, **infer_kwargs):
"""
Post-process the outputs after the entire inference task.
"""
raise NotImplementedError("postprocess not implemented")
class ForecastPipeline(BasicPipeline):
- def __init__(self, model_info, **model_kwargs):
+ def __init__(self, model_info: ModelInfo, **model_kwargs):
super().__init__(model_info, model_kwargs=model_kwargs)
- def preprocess(self, inputs):
+ def preprocess(
+ self,
+ inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]],
+ **infer_kwargs,
+ ):
"""
- The inputs should be 3D tensor: [batch_size, target_count,
sequence_length].
+ Preprocess the input data before passing it to the model for
inference, validating the shape and type of the input data.
+
+ Args:
+ inputs (list[dict]):
+ The input data, a list of dictionaries, where each dictionary
contains:
+ - 'targets': A tensor (1D or 2D) of shape (input_length,)
or (target_count, input_length).
+ - 'past_covariates': A dictionary of tensors (optional),
where each tensor has shape (input_length,).
+ - 'future_covariates': A dictionary of tensors (optional),
where each tensor has shape (input_length,).
+
+ infer_kwargs (dict, optional): Additional keyword arguments for
inference, such as:
+ - `output_length`(int): Used to check validation of
'future_covariates' if provided.
+
+ Raises:
+ ValueError: If the input format is incorrect (e.g., missing keys,
invalid tensor shapes).
+
+ Returns:
+ The preprocessed inputs, validated and ready for model inference.
"""
- if len(inputs.shape) != 3:
- raise InferenceModelInternalException(
- f"[Inference] Input must be: [batch_size, target_count,
sequence_length], but receives {inputs.shape}"
+
+ if isinstance(inputs, list):
+ output_length = infer_kwargs.get("output_length", 96)
+ for idx, input_dict in enumerate(inputs):
+ # Check if the dictionary contains the expected keys
+ if not isinstance(input_dict, dict):
+ raise ValueError(f"Input at index {idx} is not a
dictionary.")
+
+ required_keys = ["targets"]
+ for key in required_keys:
+ if key not in input_dict:
+ raise ValueError(
+ f"Key '{key}' is missing in input at index {idx}."
+ )
+
+ # Check 'targets' is torch.Tensor and has the correct shape
+ targets = input_dict["targets"]
+ if not isinstance(targets, torch.Tensor):
+ raise ValueError(
+ f"'targets' must be torch.Tensor, but got
{type(targets)} at index {idx}."
+ )
+ if targets.ndim not in [1, 2]:
+ raise ValueError(
+ f"'targets' must have 1 or 2 dimensions, but got
{targets.ndim} dimensions at index {idx}."
+ )
+ # If targets is 2-d, check if the second dimension is
input_length
+ if targets.ndim == 2:
+ n_variates, input_length = targets.shape
Review Comment:
The variable 'n_variates' is assigned on line 104 but never used. If it is not needed for any validation or processing, remove the assignment (e.g. unpack into `_`) or use it for additional validation logic.
```suggestion
_, input_length = targets.shape
```
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py:
##########
@@ -21,84 +21,207 @@
import torch
from iotdb.ainode.core.exception import InferenceModelInternalException
+from iotdb.ainode.core.model.model_info import ModelInfo
from iotdb.ainode.core.model.model_loader import load_model
class BasicPipeline(ABC):
- def __init__(self, model_info, **model_kwargs):
+ def __init__(self, model_info: ModelInfo, **model_kwargs):
self.model_info = model_info
self.device = model_kwargs.get("device", "cpu")
self.model = load_model(model_info, device_map=self.device,
**model_kwargs)
@abstractmethod
- def preprocess(self, inputs):
+ def preprocess(self, inputs, **infer_kwargs):
"""
Preprocess the input before inference, including shape validation and
value transformation.
"""
raise NotImplementedError("preprocess not implemented")
@abstractmethod
- def postprocess(self, outputs: torch.Tensor):
+ def postprocess(self, outputs, **infer_kwargs):
"""
Post-process the outputs after the entire inference task.
"""
raise NotImplementedError("postprocess not implemented")
class ForecastPipeline(BasicPipeline):
- def __init__(self, model_info, **model_kwargs):
+ def __init__(self, model_info: ModelInfo, **model_kwargs):
super().__init__(model_info, model_kwargs=model_kwargs)
- def preprocess(self, inputs):
+ def preprocess(
+ self,
+ inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]],
+ **infer_kwargs,
+ ):
"""
- The inputs should be 3D tensor: [batch_size, target_count,
sequence_length].
+ Preprocess the input data before passing it to the model for
inference, validating the shape and type of the input data.
+
+ Args:
+ inputs (list[dict]):
+ The input data, a list of dictionaries, where each dictionary
contains:
+ - 'targets': A tensor (1D or 2D) of shape (input_length,)
or (target_count, input_length).
+ - 'past_covariates': A dictionary of tensors (optional),
where each tensor has shape (input_length,).
+ - 'future_covariates': A dictionary of tensors (optional),
where each tensor has shape (input_length,).
+
+ infer_kwargs (dict, optional): Additional keyword arguments for
inference, such as:
+ - `output_length`(int): Used to check validation of
'future_covariates' if provided.
+
+ Raises:
+ ValueError: If the input format is incorrect (e.g., missing keys,
invalid tensor shapes).
+
+ Returns:
+ The preprocessed inputs, validated and ready for model inference.
"""
- if len(inputs.shape) != 3:
- raise InferenceModelInternalException(
- f"[Inference] Input must be: [batch_size, target_count,
sequence_length], but receives {inputs.shape}"
+
+ if isinstance(inputs, list):
+ output_length = infer_kwargs.get("output_length", 96)
+ for idx, input_dict in enumerate(inputs):
+ # Check if the dictionary contains the expected keys
+ if not isinstance(input_dict, dict):
+ raise ValueError(f"Input at index {idx} is not a
dictionary.")
+
+ required_keys = ["targets"]
+ for key in required_keys:
+ if key not in input_dict:
+ raise ValueError(
+ f"Key '{key}' is missing in input at index {idx}."
+ )
+
+ # Check 'targets' is torch.Tensor and has the correct shape
+ targets = input_dict["targets"]
+ if not isinstance(targets, torch.Tensor):
+ raise ValueError(
+ f"'targets' must be torch.Tensor, but got
{type(targets)} at index {idx}."
+ )
+ if targets.ndim not in [1, 2]:
+ raise ValueError(
+ f"'targets' must have 1 or 2 dimensions, but got
{targets.ndim} dimensions at index {idx}."
+ )
+ # If targets is 2-d, check if the second dimension is
input_length
+ if targets.ndim == 2:
+ n_variates, input_length = targets.shape
+ else:
+ input_length = targets.shape[
+ 0
+ ] # for 1-d targets, shape should be (input_length,)
Review Comment:
The comment states "for 1-d targets, shape should be (input_length,)", but for 1D targets no validation of target_count is actually performed. The comment may mislead readers into thinking a specific expected structure is enforced when it is not.
```suggestion
] # for 1-d targets, infer input_length from the first
(and only) dimension
```
##########
iotdb-core/ainode/iotdb/ainode/core/model/sktime/pipeline_sktime.py:
##########
@@ -22,54 +22,94 @@
from iotdb.ainode.core.exception import InferenceModelInternalException
from iotdb.ainode.core.inference.pipeline.basic_pipeline import
ForecastPipeline
+from iotdb.ainode.core.log import Logger
+from iotdb.ainode.core.model.model_info import ModelInfo
+
+logger = Logger()
class SktimePipeline(ForecastPipeline):
- def __init__(self, model_info, **model_kwargs):
+ def __init__(self, model_info: ModelInfo, **model_kwargs):
model_kwargs.pop("device", None) # sktime models run on CPU
super().__init__(model_info, model_kwargs=model_kwargs)
- def preprocess(self, inputs):
- inputs = super().preprocess(inputs)
+ def preprocess(
+ self,
+ inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]],
+ **infer_kwargs,
+ ) -> list[pd.Series]:
+ """
+ Preprocess the input data for forecasting.
+
+ Parameters:
+ inputs (list): A list of dictionaries containing input data with
key 'targets'.
+
+ Returns:
+ list of pd.Series: Processed inputs for the model with each of
shape [input_length, ].
+ """
+ model_id = self.model_info.model_id
+
+ inputs = super().preprocess(inputs, **infer_kwargs)
+
+ # Here, we assume element in list has same history_length,
+ # otherwise, the model cannot proceed
+ if inputs[0].get("past_covariates", None) or inputs[0].get(
+ "future_covariates", None
Review Comment:
The condition tests for past_covariates or future_covariates with `or` logic, so the warning is logged whenever either one is present — including when only `past_covariates` is provided (which should be supported). Consider checking each covariate type separately if they have different support levels, or clarify whether both are truly unsupported.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]