194 lines
8.8 KiB
Python
194 lines
8.8 KiB
Python
from tqdm import tqdm
|
|
import torch
|
|
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
|
|
from sklearn.model_selection import KFold
|
|
from torchmetrics.functional import auroc
|
|
import torch.optim as optim
|
|
import numpy as np
|
|
from all_models_tools.all_model_tools import call_back
|
|
from Model_Loss.Loss import Entropy_Loss
|
|
from merge_class.merge import merge
|
|
from draw_tools.Grad_cam import GradCAM
|
|
from torch.utils.data import Subset, DataLoader
|
|
import time
|
|
|
|
|
|
class All_Step:
    """Bundle the training loop and the test-set evaluation for one model.

    The constructor asks ``PreProcess_Classes_Data`` for a training and a test
    DataLoader; ``Training_Step`` then trains ``Model`` with SGD, early
    stopping and a learning-rate scheduler, and ``Evaluate_Model`` computes
    classification metrics on the test DataLoader.

    Labels are treated as one-hot rows throughout (class indices are taken
    with ``argmax`` along axis 1).

    NOTE(review): inputs are moved to ``self.device`` but the model is not;
    this presumably relies on the caller having moved the model already.
    """

    def __init__(self, PreProcess_Classes_Data, Batch, Model, Epoch, Number_Of_Classes, Model_Name):
        """Store the configuration and build the train/test DataLoaders.

        Args:
            PreProcess_Classes_Data: object exposing
                ``Total_Data_Combine_To_DataLoader(Batch)``, which returns a
                ``(training_loader, test_loader)`` pair.
            Batch: batch size forwarded to the loader factory.
            Model: the network to train.
            Epoch: maximum number of training epochs.
            Number_Of_Classes: number of output classes (used for AUROC).
            Model_Name: display / bookkeeping name of the model.
        """
        self.PreProcess_Classes_Data = PreProcess_Classes_Data
        self.Training_DataLoader, self.Test_Dataloader = (
            self.PreProcess_Classes_Data.Total_Data_Combine_To_DataLoader(Batch)
        )

        self.Model = Model
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.Epoch = Epoch
        self.Number_Of_Classes = Number_Of_Classes

        self.Model_Name = Model_Name

    @staticmethod
    def _concat_batches(batches):
        """Concatenate a list of per-batch arrays along axis 0 (empty list -> empty array)."""
        if len(batches) == 0:
            return np.array([])
        return np.concatenate(batches, axis=0)

    def _run_inference(self, model, loader, criterion):
        """Run ``model`` over ``loader`` without gradients and collect per-batch results.

        Args:
            model: callable mapping a batch of inputs to per-class scores.
            loader: iterable of ``(inputs, one_hot_labels)`` batches.
            criterion: callable ``(outputs, labels) -> scalar tensor``.

        Returns:
            dict with keys ``loss_sum`` (float, sum of batch losses),
            ``batches`` (int, number of batches seen), ``pred`` / ``true``
            (lists of per-batch class-index arrays), ``score`` (list of
            per-batch raw output arrays) and ``target`` (list of per-batch
            label arrays as provided by the loader).
        """
        result = {"loss_sum": 0.0, "batches": 0, "pred": [], "true": [], "score": [], "target": []}

        with torch.no_grad():
            for inputs, labels in loader:
                inputs = torch.as_tensor(inputs).to(self.device)
                labels = torch.as_tensor(labels).to(self.device)

                outputs = model(inputs)
                result["loss_sum"] += criterion(outputs, labels).item()
                result["batches"] += 1

                _, predicted_indexes = torch.max(outputs, dim=1)
                labels_np = labels.cpu().numpy()

                result["pred"].append(predicted_indexes.cpu().numpy())
                result["true"].append(np.argmax(labels_np, 1))
                result["score"].append(outputs.float().cpu().numpy())
                result["target"].append(labels_np)

        return result

    def Training_Step(self, model_name, counter):
        """Train ``self.Model`` and return the per-epoch training history.

        Every epoch iterates over the 5 K-fold training splits of the training
        dataset, then validates on the hold-out split of the last fold, feeds
        the validation loss to early stopping and to the LR scheduler.

        Args:
            model_name: forwarded to ``call_back``.
            counter: forwarded to ``call_back``.

        Returns:
            ``(train_losses, val_losses, train_accuracies, val_accuracies,
            Total_Epoch)`` where the four lists hold one value per completed
            epoch and ``Total_Epoch`` is the 0-based epoch at which early
            stopping fired.
        """
        # SGD with weight_decay, i.e. L2 regularisation.
        Optimizer = optim.SGD(self.Model.parameters(), lr=0.045, momentum=0.9, weight_decay=0.1)
        model_path, early_stopping, scheduler = call_back(model_name, counter, Optimizer)

        criterion = Entropy_Loss()  # project-specific loss function
        Merge_Function = merge()

        train_losses, val_losses = [], []
        train_accuracies, val_accuracies = [], []
        # NOTE(review): stays 0 when training runs to completion without early
        # stopping; kept as-is because callers may rely on it — confirm.
        Total_Epoch = 0

        K_Flod = KFold(n_splits=5, shuffle=True, random_state=42)

        # Subset must wrap the indexable dataset, not the DataLoader itself.
        dataset = self.Training_DataLoader.dataset
        batch_size = self.Training_DataLoader.batch_size
        pin_memory = self.device.type == "cuda"

        # The folds are deterministic (fixed random_state), so compute them once.
        # Indices come from the dataset length so they are always valid for Subset.
        folds = [
            (train_idx.tolist(), vali_idx.tolist())
            for train_idx, vali_idx in K_Flod.split(np.arange(len(dataset)))
        ]
        # Number of samples actually visited per epoch across all folds.
        total_samples = sum(len(train_idx) for train_idx, _ in folds)

        for epoch in range(self.Epoch):  # training loop
            self.Model.train()
            running_loss = 0.0
            train_batches = 0
            all_train_preds = []
            all_train_labels = []
            processed_samples = 0

            # Start time of this epoch, used for the elapsed/ETA display.
            start_time = time.time()
            val_subset = None

            for train_idx, vali_idx in folds:
                # Create training and validation subsets for this fold.
                train_subset = Subset(dataset, train_idx)
                val_subset = Subset(dataset, vali_idx)

                Training_Data = DataLoader(
                    train_subset,
                    batch_size=batch_size,
                    num_workers=0,
                    pin_memory=pin_memory,
                    shuffle=True,
                )

                epoch_iterator = tqdm(Training_Data, desc=f"Epoch [{epoch}/{self.Epoch}]")

                for inputs, labels in epoch_iterator:
                    inputs = torch.as_tensor(inputs).to(self.device)
                    labels = torch.as_tensor(labels).to(self.device)

                    Optimizer.zero_grad()
                    outputs = self.Model(inputs)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    Optimizer.step()

                    batch_loss = loss.item()
                    running_loss += batch_loss
                    train_batches += 1

                    # Collect training predictions and labels.
                    _, Output_Indexs = torch.max(outputs, dim=1)
                    batch_preds = Output_Indexs.cpu().numpy()
                    True_Indexs = np.argmax(labels.cpu().numpy(), 1)

                    all_train_preds.append(batch_preds)
                    all_train_labels.append(True_Indexs)

                    processed_samples += len(inputs)

                    # Elapsed time and estimated remaining time for this epoch.
                    elapsed_time = time.time() - start_time
                    iterations_per_second = processed_samples / elapsed_time if elapsed_time > 0 else 0
                    eta = (total_samples - processed_samples) / iterations_per_second if iterations_per_second > 0 else 0
                    time_str = f"{int(elapsed_time//60):02d}:{int(elapsed_time%60):02d}<{int(eta//60):02d}:{int(eta%60):02d}"

                    # Accuracy of the current batch only.
                    batch_accuracy = (batch_preds == True_Indexs).mean()

                    # Refresh the progress bar.
                    epoch_iterator.set_description(f"Epoch [{epoch}/{self.Epoch}]")
                    epoch_iterator.set_postfix_str(
                        f"{processed_samples}/{total_samples} [{time_str}, {iterations_per_second:.2f}it/s, " +
                        f"acc={batch_accuracy:.3f}, loss={batch_loss:.3f}, ]"
                    )

                epoch_iterator.close()

            all_train_preds = Merge_Function.merge_data_main(all_train_preds, 0, len(all_train_preds))
            all_train_labels = Merge_Function.merge_data_main(all_train_labels, 0, len(all_train_labels))

            # Average over the batches that were actually run in this epoch.
            Training_Loss = running_loss / max(train_batches, 1)
            train_accuracy = accuracy_score(all_train_labels, all_train_preds)

            train_losses.append(Training_Loss)
            train_accuracies.append(train_accuracy)

            # Validation on the hold-out split of the last fold.
            # NOTE(review): these samples were part of the training split of the
            # other folds in this same epoch, so this is not a clean hold-out.
            self.Model.eval()
            Validation_Data = DataLoader(
                val_subset,
                batch_size=batch_size,
                num_workers=0,
                pin_memory=pin_memory,
                shuffle=False,
            )
            val_result = self._run_inference(self.Model, Validation_Data, criterion)

            val_loss = val_result["loss_sum"] / max(val_result["batches"], 1)
            val_accuracy = accuracy_score(
                self._concat_batches(val_result["true"]),
                self._concat_batches(val_result["pred"]),
            )

            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)

            early_stopping(val_loss, self.Model, model_path)
            if early_stopping.early_stop:
                print("Early stopping triggered. Training stopped.")
                Total_Epoch = epoch
                break

            # Learning-rate adjustment driven by the validation loss.
            scheduler.step(val_loss)

        return train_losses, val_losses, train_accuracies, val_accuracies, Total_Epoch

    def Evaluate_Model(self, cnn_model, counter):
        """Evaluate ``cnn_model`` on the test DataLoader.

        Args:
            cnn_model: the trained network to evaluate.
            counter: unused here; it was only consumed by a Grad-CAM
                visualisation that is currently disabled. Kept so existing
                callers keep working.

        Returns:
            ``(loss, accuracy, precision, recall, AUC, f1, True_Label,
            Predict_Label)``. ``loss`` is the mean batch loss; precision,
            recall and F1 are macro-averaged; ``True_Label`` and
            ``Predict_Label`` are lists of per-batch class-index arrays.
        """
        cnn_model.eval()

        result = self._run_inference(cnn_model, self.Test_Dataloader, Entropy_Loss())

        loss = result["loss_sum"] / max(result["batches"], 1)

        # Ground truth goes into True_Label and predictions into Predict_Label.
        True_Label = result["true"]
        Predict_Label = result["pred"]

        # Flatten the per-batch arrays so the metrics see every sample.
        flat_true = self._concat_batches(True_Label)
        flat_pred = self._concat_batches(Predict_Label)

        # Keep every row of every batch for AUROC, not only the first one.
        True_Label_OneHot = torch.as_tensor(self._concat_batches(result["target"])).to(torch.int)
        Predict_Label_OneHot = torch.as_tensor(self._concat_batches(result["score"]), dtype=torch.float32)

        accuracy = accuracy_score(flat_true, flat_pred)
        precision = precision_score(flat_true, flat_pred, average="macro")
        recall = recall_score(flat_true, flat_pred, average="macro")
        AUC = auroc(Predict_Label_OneHot, True_Label_OneHot, num_labels=self.Number_Of_Classes, task="multilabel", average="macro")
        f1 = f1_score(flat_true, flat_pred, average="macro")

        return loss, accuracy, precision, recall, AUC, f1, True_Label, Predict_Label