from tqdm import tqdm from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score from torchmetrics.functional import auroc from torch.nn import functional from torch import nn import torch from sklearn.model_selection import KFold from torchinfo import summary from sklearn.metrics import confusion_matrix from all_models_tools.all_model_tools import call_back from Model_Loss.Loss import Entropy_Loss # from Model_Loss.binary_cross_entropy import BinaryCrossEntropy # 三分類不需要二分類損失函數 from merge_class.merge import merge from draw_tools.Saliency_Map import SaliencyMap from utils.Stomach_Config import Training_Config, Loading_Config, Save_Result_File_Config # from experiments.Models.Xception_Model_Modification import Xception from experiments.Models.pytorch_Model import ModifiedXception from Load_process.LoadData import Loding_Data_Root from Training_Tools.PreProcess import Training_Precesses from Calculate_Process.Calculate import Calculate from Load_process.file_processing import Process_File from draw_tools.draw import plot_history, draw_heatmap from model_data_processing.processing import Image_Enhance_Training_Data from draw_tools.Grad_cam import GradCAM import time import torch.optim as optim import numpy as np import torch import pandas as pd import datetime import argparse import os class Xception_Identification_Block_Training_Step(Loding_Data_Root, Training_Precesses): def __init__(self, Experiment_Name, Best_Model_Save_Root): self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.Model = self.Construct_Identification_Model_CUDA() # 模型變數 self.train_subset = None # Training Dataset 的子集 self.val_subset = None # Validation Dataset 的子集 self.train_loader = None # Training DataLoader 的讀檔器 self.val_loader = None # Validation DataLoader 的讀檔器 self.Mask = None # 遮罩變數,接收GastroSegNet產出來的Mask self.Grad = None # 梯度變數,後面用來執行Grad CAM self.model_name = Training_Config["Model_Name"] # 取名,使用哪個模型(可能是預處理模型/自己設計的模型) self.Epoch = Training_Config["Epoch"] # 訓練該模型的次數 self.train_batch_size = Training_Config["Train_Batch_Size"] # 訓練模型的Batch Size self.Experiment_Name = Experiment_Name self.Number_Of_Classes = len(Loading_Config["Training_Labels"]) self.Best_Model_Save_Root = Best_Model_Save_Root self.Optimizer = optim.SGD(self.Model.parameters(), lr=0.045, momentum=0.9, weight_decay = Training_Config["weight_decay"]) # 初始化多個繼承物件 Training_Precesses.__init__(self, Training_Config["Image_Size"]) Loding_Data_Root.__init__(self, Loading_Config["Training_Labels"], Loading_Config["Train_Data_Root"], Loading_Config["Test_Data_Root"]) pass ''' 主函數,用來執行模型的訓練與驗證 ''' def Processing_Main(self, training_dataset, Test_Dataloader=None): # Lists to store metrics across all folds all_fold_train_losses = [] all_fold_val_losses = [] all_fold_train_accuracies = [] all_fold_val_accuracies = [] Calculate_Process = Calculate() File = Process_File() Calculate_Tool = [Calculate() for i in range(3)] Best_Model_Path = None Best_Validation_Loss = 100000000 # K-Fold loop kf = KFold(n_splits=5, shuffle=True, random_state=42) for fold, (train_idx, val_idx) in enumerate(kf.split(range(len(training_dataset)))): # K-Fold 交叉驗證迴圈 print(f"\nStarting Fold {fold + 1}/5") # Create training and validation subsets for this fold self.train_subset = torch.utils.data.Subset(training_dataset, train_idx) self.val_subset = torch.utils.data.Subset(training_dataset, val_idx) # Wrap subsets in DataLoaders (use same batch size as original) self.train_loader = self.Dataloader_Sampler(self.train_subset , self.train_batch_size, True) self.val_loader = self.Dataloader_Sampler(self.val_subset, self.train_batch_size, True) self.train_loader = Image_Enhance_Training_Data(Training_Loader = self.train_loader, Save_Root = f"{Loading_Config['Image enhance processing save root']}/{str(fold)}") # 模型訓練與驗證 model_path, Train_Losses, Validation_losses, Train_Accuracies, Validation_accuracies, best_val_loss = self.Training_And_Validation(fold) # Store fold results all_fold_train_losses.append(Train_Losses) all_fold_val_losses.append(Validation_losses) all_fold_train_accuracies.append(Train_Accuracies) all_fold_val_accuracies.append(Validation_accuracies) # 确保张量在CPU上,以便可以转换为NumPy数组 if torch.is_tensor(Train_Losses): Train_Losses = Train_Losses.cpu().detach().numpy() if torch.is_tensor(Validation_losses): Validation_losses = Validation_losses.cpu().detach().numpy() if torch.is_tensor(Train_Accuracies): Train_Accuracies = Train_Accuracies.cpu().detach().numpy() if torch.is_tensor(Validation_accuracies): Validation_accuracies = Validation_accuracies.cpu().detach().numpy() Losses = [Train_Losses, Validation_losses] Accuracies = [Train_Accuracies, Validation_accuracies] plot_history(Losses, Accuracies, f"{Save_Result_File_Config['Identification_Plot_Image']}/{self.Experiment_Name}", f"train-{str(fold)}") # 將訓練結果化成圖,並將化出來的圖丟出去儲存 # 驗證結果 True_Label, Predict_Label, loss, accuracy, precision, recall, f1 = self.Evaluate_Model(self.Model, Test_Dataloader, fold, model_path) # 紀錄該次訓練結果 Calculate_Process.Append_numbers(loss, accuracy, precision, recall, f1) self.record_matrix_image(True_Label, Predict_Label, fold) print(self.record_everyTime_test_result(loss, accuracy, precision, recall, f1, fold, self.Experiment_Name)) # 紀錄當前訓練完之後的預測結果,並輸出成csv檔 # 使用識別模型進行各類別評估 Calculate_Tool = self.Evaluate_Per_Class_Metrics(self.Model, Test_Dataloader, Loading_Config["Training_Labels"], Calculate_Tool, model_path) if best_val_loss < Best_Validation_Loss: Best_Validation_Loss = best_val_loss Best_Model_Path = model_path Calculate_Process.Calculate_Mean() Calculate_Process.Calculate_Std() File.Save_CSV_File(f"../Result/Experiment_Result/{self.Experiment_Name}/Total/{str(datetime.date.today())}", f"Total_Training_Result-{fold}", Calculate_Process.Output_Style()) for Calculate_Every_Class in Calculate_Tool: Calculate_Every_Class.Calculate_Mean() Calculate_Every_Class.Calculate_Std() # Aggregate results across folds avg_train_losses = np.mean([losses[-1] for losses in all_fold_train_losses]) avg_val_losses = np.mean([losses[-1] for losses in all_fold_val_losses]) avg_train_accuracies = np.mean([acc[-1] for acc in all_fold_train_accuracies]) avg_val_accuracies = np.mean([acc[-1] for acc in all_fold_val_accuracies]) print(f"\nCross-Validation Results:") print(f"Avg Train Loss: {avg_train_losses:.4f}, Avg Val Loss: {avg_val_losses:.4f}") print(f"Avg Train Acc: {avg_train_accuracies:.4f}, Avg Val Acc: {avg_val_accuracies:.4f}") File.Save_TXT_File(content = f"\nCross-Validation Results:\nAvg Train Loss: {avg_train_losses:.4f}, Avg Val Loss: {avg_val_losses:.4f}\nAvg Train Acc: {avg_train_accuracies:.4f}, Avg Val Acc: {avg_val_accuracies:.4f}\n", Save_Root = Save_Result_File_Config["Identification_Average_Result"], File_Name = "Training_Average_Result") # 返回最後一個fold的模型路徑和平均指標 return Best_Model_Path def Training_And_Validation(self, Fold): ''' 模型主要的訓練與驗證部分 ''' model_path, early_stopping, scheduler = call_back(self.Best_Model_Save_Root, f"fold{Fold}", self.Optimizer) # Lists to store metrics for this fold train_losses = [] Validation_losses = [] train_accuracies = [] Validation_accuracies = [] # Epoch loop for epoch in range(self.Epoch): self.Model.train() # Start training Training_Loss = 0.0 All_Predict_List, All_Label_List = [], [] # Progress bar for training batches epoch_iterator = tqdm(self.train_loader, desc=f"Fold {Fold + 1}/5, Epoch [{epoch + 1}/{self.Epoch}]") Start_Time = time.time() for inputs, labels, File_Name, File_Classes in epoch_iterator: Total_Losses, Training_Loss, All_Predict_List, All_Label_List, Predict_Indexs, Truth_Indexs = self.Model_Branch( Input_Images=inputs, Labels=labels, All_Predict_List=All_Predict_List, All_Label_List=All_Label_List, running_loss=Training_Loss, status="Training" ) self.Calculate_Progress_And_Timing(inputs, Predict_Indexs, Truth_Indexs, self.train_subset, Total_Losses, epoch_iterator, Start_Time) train_losses, train_accuracies, Training_Loss, Train_accuracy = self.Calculate_Average_Scores(self.train_loader, Training_Loss, All_Predict_List, All_Label_List, train_losses, train_accuracies) # Validation step self.Model.eval() val_loss = 0.0 all_val_preds = [] all_val_labels = [] start_Validation_time = time.time() epoch_iterator = tqdm(self.val_loader, desc=f"\tValidation-Fold {Fold + 1}/5, Epoch [{epoch + 1}/{self.Epoch}]") with torch.no_grad(): for inputs, labels, File_Name, File_Classes in epoch_iterator: Validation_Total_Loss, val_loss, all_val_preds, all_val_labels, Predict_Indexs, Truth_Indexs = self.Model_Branch( Input_Images=inputs, Labels=labels, All_Predict_List=all_val_preds, All_Label_List=all_val_labels, running_loss=val_loss, status="Validation" ) self.Calculate_Progress_And_Timing(inputs, Predict_Indexs, Truth_Indexs, self.val_subset, Validation_Total_Loss, epoch_iterator, start_Validation_time) Validation_losses, Validation_accuracies, val_loss, val_accuracy = self.Calculate_Average_Scores(self.val_loader, val_loss, all_val_preds, all_val_labels, Validation_losses, Validation_accuracies) print(f"Traini Loss: {Training_Loss:.4f}, Accuracy: {Train_accuracy:0.2f}, Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:0.2f}\n") if epoch % 5 == 0: Grad = GradCAM(self.Model, self.TargetLayer) Grad.Processing_Main(self.val_loader, f"{Save_Result_File_Config['GradCAM_Validation_Image_Save_Root']}/{self.Experiment_Name}/fold-{str(Fold)}/{str(epoch)}") # # 創建SaliencyMap實例 # saliency_map = SaliencyMap(self.Model) # # 處理測試數據集 # saliency_map.Processing_Main(self.val_loader, f"../Result/Saliency_Image/Validation/Saliency_Image({str(datetime.date.today())})/{self.Experiment_Name}/fold-{str(Fold)}/") # Early stopping early_stopping(val_loss, self.Model, model_path) best_val_loss = early_stopping.best_loss if early_stopping.early_stop: print(f"Early stopping triggered in Fold {Fold + 1} at epoch {epoch + 1}") break # Learning rate adjustment scheduler.step(val_loss) # 確保返回模型路徑 return model_path, train_losses, Validation_losses, train_accuracies, Validation_accuracies, best_val_loss def Construct_Identification_Model_CUDA(self): # 从Model_Config中获取输出节点数量 Model = ModifiedXception() print(summary(Model)) for name, parameters in Model.named_parameters(): print(f"Layer Name: {name}, Parameters: {parameters.size()}") self.TargetLayer = Model.base_model.conv4.pointwise # 注释掉summary调用,避免Mask参数问题 # 直接打印模型结构 # print(f"Model structure: {Model}") # # 打印模型参数和梯度状态 # for name, parameters in Model.named_parameters(): # print(f"Layer Name: {name}, Parameters: {parameters.size()}, requires_grad: {parameters.requires_grad}") return self.Convert_Model_To_CUDA(Model) def Convert_Model_To_CUDA(self, model): model = nn.DataParallel(model) model = model.to(self.device) return model def Model_Branch(self, Input_Images, Labels, All_Predict_List : list, All_Label_List : list, running_loss, status): if status == "Training": self.Optimizer.zero_grad() # 清零梯度,防止梯度累積 # 將張量移到設備上,但保持梯度計算能力 Input_Images, Labels = Input_Images.to(self.device), Labels.to(self.device) Predicts_Data = self.Model(Input_Images) # 計算損失時使用原始的 Predict 張量和 Labels 張量(保持梯度) Losses = self.Losses(Predicts_Data, Labels) if status == "Training": Losses.backward() self.Optimizer.step() running_loss += Losses.item() # Collect training predictions and labels (用於評估指標) Output_Values, Output_Indexs = torch.max(Predicts_Data, dim=1) True_Indexs = np.argmax(Labels.cpu().numpy(), axis=1) # # 處理標籤:如果是one-hot編碼則轉換為類別索引,否則直接使用 # if Labels.dim() > 1 and Labels.size(1) > 1: # True_Indexs = np.argmax(Labels.cpu().numpy(), axis=1) # else: # True_Indexs = Labels.cpu().numpy() # 將預測索引轉換為 numpy 用於評估指標 All_Predict_List.append(Output_Indexs.cpu().numpy()) All_Label_List.append(True_Indexs) return Losses, running_loss, All_Predict_List, All_Label_List, Output_Indexs, True_Indexs def Losses(self, Predicts, Labels): criterion = Entropy_Loss() Loss = criterion(Predicts, Labels) return Loss def Evaluate_Model(self, cnn_model, Test_Dataloader, index, identification_model_path=None): # 載入識別模型權重(如果提供了路徑) if identification_model_path is not None: cnn_model.load_state_dict(torch.load(identification_model_path)) else: assert identification_model_path is None, "No identification model path provided for evaluation." # 評估模型 cnn_model.eval() True_Label, Predict_Label = [], [] True_Label_OneHot, Predict_Label_OneHot = [], [] loss = 0.0 with torch.no_grad(): for images, labels, File_Name, File_Classes in Test_Dataloader: Total_Loss, Running_Loss, Predict_Label, True_Label, Output_Indexs, Truth_Index = self.Model_Branch( Input_Images=images, Labels=labels, All_Predict_List=Predict_Label, All_Label_List=True_Label, running_loss=0, status="Testing" ) loss /= len(Test_Dataloader) True_Label_OneHot = torch.as_tensor(True_Label_OneHot, dtype=torch.int) Predict_Label_OneHot = torch.as_tensor(Predict_Label_OneHot, dtype=torch.float32) accuracy = accuracy_score(True_Label, Predict_Label) precision = precision_score(True_Label, Predict_Label, average="macro") recall = recall_score(True_Label, Predict_Label, average="macro") f1 = f1_score(True_Label, Predict_Label, average="macro") # 計算混淆矩陣 matrix = confusion_matrix(True_Label, Predict_Label) draw_heatmap(matrix, f"{Save_Result_File_Config['Identification_Marix_Image']}/{self.Experiment_Name}/Identification_Test_Marix_Image", f"confusion_matrix", index) # 呼叫畫出confusion matrix的function TargetLayer = self.Model.base_model.conv4.pointwise Grad = GradCAM(self.Model, TargetLayer) Grad.Processing_Main(Test_Dataloader, f"{Save_Result_File_Config['GradCAM_Test_Image_Save_Root']}/{self.Experiment_Name}/fold-{str(fold)}/") return True_Label, Predict_Label, loss, accuracy, precision, recall, f1 def Evaluate_Per_Class_Metrics(self, cnn_model, Test_Dataloader, Labels, Calculate_Tool, identification_model_path=None): """ Evaluate the model on the test dataloader and compute binary classification metrics for each class. Parameters: - cnn_model: The trained model to evaluate. - Test_Dataloader: DataLoader for the test dataset. - Labels: List of class names for better readability. - Calculate_Tool: Tool for recording metrics. - identification_model_path: Path to the trained model weights (optional). Returns: - Calculate_Tool: Updated with binary classification metrics for each class. """ # 載入識別模型權重(如果提供了路徑) if identification_model_path is not None: cnn_model.load_state_dict(torch.load(identification_model_path)) # 测试GPU是否可用 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"使用设备: {device}") # 设置为评估模式 cnn_model.eval() all_results = [] all_labels = [] # 使用PyTorch的预测方式 with torch.no_grad(): # 不计算梯度 for inputs, labels, _, _ in Test_Dataloader: inputs = inputs.to(device) labels = labels.to(device) outputs = cnn_model(inputs) _, predicted = torch.max(outputs, 1) all_results.append(predicted.cpu().numpy()) all_labels.append(np.argmax(labels.cpu().numpy(), axis=1)) # 将所有批次的结果合并为一个数组 Predict = np.concatenate(all_results) y_test = np.concatenate(all_labels) print(f"预测结果: {Predict}\n") # 计算整体评估指标 accuracy = accuracy_score(y_test, Predict) # 打印整体准确率 print(f"整体准确率 (Accuracy): {accuracy:.4f}") # 現在有三個類別:0, 1, 2 # 為每個類別計算二分類評估指標(將該類別視為正類,其他類別視為負類) for class_idx in range(3): print(f"类别 {Labels[class_idx]} 的二分类评估指标:") y_binary = (y_test == class_idx).astype(int) predict_binary = (Predict == class_idx).astype(int) # 计算二分类指标 binary_accuracy = accuracy_score(y_binary, predict_binary) binary_precision = precision_score(y_binary, predict_binary, zero_division=0) binary_recall = recall_score(y_binary, predict_binary, zero_division=0) binary_f1 = f1_score(y_binary, predict_binary, zero_division=0) # 打印二分类指标 print(f" 准确率 (Accuracy): {binary_accuracy:.4f}") print(f" 精确率 (Precision): {binary_precision:.4f}") print(f" 召回率 (Recall): {binary_recall:.4f}") print(f" F1值: {binary_f1:.4f}\n") # 记录该类别的指标 Calculate_Tool[class_idx].Append_numbers(0, binary_accuracy, binary_precision, binary_recall, binary_f1) return Calculate_Tool def Calculate_Progress_And_Timing(self, inputs, Predict_Labels, Truth_Labels, Subset, loss, epoch_iterator, Start_Time): # Calculate progress and timing total_samples = len(Subset) processed_samples = 0 processed_samples += inputs.size(0) # Use size(0) for batch size # Calculate progress and timing elapsed_time = time.time() - Start_Time iterations_per_second = processed_samples / elapsed_time if elapsed_time > 0 else 0 eta = (total_samples - processed_samples) / iterations_per_second if iterations_per_second > 0 else 0 time_str = f"{int(elapsed_time//60):02d}:{int(elapsed_time%60):02d}<{int(eta//60):02d}:{int(eta%60):02d}" # Calculate batch metrics using PSNR/SSIM loss # 检查loss是否为张量,如果是则调用item(),否则直接使用浮点数值 batch_loss = loss.item() if torch.is_tensor(loss) else loss # Calculate batch accuracy batch_accuracy = (Predict_Labels.cpu().numpy() == Truth_Labels).mean() # Update progress bar epoch_iterator.set_postfix_str( f"{processed_samples}/{total_samples} [{time_str}, {iterations_per_second:.2f}it/s, " f"acc={batch_accuracy:.3f}, loss={batch_loss:.3f}]" ) return epoch_iterator def Calculate_Average_Scores(self, Data_Loader, Running_Losses, All_Predict_Labels, All_Truth_Labels, Losses, Accuracies): Merge_Function = merge() All_Predicts = Merge_Function.merge_data_main(All_Predict_Labels, 0, len(All_Predict_Labels)) All_Truths = Merge_Function.merge_data_main(All_Truth_Labels, 0, len(All_Truth_Labels)) Running_Losses /= len(Data_Loader) Accuracy = accuracy_score(All_Truths, All_Predicts) Losses.append(Running_Losses) Accuracies.append(Accuracy) return Losses, Accuracies, Running_Losses, Accuracy def record_matrix_image(self, True_Labels, Predict_Labels, index): '''劃出混淆矩陣(熱力圖)''' # 計算混淆矩陣 matrix = confusion_matrix(True_Labels, Predict_Labels) # Confusion_Matrix_of_Two_Classification(matrix, Save_Result_File_Config["Identification_Marix_Image"], Experiment_Name, index) # 呼叫畫出confusion matrix的function draw_heatmap(matrix, Save_Result_File_Config["Identification_Marix_Image"], self.Experiment_Name, index) # 呼叫畫出confusion matrix的function def record_everyTime_test_result(self, loss, accuracy, precision, recall, f, indexs, model_name): '''記錄我單次的訓練結果並將它輸出到檔案中''' File = Process_File() Dataframe = pd.DataFrame( { "model_name" : str(model_name), "loss" : "{:.2f}".format(loss), "precision" : "{:.2f}%".format(precision * 100), "recall" : "{:.2f}%".format(recall * 100), "accuracy" : "{:.2f}%".format(accuracy * 100), "f" : "{:.2f}%".format(f * 100), }, index = [indexs]) File.Save_CSV_File(Save_Result_File_Config["Identification_Every_Fold_Training_Result"], "train_result", Dataframe) return Dataframe