Source code for bigdl.chronos.forecaster.autoformer_forecaster
#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import torch
from bigdl.chronos.forecaster.abstract import Forecaster
from bigdl.chronos.model.autoformer import model_creator
from torch.utils.data import TensorDataset, DataLoader
from bigdl.chronos.model.autoformer.Autoformer import AutoFormer, _transform_config_to_namedtuple
from bigdl.nano.utils.log4Error import invalidInputError
from bigdl.chronos.pytorch import TSTrainer as Trainer
import torch.nn as nn
import numpy as np
import os
from collections import namedtuple
[docs]class AutoformerForecaster(Forecaster):
def __init__(self,
past_seq_len,
future_seq_len,
input_feature_num,
output_feature_num,
label_len,
freq,
output_attention=False,
moving_avg=25,
d_model=128,
embed='timeF',
dropout=0.05,
factor=3,
n_head=8,
d_ff=256,
activation='gelu',
e_layer=2,
d_layers=1,
optimizer="Adam",
loss="mse",
lr=0.0001,
metrics=["mse"],
seed=None,
distributed=False,
workers_per_node=1,
distributed_backend="ray"):
"""
Build a AutoformerForecaster Forecast Model.
:param past_seq_len: Specify the history time steps (i.e. lookback).
:param future_seq_len: Specify the output time steps (i.e. horizon).
:param input_feature_num: Specify the feature dimension.
:param output_feature_num: Specify the output dimension.
:param optimizer: Specify the optimizer used for training. This value
defaults to "Adam".
:param loss: str or pytorch loss instance, Specify the loss function
used for training. This value defaults to "mse". You can choose
from "mse", "mae", "huber_loss" or any customized loss instance
you want to use.
:param lr: Specify the learning rate. This value defaults to 0.001.
:param metrics: A list contains metrics for evaluating the quality of
forecasting. You may only choose from "mse" and "mae" for a
distributed forecaster. You may choose from "mse", "mae",
"rmse", "r2", "mape", "smape" or a callable function for a
non-distributed forecaster. If callable function, it signature
should be func(y_true, y_pred), where y_true and y_pred are numpy
ndarray.
:param seed: int, random seed for training. This value defaults to None.
:param distributed: bool, if init the forecaster in a distributed
fashion. If True, the internal model will use an Orca Estimator.
If False, the internal model will use a pytorch model. The value
defaults to False.
:param workers_per_node: int, the number of worker you want to use.
The value defaults to 1. The param is only effective when
distributed is set to True.
:param distributed_backend: str, select from "ray" or
"horovod". The value defaults to "ray".
:param kwargs: other hyperparameter please refer to
https://github.com/zhouhaoyi/Informer2020#usage
"""
# config setting
self.data_config = {
"past_seq_len": past_seq_len,
"future_seq_len": future_seq_len,
"input_feature_num": input_feature_num,
"output_feature_num": output_feature_num
}
self.model_config = {
"seq_len": past_seq_len,
"label_len": label_len,
"pred_len": future_seq_len,
"output_attention": output_attention,
"moving_avg": moving_avg,
"enc_in": input_feature_num,
"d_model": d_model,
"embed": embed,
"freq": freq,
"dropout": dropout,
"dec_in": input_feature_num,
"factor": factor,
"n_head": n_head,
"d_ff": d_ff,
"activation": activation,
"e_layer": e_layer,
"c_out": output_feature_num,
"d_layers": d_layers
}
self.loss_config = {
"loss": loss
}
self.optim_config = {
"lr": lr,
"optim": optimizer
}
self.model_config.update(self.loss_config)
self.model_config.update(self.optim_config)
self.distributed = distributed
self.seed = seed
self.checkpoint_callback = True
# disable multi-process training for now.
# TODO: enable it in future.
self.num_processes = 1
self.use_ipex = False
self.onnx_available = False
self.quantize_available = False
self.use_amp = False
self.model_creator = model_creator
self.internal = model_creator(self.model_config)
[docs] def fit(self, data, epochs=1, batch_size=32):
"""
Fit(Train) the forecaster.
:param data: The data support following formats:
| 1. numpy ndarrays: generate from `TSDataset.roll`,
be sure to set label_len > 0 and time_enc = True
| 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
be sure to set label_len > 0 and time_enc = True
:param epochs: Number of epochs you want to train. The value defaults to 1.
:param batch_size: Number of batch size you want to train. The value defaults to 32.
if you input a pytorch dataloader for `data`, the batch_size will follow the
batch_size setted in `data`.
"""
# seed setting
from pytorch_lightning import seed_everything
seed_everything(seed=self.seed)
# distributed is not supported.
if self.distributed:
invalidInputError(False, "distributed is not support in Autoformer")
# transform a tuple to dataloader.
if isinstance(data, tuple):
data = DataLoader(TensorDataset(torch.from_numpy(data[0]),
torch.from_numpy(data[1]),
torch.from_numpy(data[2]),
torch.from_numpy(data[3]),),
batch_size=batch_size,
shuffle=True)
# Trainer init and fitting
self.trainer = Trainer(logger=False, max_epochs=epochs,
checkpoint_callback=self.checkpoint_callback, num_processes=1,
use_ipex=self.use_ipex, distributed_backend="spawn")
self.trainer.fit(self.internal, data)
[docs] def predict(self, data, batch_size=32):
"""
Predict using a trained forecaster.
:param data: The data support following formats:
| 1. numpy ndarrays: generate from `TSDataset.roll`,
be sure to set label_len > 0 and time_enc = True
| 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
be sure to set label_len > 0, time_enc = True and is_predict = True
:param batch_size: predict batch size. The value will not affect predict
result but will affect resources cost(e.g. memory and time).
:return: A list of numpy ndarray
"""
if self.distributed:
invalidInputError(False, "distributed is not support in Autoformer")
if isinstance(data, tuple):
data = DataLoader(TensorDataset(torch.from_numpy(data[0]),
torch.from_numpy(data[1]),
torch.from_numpy(data[2]),
torch.from_numpy(data[3]),),
batch_size=batch_size,
shuffle=False)
return self.trainer.predict(self.internal, data)
[docs] def evaluate(self, data, batch_size=32):
"""
Predict using a trained forecaster.
:param data: The data support following formats:
| 1. numpy ndarrays: generate from `TSDataset.roll`,
be sure to set label_len > 0 and time_enc = True
| 2. pytorch dataloader: generate from `TSDataset.to_torch_data_loader`,
be sure to set label_len > 0 and time_enc = True
:param batch_size: predict batch size. The value will not affect predict
result but will affect resources cost(e.g. memory and time).
:return: A dict, currently returns the loss rather than metrics
"""
# TODO: use metrics here
if self.distributed:
invalidInputError(False, "distributed is not support in Autoformer")
if isinstance(data, tuple):
data = DataLoader(TensorDataset(torch.from_numpy(data[0]),
torch.from_numpy(data[1]),
torch.from_numpy(data[2]),
torch.from_numpy(data[3]),),
batch_size=batch_size,
shuffle=False)
return self.trainer.validate(self.internal, data)
[docs] def load(self, checkpoint_file):
"""
restore the forecaster.
:param checkpoint_file: The checkpoint file location you want to load the forecaster.
"""
self.trainer = Trainer(logger=False, max_epochs=1,
checkpoint_callback=self.checkpoint_callback, num_processes=1,
use_ipex=self.use_ipex, distributed_backend="spawn")
args = _transform_config_to_namedtuple(self.model_config)
self.internal = AutoFormer.load_from_checkpoint(checkpoint_file, configs=args)
[docs] def save(self, checkpoint_file):
"""
save the forecaster.
:param checkpoint_file: The checkpoint file location you want to load the forecaster.
"""
self.trainer.save_checkpoint(checkpoint_file)