0-finetune-chronos-etth.py 代码说明这份脚本做的事情其实可以用一句话概括:
chronos-t5-tiny 和本地 ETTh1.csv,跑一个最小可运行的时间序列微调闭环。它不是完整研究级训练框架,而是一个课堂版 demo。
它的目标是让编程基础还不强的同学,也能看清楚下面这条路径:
T5ForConditionalGeneration 做 teacher-forcing 训练如果你把它和前面:
5/2-prog/0-chronos-etth-demo.py对照着看,会更容易理解:
2-prog:只做 zero-shot 预测4-prog:开始做 最小微调也就是说,这份脚本是在上一份 demo 的基础上往前走了一步。
这份脚本可以分成 10 个部分:
WindowExampleSlidingWindowDatasetcollate_fnimport argparse
import json
import math
import os
from dataclasses import dataclass
from pathlib import Path这些是 Python 标准库:
argparse 解析命令行参数json 把训练结果写成 JSON 文件math 这里主要用来处理 nanos 用来清理代理环境变量dataclass 用来定义简单数据结构Path 用更清晰的方式处理文件路径再看下面这一组:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from torch.optim import AdamW
from transformers import AutoConfig, AutoModelForSeq2SeqLM
from chronos import ChronosConfig, ChronosPipeline它们的角色分别是:
matplotlib 画图numpy 算平均值、分位数pandas 读取 CSVtorch 深度学习训练的基础库DataLoader, Dataset 组织训练数据AdamW 优化器AutoConfig 读取模型配置AutoModelForSeq2SeqLM 加载 T5 结构模型ChronosConfig 读取 Chronos 特有配置ChronosPipeline 微调后做预测对比时使用的官方推理接口WindowExample@dataclass
class WindowExample:
context: torch.Tensor
future: torch.Tensor这个类非常简单,它只是把一个样本拆成两个部分:
context 历史窗口future 对应的未来窗口你可以把它理解成:
为什么要这么写?
因为时间序列训练最自然的监督信号就是:
SlidingWindowDataset这一段是整份脚本最重要的部分之一。
它把一整条长时间序列切成很多训练样本。
例如:
512 个点当 history64 个点当 future这样就能从一条长序列里切出很多 (context, future) 对。
__init__ 里的参数是什么意思def __init__(
self,
values: np.ndarray,
context_length: int,
prediction_length: int,
max_windows: int,
stride: int,
) -> None:values 原始时间序列context_length 每个样本里历史窗口的长度prediction_length 每个样本里未来窗口的长度max_windows 最多切多少个窗口,防止课堂 demo 太慢stride 滑动步长。比如 8 表示每次往前挪 8 个点再切下一个窗口self.examples = []
total = context_length + prediction_length
upper = len(values) - total + 1self.examples = [] 用来存所有样本total 一个样本总共需要多少点upper 最后一个可切窗口的起点上界for start in range(0, max(upper, 0), stride):这表示:
0 个点开始切stridecontext = values[start : start + context_length]
future = values[start + context_length : start + total]这是最标准的时间序列监督学习切法:
if len(future) < prediction_length:
break作用是:
否则最后一个样本会不完整。
torch.tensorself.examples.append(
WindowExample(
context=torch.tensor(context, dtype=torch.float32),
future=torch.tensor(future, dtype=torch.float32),
)
)因为后面训练要用 PyTorch,所以这里先统一转成:
torch.Tensor并且用:
dtype=torch.float32表示这是浮点数时间序列。
if len(self.examples) >= max_windows:
break这一步很重要,尤其在课堂 demo 里。
它的作用是:
也就是说,这个脚本追求的是:
而不是先追求最优结果。
__len__ 和 __getitem__def __len__(self) -> int:
return len(self.examples)
def __getitem__(self, idx: int) -> WindowExample:
return self.examples[idx]这是 PyTorch Dataset 的标准接口:
__len__ 告诉 DataLoader 数据集有多大__getitem__ 告诉 DataLoader 给我第 idx 个样本build_tokenizerdef build_tokenizer(model_dir: Path):
cfg = AutoConfig.from_pretrained(model_dir)
chronos_cfg = ChronosConfig(**cfg.chronos_config)
return chronos_cfg.create_tokenizer(), chronos_cfg这段代码非常关键,因为它说明:
它还带了一层 Chronos 专用配置:
逐行看:
AutoConfig.from_pretrained(model_dir) 读取模型目录里的 config.jsoncfg.chronos_config 取出其中 Chronos 特有的配置字段ChronosConfig(**cfg.chronos_config) 把字典变成一个 Chronos 配置对象chronos_cfg.create_tokenizer() 根据这份配置创建 tokenizer这里最重要的理解是:
build_collate_fn这一段是第二个最关键的地方。
collate_fn因为 DataLoader 每次拿到的是一批 WindowExample,
但模型真正需要的是:
input_idsattention_masklabelscollate_fn 的作用就是:
contexts = torch.stack([item.context for item in batch], dim=0)
futures = torch.stack([item.future for item in batch], dim=0)这里的 torch.stack 表示:
结果就会像:
contexts.shape = [batch_size, context_length]futures.shape = [batch_size, prediction_length]input_ids, attention_mask, scale = tokenizer.context_input_transform(contexts)这一步在做:
返回三个东西:
input_ids token 序列attention_mask 哪些位置是真实输入scale 这个样本的缩放因子为什么要 scale?
因为 Chronos 不是直接把原始数值硬离散化,而是:
这样不同量级的序列更容易共享一套 token 空间。
labels, labels_mask = tokenizer.label_input_transform(futures, scale)这一句表示:
这一步非常像:
-100labels = labels.clone()
labels[~labels_mask] = -100在 Hugging Face 的 seq2seq 训练里:
labels == -100 表示这个位置不参与 loss 计算所以这里是在说:
return {
"input_ids": input_ids.long(),
"attention_mask": attention_mask.long(),
"labels": labels.long(),
}这就是标准的 Hugging Face 训练输入格式。
pick_devicedef pick_device() -> torch.device:
return torch.device("cuda" if torch.cuda.is_available() else "cpu")这一段很朴素:
对学生来说,这也是一个很好的工程习惯:
forecast_with_pipeline这段函数的作用是:
ChronosPipeline为什么要单独写这个函数?
因为我们后面要做两次:
这样就能直接比较:
device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.bfloat16 if device == "cuda" else torch.float32这里表示:
bfloat16float32再看:
pipeline = ChronosPipeline.from_pretrained(
str(model_dir),
device_map=device,
dtype=dtype,
)这表示:
这里的 model_dir 可能是:
这正是这份脚本最漂亮的地方:
forecast = pipeline.predict(context, prediction_length, num_samples=num_samples)这里:
context 是历史窗口prediction_length 未来要预测多长num_samples 采样多少条未来轨迹返回结果不是一条序列,而是:
所以后面我们才会再去算:
quantilesdef quantiles(samples: np.ndarray):
return np.quantile(samples, [0.1, 0.5, 0.9], axis=0)这一段非常短,但很重要。
它做的是:
这里:
0.1 可以理解成偏低边界0.5 是中位数预测0.9 是偏高边界这样就能画出:
main() 里的参数这一部分先用 argparse 定义命令行参数。
--base-model
--csv
--target--base-model 基础模型路径,默认是 ../2-prog/models/chronos-t5-tiny--csv 数据文件路径,默认是 ../2-prog/data/ETTh1.csv--target 要预测的列,默认是 OT--context-length
--prediction-length
--stride--context-length 每个样本里历史窗口长度--prediction-length 每个样本里未来窗口长度--stride 相邻窗口起点之间相隔多少步--max-train-windows
--max-val-windows--max-train-windows 训练最多切多少个窗口--max-val-windows 验证最多切多少个窗口这两个参数最主要是为了:
--epochs
--batch-size
--learning-rate--epochs 训练多少轮--batch-size 每次喂给模型多少个样本--learning-rate 每次参数更新的步子有多大--num-samples
--output-dir
--plot-path
--metrics-path--num-samples 预测时采样几条未来轨迹--output-dir 微调后模型保存到哪里--plot-path 预测对比图保存到哪里--metrics-path 指标 JSON 保存到哪里for key in [...]:
os.environ.pop(key, None)这一段和你前面本地 Ollama demo 里看到的很像。
作用是:
课堂上最简单的理解就是:
base_model = Path(args.base_model).resolve()
csv_path = Path(args.csv).resolve()
output_dir = Path(args.output_dir)
plot_path = Path(args.plot_path)
metrics_path = Path(args.metrics_path)这一步就是:
后面这几句:
output_dir.mkdir(parents=True, exist_ok=True)
plot_path.parent.mkdir(parents=True, exist_ok=True)
metrics_path.parent.mkdir(parents=True, exist_ok=True)作用是:
这是一种很实用的工程习惯:
if not base_model.exists():
raise FileNotFoundError(...)
if not csv_path.exists():
raise FileNotFoundError(...)这是在提前防止最常见错误:
比起让程序后面莫名报错,这种“提前检查并报清楚”更适合教学。
tokenizer, chronos_cfg = build_tokenizer(base_model)
if args.prediction_length != chronos_cfg.prediction_length:
raise ValueError(...)这个检查非常关键。
为什么?
因为当前 chronos-t5-tiny 的配置本身规定了:
prediction_length = 64如果你硬把训练标签长度改成别的值,tokenizer 这边就不一致了。
所以这里是在提醒学生:
df = pd.read_csv(csv_path)
if args.target not in df.columns:
raise ValueError(...)
values = df[args.target].astype(float).to_numpy()逐行解释:
pd.read_csv(csv_path) 读入 ETTh1 数据astype(float) 强制转成浮点数to_numpy() 从 DataFrame 列变成 NumPy 数组为什么最后要变成 NumPy?
因为后面自己切窗口时,NumPy 一维数组最直接。
split_train = int(len(values) * 0.8)
split_val = int(len(values) * 0.9)这里使用的是最简单的时间顺序切分:
这比随机打乱更符合时间序列场景,因为:
val_values 和 test_values 前面多切了一段val_values = values[split_train - args.context_length - args.prediction_length : split_val]
test_values = values[split_val - args.context_length - args.prediction_length :]这是为了保证:
也就是说,你不能只给验证集剩一小段未来,还得留足够长的历史上下文。
train_ds = SlidingWindowDataset(...)
val_ds = SlidingWindowDataset(...)
collate_fn = build_collate_fn(tokenizer)
train_loader = DataLoader(...)
val_loader = DataLoader(...)这一段的逻辑是:
(context, future) 样本这里:
shuffle=True 训练时打乱窗口顺序shuffle=False 验证时不打乱这是 PyTorch 里非常常见的模式。
device = pick_device()
model = AutoModelForSeq2SeqLM.from_pretrained(str(base_model))
model.to(device)
model.train()逐行看:
pick_device() 选 CPU 还是 GPUAutoModelForSeq2SeqLM.from_pretrained(...) 加载底层 T5 模型model.to(device) 把模型搬到对应设备model.train() 切换到训练模式这里最值得学生记住的是:
optimizer = AdamW(model.parameters(), lr=args.learning_rate)
train_losses = []
val_losses = []AdamW 常见深度学习优化器model.parameters() 告诉优化器:你要更新哪些参数lr=args.learning_rate 学习率train_losses / val_losses 记录每轮训练和验证损失这一段是整份脚本最像“标准训练代码”的地方。
for epoch in range(args.epochs):意思是:
for batch in train_loader:
batch = {k: v.to(device) for k, v in batch.items()}
outputs = model(**batch)
loss = outputs.loss
loss.backward()
optimizer.step()
optimizer.zero_grad()
epoch_losses.append(loss.item())逐行解释:
for batch in train_loader 一批一批拿训练样本batch = {k: v.to(device) ...} 把这一批张量搬到 CPU 或 GPUoutputs = model(**batch) 调用模型前向传播loss = outputs.loss 直接取 Hugging Face 模型算好的损失loss.backward() 反向传播,计算梯度optimizer.step() 按梯度更新参数optimizer.zero_grad() 清空旧梯度,避免累积epoch_losses.append(loss.item()) 把这一批的损失存下来这就是最标准的 PyTorch 训练闭环。
训练完一轮后,这段代码会跑验证集:
model.eval()
with torch.no_grad():
...
model.train()这里:
model.eval() 切到评估模式torch.no_grad() 不算梯度,省显存省时间model.train() 验证完再切回训练模式这一步的意义是:
model.save_pretrained(output_dir)
AutoConfig.from_pretrained(base_model).save_pretrained(output_dir)第一句表示:
第二句表示:
为什么配置也要存?
因为后面要重新用 ChronosPipeline.from_pretrained(output_dir) 加载这个模型。
如果只有权重、没有配置,很多框架就不知道该怎么还原它。
history = test_values[: args.context_length]
future = test_values[args.context_length : args.context_length + args.prediction_length]这表示:
它和训练时切窗口的思想完全一样,只不过这里我们只拿一个样本来做展示。
before = forecast_with_pipeline(base_model, ...)
after = forecast_with_pipeline(output_dir, ...)这两句是整份脚本的教学高潮。
它们分别表示:
这一步让学生能非常直观地看到:
mae_before = float(np.mean(np.abs(med_b - future)))
mae_after = float(np.mean(np.abs(med_a - future)))这里的 MAE 就是:
为什么拿中位数预测 med_b / med_a 去和真实值比?
因为:
plt.plot(x_hist, history, ...)
plt.plot(x_pred, future, ...)
plt.plot(x_pred, med_b, ...)
plt.fill_between(...)
plt.plot(x_pred, med_a, ...)
plt.fill_between(...)这张图会同时画出:
所以它非常适合课堂上讲:
如果学生不太会看数字指标,这张图往往比 MAE 更直观。
metrics = {
...
}
metrics_path.write_text(json.dumps(metrics, indent=2))这一段做的是:
为什么这一步重要?
因为真实实验里,只看终端输出是不够的。
你需要一个能复查的结果文件。
print(f"Saved fine-tuned model to: {output_dir}")
print(f"Saved comparison plot to: {plot_path}")
print(f"Saved metrics to: {metrics_path}")
print(f"MAE before fine-tune: {mae_before:.3f}")
print(f"MAE after fine-tune: {mae_after:.3f}")这一步很适合教学,因为它会把学生最关心的 5 件事一次性说清楚:
如果把整份脚本压缩成 4 句话,它真正想让你理解的是:
如果你觉得整份脚本还是长,最推荐先盯住下面 5 个位置:
SlidingWindowDataset 看清楚时间序列样本是怎样切出来的build_collate_fn 看清楚数值是怎样变成 token 的AutoModelForSeq2SeqLM.from_pretrained(...) 看清楚 Chronos 底层其实是什么模型before / after 看清楚为什么最终一定要做微调前后对比