Stomach_Cancer_Pytorch/experiments/Training/Xception_Identification_Test.py

from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from torchmetrics.functional import auroc
from torch.nn import functional
from torch import nn
import torch
from sklearn.model_selection import KFold
from torchinfo import summary
from all_models_tools.all_model_tools import call_back
from Model_Loss.Loss import Entropy_Loss
# from Model_Loss.binary_cross_entropy import BinaryCrossEntropy  # a three-class task does not need a binary loss
from merge_class.merge import merge
from draw_tools.Saliency_Map import SaliencyMap
from utils.Stomach_Config import Training_Config, Loading_Config, Save_Result_File_Config
from experiments.Models.Xception_Model_Modification import Xception
from experiments.Models.pytorch_Model import ModifiedXception
from Load_process.LoadData import Loding_Data_Root
from Training_Tools.PreProcess import Training_Precesses
from Calculate_Process.Calculate import Calculate
from Load_process.file_processing import Process_File
from draw_tools.draw import plot_history, draw_heatmap
from model_data_processing.processing import Image_Enhance_Training_Data
from draw_tools.Grad_cam import GradCAM
import time
import torch.optim as optim
import numpy as np
import pandas as pd
import datetime
import argparse
import os

class Xception_Identification_Block_Training_Step(Loding_Data_Root, Training_Precesses):
    def __init__(self, Experiment_Name, Best_Model_Save_Root):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.train_subset = None  # subset of the training dataset for the current fold
        self.val_subset = None  # subset of the validation dataset for the current fold
        self.train_loader = None  # DataLoader over the training subset
        self.val_loader = None  # DataLoader over the validation subset
        self.Mask = None  # mask produced by GastroSegNet
        self.Grad = None  # gradients used later for Grad-CAM
        self.model_name = Training_Config["Model_Name"]  # which model to use (pretrained or self-designed)
        self.Epoch = Training_Config["Epoch"]  # number of training epochs
        self.train_batch_size = Training_Config["Train_Batch_Size"]  # training batch size
        self.Experiment_Name = Experiment_Name
        self.Number_Of_Classes = len(Loading_Config["Training_Labels"])
        self.Best_Model_Save_Root = Best_Model_Save_Root
        # Initialise the inherited base classes
        Training_Precesses.__init__(self, Training_Config["Image_Size"])
        Loding_Data_Root.__init__(self, Loading_Config["Training_Labels"], Loading_Config["Train_Data_Root"], Loading_Config["Test_Data_Root"])
    def Processing_Main(self, training_dataset, Test_Dataloader=None):
        '''
        Main entry point: runs k-fold training, validation, and test evaluation.
        '''
# Lists to store metrics across all folds
all_fold_train_losses = []
all_fold_val_losses = []
all_fold_train_accuracies = []
all_fold_val_accuracies = []
        Calculate_Process = Calculate()
        File = Process_File()
        Calculate_Tool = [Calculate() for _ in range(3)]  # one Calculate per class (three-class task)
        Best_Model_Path = None
        Best_Validation_Loss = float('inf')
# K-Fold loop
kf = KFold(n_splits=5, shuffle=True, random_state=42)
        for fold, (train_idx, val_idx) in enumerate(kf.split(range(len(training_dataset)))):  # k-fold cross-validation loop
            Model = self.Construct_Identification_Model_CUDA()  # fresh model for each fold
            Optimizer = optim.SGD(Model.parameters(), lr=0.045, momentum=0.9, weight_decay=Training_Config["weight_decay"])
print(f"\nStarting Fold {fold + 1}/5")
# Create training and validation subsets for this fold
self.train_subset = torch.utils.data.Subset(training_dataset, train_idx)
self.val_subset = torch.utils.data.Subset(training_dataset, val_idx)
# Wrap subsets in DataLoaders (use same batch size as original)
self.train_loader = self.Dataloader_Sampler(self.train_subset , self.train_batch_size, True)
self.val_loader = self.Dataloader_Sampler(self.val_subset, self.train_batch_size, True)
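            # Note: the validation loader is also created with shuffle=True; the
            # computed metrics are order-independent, so this is harmless, though
            # shuffling is unnecessary for validation.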
            self.train_loader = Image_Enhance_Training_Data(Training_Loader=self.train_loader, Save_Root=f"{Loading_Config['Image enhance processing save root']}/{str(fold)}")
            # Train and validate on this fold
            model_path, Train_Losses, Validation_losses, Train_Accuracies, Validation_accuracies, best_val_loss = self.Training_And_Validation(Model, Optimizer, fold)
# Store fold results
all_fold_train_losses.append(Train_Losses)
all_fold_val_losses.append(Validation_losses)
all_fold_train_accuracies.append(Train_Accuracies)
all_fold_val_accuracies.append(Validation_accuracies)
            # Ensure tensors are on the CPU so they can be converted to NumPy arrays
            if torch.is_tensor(Train_Losses):
                Train_Losses = Train_Losses.cpu().detach().numpy()
            if torch.is_tensor(Validation_losses):
                Validation_losses = Validation_losses.cpu().detach().numpy()
            if torch.is_tensor(Train_Accuracies):
                Train_Accuracies = Train_Accuracies.cpu().detach().numpy()
            if torch.is_tensor(Validation_accuracies):
                Validation_accuracies = Validation_accuracies.cpu().detach().numpy()
Losses = [Train_Losses, Validation_losses]
Accuracies = [Train_Accuracies, Validation_accuracies]
            plot_history(Losses, Accuracies, f"{Save_Result_File_Config['Identification_Plot_Image']}/{self.Experiment_Name}", f"train-{str(fold)}")  # plot the training curves and save the figure
            # Evaluate on the test set
            True_Label, Predict_Label, loss, accuracy, precision, recall, f1 = self.Evaluate_Model(Model, Test_Dataloader, fold, model_path)
            # Record this fold's results
            Calculate_Process.Append_numbers(loss, accuracy, precision, recall, f1)
            self.record_matrix_image(True_Label, Predict_Label, fold)
            print(self.record_everyTime_test_result(loss, accuracy, precision, recall, f1, fold, self.Experiment_Name))  # record this fold's predictions and export them to CSV
            # Per-class evaluation with the identification model
            Calculate_Tool = self.Evaluate_Per_Class_Metrics(Model, Test_Dataloader, Loading_Config["Training_Labels"], Calculate_Tool, model_path)
            if best_val_loss < Best_Validation_Loss:
                Best_Validation_Loss = best_val_loss
                Best_Model_Path = model_path
            Calculate_Process.Calculate_Mean()
            Calculate_Process.Calculate_Std()
            File.Save_CSV_File(f"../Result/Experiment_Result/{self.Experiment_Name}/Total/{str(datetime.date.today())}", f"Total_Training_Result-{fold}", Calculate_Process.Output_Style())
for Calculate_Every_Class in Calculate_Tool:
Calculate_Every_Class.Calculate_Mean()
Calculate_Every_Class.Calculate_Std()
        # Aggregate results across folds (final-epoch value from each fold)
        avg_train_losses = np.mean([losses[-1] for losses in all_fold_train_losses])
        avg_val_losses = np.mean([losses[-1] for losses in all_fold_val_losses])
        avg_train_accuracies = np.mean([acc[-1] for acc in all_fold_train_accuracies])
        avg_val_accuracies = np.mean([acc[-1] for acc in all_fold_val_accuracies])
        print("\nCross-Validation Results:")
        print(f"Avg Train Loss: {avg_train_losses:.4f}, Avg Val Loss: {avg_val_losses:.4f}")
        print(f"Avg Train Acc: {avg_train_accuracies:.4f}, Avg Val Acc: {avg_val_accuracies:.4f}")
        File.Save_TXT_File(content=f"\nCross-Validation Results:\nAvg Train Loss: {avg_train_losses:.4f}, Avg Val Loss: {avg_val_losses:.4f}\nAvg Train Acc: {avg_train_accuracies:.4f}, Avg Val Acc: {avg_val_accuracies:.4f}\n", Save_Root=Save_Result_File_Config["Identification_Average_Result"], File_Name="Training_Average_Result")
        # Return the path of the model with the lowest validation loss across all folds
        return Best_Model_Path
    def Training_And_Validation(self, Model, Optimizer, Fold):
        '''
        Core training and validation loop for a single fold.
        '''
        model_path, early_stopping, scheduler = call_back(self.Best_Model_Save_Root, f"fold{Fold}", Optimizer)
# Lists to store metrics for this fold
train_losses = []
Validation_losses = []
train_accuracies = []
Validation_accuracies = []
# Epoch loop
for epoch in range(self.Epoch):
Model.train() # Start training
Training_Loss = 0.0
All_Predict_List, All_Label_List = [], []
# Progress bar for training batches
epoch_iterator = tqdm(self.train_loader, desc=f"Fold {Fold + 1}/5, Epoch [{epoch + 1}/{self.Epoch}]")
Start_Time = time.time()
for inputs, labels, File_Name, File_Classes in epoch_iterator:
Total_Losses, Training_Loss, All_Predict_List, All_Label_List, Predict_Indexs, Truth_Indexs = self.Model_Branch(
Model=Model,
Input_Images=inputs,
Labels=labels,
All_Predict_List=All_Predict_List,
All_Label_List=All_Label_List,
running_loss=Training_Loss,
Optimizer=Optimizer,
status="Training"
)
self.Calculate_Progress_And_Timing(inputs, Predict_Indexs, Truth_Indexs, self.train_subset, Total_Losses, epoch_iterator, Start_Time)
train_losses, train_accuracies, Training_Loss, Train_accuracy = self.Calculate_Average_Scores(self.train_loader, Training_Loss, All_Predict_List, All_Label_List, train_losses, train_accuracies)
# Validation step
Model.eval()
val_loss = 0.0
all_val_preds = []
all_val_labels = []
start_Validation_time = time.time()
epoch_iterator = tqdm(self.val_loader, desc=f"\tValidation-Fold {Fold + 1}/5, Epoch [{epoch + 1}/{self.Epoch}]")
with torch.no_grad():
for inputs, labels, File_Name, File_Classes in epoch_iterator:
Validation_Total_Loss, val_loss, all_val_preds, all_val_labels, Predict_Indexs, Truth_Indexs = self.Model_Branch(
Model=Model,
Input_Images=inputs,
Labels=labels,
All_Predict_List=all_val_preds,
All_Label_List=all_val_labels,
running_loss=val_loss,
Optimizer=Optimizer,
status="Validation"
)
self.Calculate_Progress_And_Timing(inputs, Predict_Indexs, Truth_Indexs, self.val_subset, Validation_Total_Loss, epoch_iterator, start_Validation_time)
Validation_losses, Validation_accuracies, val_loss, val_accuracy = self.Calculate_Average_Scores(self.val_loader, val_loss, all_val_preds, all_val_labels, Validation_losses, Validation_accuracies)
print(f"Traini Loss: {Training_Loss:.4f}, Accuracy: {Train_accuracy:0.2f}, Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:0.2f}\n")
if epoch % 5 == 0:
Grad = GradCAM(Model, self.TargetLayer)
Grad.Processing_Main(self.val_loader, f"{Save_Result_File_Config['GradCAM_Validation_Image_Save_Root']}/{self.Experiment_Name}/fold-{str(Fold)}/{str(epoch)}")
            # # Create a SaliencyMap instance
            # saliency_map = SaliencyMap(self.Model)
            # # Process the validation set
            # saliency_map.Processing_Main(self.val_loader, f"../Result/Saliency_Image/Validation/Saliency_Image({str(datetime.date.today())})/{self.Experiment_Name}/fold-{str(Fold)}/")
# Early stopping
early_stopping(val_loss, Model, model_path)
best_val_loss = early_stopping.best_loss
if early_stopping.early_stop:
print(f"Early stopping triggered in Fold {Fold + 1} at epoch {epoch + 1}")
break
# Learning rate adjustment
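            # (call_back presumably returns a ReduceLROnPlateau-style scheduler,
            # since it is stepped with the validation loss -- an assumption, as its
            # definition lives in all_models_tools and is not shown here)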
scheduler.step(val_loss)
        # Return this fold's checkpoint path together with its metric histories
        return model_path, train_losses, Validation_losses, train_accuracies, Validation_accuracies, best_val_loss
    def Construct_Identification_Model_CUDA(self):
        # The number of output nodes comes from the model configuration
        # Model = ModifiedXception()
        Model = Xception(num_classes=0)
        print(summary(Model))
        # Print the model structure and each layer's parameter shape and gradient status
        print(f"Model structure: {Model}")
        for name, parameters in Model.named_parameters():
            print(f"Layer Name: {name}, Parameters: {parameters.size()}, requires_grad: {parameters.requires_grad}")
        # Target layer for Grad-CAM: the pointwise weights of the last separable convolution
        self.TargetLayer = Model.conv4.pointwise.weight
        # self.TargetLayer = Model.base_model.conv4.pointwise
        # if name == "exit_flow.conv2.3.pointwise.bias":
        #     self.TargetLayer = Model.exit_flow.conv2
        return self.Convert_Model_To_CUDA(Model)
def Convert_Model_To_CUDA(self, model):
model = nn.DataParallel(model)
model = model.to(self.device)
return model
    def Model_Branch(self, Model, Input_Images, Labels, All_Predict_List: list, All_Label_List: list, running_loss, Optimizer, status):
        if status == "Training":
            Optimizer.zero_grad()  # zero the gradients so they do not accumulate across batches
        # Move tensors to the device while keeping them in the autograd graph
        Input_Images, Labels = Input_Images.to(self.device), Labels.to(self.device)
        Predicts_Data = Model(Input_Images)
        # Compute the loss from the raw prediction and label tensors (gradients preserved)
        Losses = self.Losses(Predicts_Data, Labels)
        if status == "Training":
            Losses.backward()
            Optimizer.step()
        running_loss += Losses.item()
        # Collect predictions and labels for the evaluation metrics
        Output_Values, Output_Indexs = torch.max(Predicts_Data, dim=1)
        True_Indexs = np.argmax(Labels.cpu().numpy(), axis=1)  # labels are one-hot encoded
        # # If labels may be either one-hot or class indices, branch on the shape:
        # if Labels.dim() > 1 and Labels.size(1) > 1:
        #     True_Indexs = np.argmax(Labels.cpu().numpy(), axis=1)
        # else:
        #     True_Indexs = Labels.cpu().numpy()
        # Convert the prediction indices to NumPy for the metric computations
        All_Predict_List.append(Output_Indexs.cpu().numpy())
        All_Label_List.append(True_Indexs)
        return Losses, running_loss, All_Predict_List, All_Label_List, Output_Indexs, True_Indexs
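    # Illustrative note for Model_Branch (not executed): with one-hot labels
    # [[0, 1, 0], [1, 0, 0]], np.argmax(labels, axis=1) yields [1, 0], matching
    # the class indices that torch.max(Predicts_Data, dim=1) extracts from the logits.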
def Losses(self, Predicts, Labels):
criterion = Entropy_Loss()
Loss = criterion(Predicts, Labels)
return Loss
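    # Entropy_Loss is the project's custom criterion (Model_Loss/Loss.py); given the
    # one-hot labels produced upstream, it presumably behaves like a cross-entropy
    # over one-hot targets -- an assumption, since its definition is not shown here.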
    def Evaluate_Model(self, cnn_model, Test_Dataloader, index, identification_model_path=None):
        # Load the identification model weights if a path was provided
        if identification_model_path is not None:
            cnn_model.load_state_dict(torch.load(identification_model_path))
        else:
            raise ValueError("No identification model path provided for evaluation.")
        # Switch to evaluation mode
        cnn_model.eval()
        True_Label, Predict_Label = [], []
        loss = 0.0
        with torch.no_grad():
            for images, labels, File_Name, File_Classes in Test_Dataloader:
                Total_Loss, loss, Predict_Label, True_Label, Output_Indexs, Truth_Index = self.Model_Branch(
                    Model=cnn_model,
                    Input_Images=images,
                    Labels=labels,
                    All_Predict_List=Predict_Label,
                    All_Label_List=True_Label,
                    running_loss=loss,
                    Optimizer=None,
                    status="Testing"
                )
        loss /= len(Test_Dataloader)
        # Flatten the per-batch arrays into single label/prediction vectors
        True_Label = np.concatenate(True_Label)
        Predict_Label = np.concatenate(Predict_Label)
accuracy = accuracy_score(True_Label, Predict_Label)
precision = precision_score(True_Label, Predict_Label, average="macro")
recall = recall_score(True_Label, Predict_Label, average="macro")
f1 = f1_score(True_Label, Predict_Label, average="macro")
        # Compute and plot the confusion matrix
        matrix = confusion_matrix(True_Label, Predict_Label)
        draw_heatmap(matrix, f"{Save_Result_File_Config['Identification_Marix_Image']}/{self.Experiment_Name}/Identification_Test_Marix_Image", f"confusion_matrix", index)  # draw the confusion-matrix heatmap
Grad = GradCAM(cnn_model, self.TargetLayer)
Grad.Processing_Main(Test_Dataloader, f"{Save_Result_File_Config['GradCAM_Test_Image_Save_Root']}/{self.Experiment_Name}/fold-{str(index)}/")
return True_Label, Predict_Label, loss, accuracy, precision, recall, f1
def Evaluate_Per_Class_Metrics(self, cnn_model, Test_Dataloader, Labels, Calculate_Tool, identification_model_path=None):
"""
Evaluate the model on the test dataloader and compute binary classification metrics for each class.
Parameters:
- cnn_model: The trained model to evaluate.
- Test_Dataloader: DataLoader for the test dataset.
- Labels: List of class names for better readability.
- Calculate_Tool: Tool for recording metrics.
- identification_model_path: Path to the trained model weights (optional).
Returns:
- Calculate_Tool: Updated with binary classification metrics for each class.
"""
        # Load the identification model weights if a path was provided
        if identification_model_path is not None:
            cnn_model.load_state_dict(torch.load(identification_model_path))
        # Check whether a GPU is available
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {device}")
        # Switch to evaluation mode
        cnn_model.eval()
all_results = []
all_labels = []
        # Standard PyTorch inference loop
        with torch.no_grad():  # no gradients needed for evaluation
for inputs, labels, _, _ in Test_Dataloader:
inputs = inputs.to(device)
labels = labels.to(device)
outputs = cnn_model(inputs)
_, predicted = torch.max(outputs, 1)
all_results.append(predicted.cpu().numpy())
all_labels.append(np.argmax(labels.cpu().numpy(), axis=1))
        # Merge the per-batch results into single arrays
        Predict = np.concatenate(all_results)
        y_test = np.concatenate(all_labels)
        print(f"Predictions: {Predict}\n")
        # Overall metrics
        accuracy = accuracy_score(y_test, Predict)
        print(f"Overall accuracy: {accuracy:.4f}")
        # There are three classes (0, 1, 2): compute one-vs-rest binary metrics for
        # each class, treating it as positive and the remaining classes as negative
        for class_idx in range(len(Labels)):
            print(f"One-vs-rest metrics for class {Labels[class_idx]}:")
            y_binary = (y_test == class_idx).astype(int)
            predict_binary = (Predict == class_idx).astype(int)
            # Binary metrics
            binary_accuracy = accuracy_score(y_binary, predict_binary)
            binary_precision = precision_score(y_binary, predict_binary, zero_division=0)
            binary_recall = recall_score(y_binary, predict_binary, zero_division=0)
            binary_f1 = f1_score(y_binary, predict_binary, zero_division=0)
            print(f"  Accuracy: {binary_accuracy:.4f}")
            print(f"  Precision: {binary_precision:.4f}")
            print(f"  Recall: {binary_recall:.4f}")
            print(f"  F1: {binary_f1:.4f}\n")
            # Record this class's metrics (the loss slot is unused here, hence 0)
            Calculate_Tool[class_idx].Append_numbers(0, binary_accuracy, binary_precision, binary_recall, binary_f1)
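        # Illustrative example (not executed): with y_test = [0, 2, 1, 0] and
        # class_idx = 0, y_binary becomes [1, 0, 0, 1], so the binary metrics
        # score class 0 against the two remaining classes pooled together.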
return Calculate_Tool
    def Calculate_Progress_And_Timing(self, inputs, Predict_Labels, Truth_Labels, Subset, loss, epoch_iterator, Start_Time):
        # Progress and timing (note: processed_samples reflects only the current batch)
        total_samples = len(Subset)
        processed_samples = inputs.size(0)  # size(0) is the batch size
        elapsed_time = time.time() - Start_Time
        iterations_per_second = processed_samples / elapsed_time if elapsed_time > 0 else 0
        eta = (total_samples - processed_samples) / iterations_per_second if iterations_per_second > 0 else 0
        time_str = f"{int(elapsed_time//60):02d}:{int(elapsed_time%60):02d}<{int(eta//60):02d}:{int(eta%60):02d}"
        # If loss is a tensor call .item(); otherwise use the float value directly
        batch_loss = loss.item() if torch.is_tensor(loss) else loss
        # Batch accuracy
        batch_accuracy = (Predict_Labels.cpu().numpy() == Truth_Labels).mean()
# Update progress bar
epoch_iterator.set_postfix_str(
f"{processed_samples}/{total_samples} [{time_str}, {iterations_per_second:.2f}it/s, "
f"acc={batch_accuracy:.3f}, loss={batch_loss:.3f}]"
)
return epoch_iterator
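    # Calculate_Average_Scores merges the per-batch prediction/label arrays, then
    # converts the summed running loss into a mean over batches (dividing by
    # len(Data_Loader), i.e. the number of batches).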
def Calculate_Average_Scores(self, Data_Loader, Running_Losses, All_Predict_Labels, All_Truth_Labels, Losses, Accuracies):
Merge_Function = merge()
All_Predicts = Merge_Function.merge_data_main(All_Predict_Labels, 0, len(All_Predict_Labels))
All_Truths = Merge_Function.merge_data_main(All_Truth_Labels, 0, len(All_Truth_Labels))
Running_Losses /= len(Data_Loader)
Accuracy = accuracy_score(All_Truths, All_Predicts)
Losses.append(Running_Losses)
Accuracies.append(Accuracy)
return Losses, Accuracies, Running_Losses, Accuracy
    def record_matrix_image(self, True_Labels, Predict_Labels, index):
        '''Plot the confusion matrix as a heatmap.'''
        # Compute the confusion matrix
        matrix = confusion_matrix(True_Labels, Predict_Labels)
        # Confusion_Matrix_of_Two_Classification(matrix, Save_Result_File_Config["Identification_Marix_Image"], Experiment_Name, index)
        draw_heatmap(matrix, Save_Result_File_Config["Identification_Marix_Image"], self.Experiment_Name, index)  # draw the confusion-matrix heatmap
    def record_everyTime_test_result(self, loss, accuracy, precision, recall, f, indexs, model_name):
        '''Record a single fold's test result and write it out to a CSV file.'''
        File = Process_File()
Dataframe = pd.DataFrame(
{
"model_name" : str(model_name),
"loss" : "{:.2f}".format(loss),
"precision" : "{:.2f}%".format(precision * 100),
"recall" : "{:.2f}%".format(recall * 100),
"accuracy" : "{:.2f}%".format(accuracy * 100),
"f" : "{:.2f}%".format(f * 100),
}, index = [indexs])
File.Save_CSV_File(Save_Result_File_Config["Identification_Every_Fold_Training_Result"], "train_result", Dataframe)
return Dataframe
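

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original script). The
# loader calls below are hypothetical placeholders -- substitute the project's
# real data-loading routines from Loding_Data_Root.
# ---------------------------------------------------------------------------
# if __name__ == "__main__":
#     trainer = Xception_Identification_Block_Training_Step(
#         Experiment_Name="Xception_Identification_Test",
#         Best_Model_Save_Root="../Result/Best_Model",
#     )
#     training_dataset = trainer.Load_Training_Dataset()  # hypothetical helper
#     test_loader = trainer.Load_Test_Dataloader()        # hypothetical helper
#     best_model_path = trainer.Processing_Main(training_dataset, test_loader)
#     print(f"Best model checkpoint: {best_model_path}")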