from Training_Tools.PreProcess import Training_Precesses from Load_process.Load_Indepentend import Load_Indepentend_Data from _validation.ValidationTheEnterData import validation_the_enter_data from Load_process.LoadData import Loding_Data_Root from utils.Stomach_Config import Training_Config, Loading_Config, Save_Result_File_Config from experiments.Training.Identification_Block_Training import Identification_Block_Training_Step from experiments.Training.Segmentation_Block_Training import Segmentation_Block_Training_Step from Load_process.LoadData import Load_Data_Prepare from merge_class.merge import merge from model_data_processing.processing import make_label_list from sklearn.metrics import accuracy_score import numpy as np import torch import torch.nn as nn import time import pandas as pd class experiments(): def __init__(self, Xception_Training_Data, Xception_Training_Label, Xception_Training_Mask_Data, tools, Number_Of_Classes, status): ''' # 實驗物件 ## 說明: * 用於開始訓練pytorch的物件,裡面分為數個方法,負責處理實驗過程的種種 ## parmeter: * Topic_Tool: 讀取訓練、驗證、測試的資料集與Label等等的內容 * cut_image: 呼叫切割影像物件 * merge: 合併的物件 * model_name: 模型名稱,告訴我我是用哪個模型(可能是預處理模型/自己設計的模型) * experiment_name: 實驗名稱 * epoch: 訓練次數 * train_batch_size: 訓練資料的batch * convolution_name: Grad-CAM的最後一層的名稱 * Number_Of_Classes: Label的類別 * Status: 選擇現在資料集的狀態 * device: 決定使用GPU或CPU ## Method: * processing_main: 實驗物件的進入點 * construct_model: 決定實驗用的Model * Training_Step: 訓練步驟,開始進行訓練驗證的部分 * Evaluate_Model: 驗證模型的準確度 * record_matrix_image: 劃出混淆矩陣(熱力圖) * record_everyTime_test_result: 記錄我單次的訓練結果並將它輸出到檔案中 ''' self.model_name = Training_Config["Model_Name"] # 取名,告訴我我是用哪個模型(可能是預處理模型/自己設計的模型) self.epoch = Training_Config["Epoch"] self.train_batch_size = Training_Config["Train_Batch_Size"] self.Image_Size = Training_Config["Image_Size"] self.Number_Of_Classes = Number_Of_Classes self.Xception_Training_Data = Xception_Training_Data self.Xception_Training_Label = Xception_Training_Label self.Xception_Training_Mask_Data = Xception_Training_Mask_Data self.Topic_Tool = tools self.Status = status self.Grad = None self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.validation_obj = validation_the_enter_data() # 呼叫驗證物件 self.cut_image = Load_Indepentend_Data(self.Topic_Tool.Get_OneHot_Encording_Label()) # 呼叫切割影像物件 pass def processing_main(self): print(f"Testing Data Prepring!!!!") start = time.time() self.cut_image.process_main(Loading_Config["Test_Data_Root"], Loading_Config["Annotation_Testing_Root"]) # 呼叫處理test Data與Validation Data的function end = time.time() print("讀取testing與validation資料(154)執行時間:%f 秒\n" % (end - start)) # 將處理好的test Data 與 Validation Data 丟給這個物件的變數 print("Testing Data is Prepared finish!!!!") PreProcess = Training_Precesses(self.Image_Size) # 分類正常跟其他資料集的測試資料 Normal_And_Other_Test_Data = self.cut_image.test.copy() normal_vs_others_Test_labels = [] # 將標籤轉換為二分類:Normal(1) vs Others(0) for label in self.cut_image.test_label: if np.argmax(label) == 1: # Normal_Crop # Normal類別標籤為[0, 1] normal_vs_others_Test_labels.append(np.array([0, 1])) else: # 其他類別標籤為[1, 0] normal_vs_others_Test_labels.append(np.array([1, 0])) # 創建Normal vs Others的測試數據集 normal_vs_others_test_dataset = PreProcess.Setting_DataSet( Normal_And_Other_Test_Data, normal_vs_others_Test_labels, None, "Transform" ) normal_vs_others_test_dataloader = PreProcess.Dataloader_Sampler(normal_vs_others_test_dataset, 1, False) # ========================================================================================================================================================================================= # 分類分割模型的測試資料 # 使用CA資料和Have_Question資料訓練分割模型 ca_have_question_test_data = [] ca_have_question_test_labels = [] # 篩選CA和Have_Question資料 for i, label in enumerate(self.cut_image.test_label): # 檢查是否為CA或Have_Question類別 if np.argmax(label) == 0 or np.argmax(label) == 2: # stomach_cancer_Crop或Have_Question_Crop ca_have_question_test_data.append(self.cut_image.test[i]) ca_have_question_test_labels.append(self.cut_image.test_label[i]) print(f"CA and Have_Question Test Data Count: {len(ca_have_question_test_data)}") print(f"CA and Have_Question Test Mask Count: {len(self.cut_image.test_mask)}") # 創建CA和Have_Question的訓練數據集 segumentation_test_dataset = PreProcess.Setting_DataSet( ca_have_question_test_data, ca_have_question_test_labels, self.cut_image.test_mask, "Transform" ) Segumentation_test_dataloader = PreProcess.Dataloader_Sampler(segumentation_test_dataset, 1, False) # ========================================================================================================================================================================================= # 非胃癌有病與胃癌資料的分類測試資料 # 準備CA vs Have_Question的訓練數據 ca_vs_have_question_test_data = [] ca_vs_have_question_test_labels = [] # 篩選CA和Have_Question資料 for i, label in enumerate(self.cut_image.test_label): if np.argmax(label) == 0: # stomach_cancer_Crop ca_vs_have_question_test_data.append(self.cut_image.test[i]) ca_vs_have_question_test_labels.append(np.array([1, 0])) # CA類別標籤為[1, 0] elif np.argmax(label) == 2: # Have_Question_Crop ca_vs_have_question_test_data.append(self.cut_image.test[i]) ca_vs_have_question_test_labels.append(np.array([0, 1])) # Have_Question類別標籤為[0, 1] # 創建CA vs Have_Question的測試數據集 ca_vs_have_question_test_dataset = PreProcess.Setting_DataSet( ca_vs_have_question_test_data, ca_vs_have_question_test_labels, None, "Transform" ) ca_vs_have_question_test_dataloader = PreProcess.Dataloader_Sampler(ca_vs_have_question_test_dataset, 1, False) # ========================================================================================================================================================================================= # 建立最終測試資料(不含遮罩) Testing_Data, Testing_Label = self.cut_image.test.copy(), self.cut_image.test_label.copy() Test_Dataset = PreProcess.Setting_DataSet( Testing_Data, Testing_Label, None, "Transform" ) self.Test_Dataloader = PreProcess.Dataloader_Sampler(Test_Dataset, 1, False) # ========================================================================================================================================================================================= print("Training is started!!\n") # 創建訓練步驟物件 identification_Normal_step = Identification_Block_Training_Step(Training_Config["Normal_Experiment_Name"], Save_Result_File_Config["Normal_Identification_Best_Model"]) identification_CA_step = Identification_Block_Training_Step(Training_Config["CA_Experiment_Name"], Save_Result_File_Config["CA_Identification_Best_Model"]) segmentation_step = Segmentation_Block_Training_Step(Save_Result_File_Config["Segmentation_Best_Model"]) print("\n=== 第一階段:訓練正常資料分類模型 ===\n") # 第一組:訓練Normal資料和其他資料的分類模型 print("\n--- Normal vs Others分類模型 ---\n") # 準備Normal vs Others的訓練數據 # 分類testing的資料 normal_vs_others_data = self.Xception_Training_Data.copy() normal_vs_others_labels = [] # 將標籤轉換為二分類:Normal(1) vs Others(0) for label in self.Xception_Training_Label: if np.argmax(label) == 1: # Normal_Crop # Normal類別標籤為[0, 1] normal_vs_others_labels.append(np.array([0, 1])) else: # 其他類別標籤為[1, 0] normal_vs_others_labels.append(np.array([1, 0])) # 創建Normal vs Others的訓練數據集 normal_vs_others_dataset = PreProcess.Setting_DataSet( normal_vs_others_data, normal_vs_others_labels, None, "Transform" ) # 訓練Normal vs Others分類模型 Best_Normal_Model_Path, Normal_Calculate_Process, Normal_Calculate_Tool = identification_Normal_step.Processing_Main(normal_vs_others_dataset, normal_vs_others_test_dataloader) # 訓練流程:先訓練分割模型,再訓練分類模型 print("\n=== 第二階段:訓練分割模型 ===\n") # 使用CA資料和Have_Question資料訓練分割模型 ca_have_question_data = [] ca_have_question_labels = [] # 篩選CA和Have_Question資料 for i, label in enumerate(self.Xception_Training_Label): # 檢查是否為CA或Have_Question類別 if np.argmax(label) == 0 or np.argmax(label) == 2: # stomach_cancer_Crop或Have_Question_Crop ca_have_question_data.append(self.Xception_Training_Data[i]) ca_have_question_labels.append(self.Xception_Training_Label[i]) # 創建CA和Have_Question的訓練數據集 ca_have_question_dataset = PreProcess.Setting_DataSet( ca_have_question_data, ca_have_question_labels, self.Xception_Training_Mask_Data, "Transform" ) # 執行分割模型訓練,並獲取處理後的圖像 segmentation_best_model_path, avg_test_loss = segmentation_step.Processing_Main( ca_have_question_dataset, return_processed_images=True, test_dataloader=Segumentation_test_dataloader ) print(f"分割模型訓練完成,模型路徑: {segmentation_best_model_path}") # 將處理後的圖像保存起來,用於後續分析或可視化 # 這裡可以添加保存處理後圖像的代碼,例如使用torchvision.utils.save_image print("\n=== 第三階段:訓練CA資料分類模型 ===\n") # 第二組:訓練CA資料和Have_Question資料的分類模型 print("\n--- 訓練CA vs Have_Question分類模型 ---\n") Load = Loding_Data_Root(Loading_Config["XML_Loading_Label"], Save_Result_File_Config["Segument_Bounding_Box_Image"], None) CA_Laod_Data_Dict = Load.process_main(False) Total_Size_List = [] Train_Size = 0 print("前處理後資料集總數") for label in Loading_Config["XML_Loading_Label"]: Train_Size += len(CA_Laod_Data_Dict[label]) Total_Size_List.append(len(CA_Laod_Data_Dict[label])) print(f"Labels: {label}, 總數為: {len(CA_Laod_Data_Dict[label])}") print("總共有 " + str(Train_Size) + " 筆資料") # 做出跟資料相同數量的Label Classes = [] Encording_Label = np.array([[1, 0], [0, 1]]) i = 0 for encording in Encording_Label: Classes.append(make_label_list(Total_Size_List[i], encording)) i += 1 # 將資料做成Dict的資料型態 Prepare = Load_Data_Prepare() Merge = merge() Label_Length = len(Loading_Config["XML_Loading_Label"]) Prepare.Set_Final_Dict_Data(Loading_Config["XML_Loading_Label"], CA_Laod_Data_Dict, Classes, Label_Length) Final_Dict_Data = Prepare.Get_Final_Data_Dict() keys = list(Final_Dict_Data.keys()) Training_Data = Merge.merge_all_image_data(Final_Dict_Data[keys[0]], Final_Dict_Data[keys[1]]) # 將訓練資料合併成一個list for i in range(2, Label_Length): Training_Data = Merge.merge_all_image_data(Training_Data, Final_Dict_Data[keys[i]]) # 將訓練資料合併成一個list Training_Label = Merge.merge_all_image_data(Final_Dict_Data[keys[Label_Length]], Final_Dict_Data[keys[Label_Length + 1]]) #將訓練資料的label合併成一個label的list for i in range(Label_Length + 2, 2 * Label_Length): Training_Label = Merge.merge_all_image_data(Training_Label, Final_Dict_Data[keys[i]]) # 將訓練資料合併成一個list # 創建CA vs Have_Question的訓練數據集 ca_vs_have_question_dataset = PreProcess.Setting_DataSet( Training_Data, Training_Label, None, "Transform" ) # 訓練CA vs Have_Question分類模型 Best_CA_Model_Path, CA_Calculate_Process, CA_Calculate_Tool = identification_CA_step.Processing_Main(ca_vs_have_question_dataset, ca_vs_have_question_test_dataloader) # 顯示訓練完成的指標平均值 print("\n=== Normal and another的指標平均值 ===\n") print(f"Normal and another identification result is \n {Normal_Calculate_Process.Output_Style()}\n") print("\n=== Normal and another各類別的指標平均值 ===\n") for Calculate_Every_Class in Normal_Calculate_Tool: print(f"\nNormal and another identification result is \n {Calculate_Every_Class.Output_Style()}\n") print("\n\n") # 顯示訓練完成的指標平均值 print("\n=== CA and Have Question的指標平均值 ===\n") print(f"CA and Have Question identification result is \n {CA_Calculate_Process.Output_Style()}\n") print("\n=== CA and Have Question各類別的指標平均值 ===\n") for Calculate_Every_Class in CA_Calculate_Tool: print(f"\nCA vs Have_Question identification result is \n {Calculate_Every_Class.Output_Style()}\n") print("\n") # return Best_Normal_Model_Path, Best_CA_Model_Path, segmentation_best_model_path def test_workflow(self, identification_Normal_step, identification_CA_step, segmentation_step, normal_vs_others_model_path, ca_vs_have_question_model_path, segmentation_model_path): """測試流程: 1. 先用Normal vs Others模型辨識是Normal資料還是其他資料 2. 若辨識為其他資料,或辨識為Normal但F1 score不到50%,進入下一階段 3. 將進入下一階段的資料丟到分割模型產生mask並選擇候選框,將框外像素變黑 4. 再丟到CA vs Have_Question模型中辨識為CA或Have_Question """ print("\n=== 開始測試流程 ===\n") identification_Normal_step = Identification_Block_Training_Step(Training_Config["Normal_Experiment_Name"], Save_Result_File_Config["Normal_Identification_Best_Model"]) identification_CA_step = Identification_Block_Training_Step(Training_Config["CA_Experiment_Name"], Save_Result_File_Config["CA_Identification_Best_Model"]) segmentation_step = Segmentation_Block_Training_Step(Save_Result_File_Config["Segmentation_Best_Model"]) # 準備測試結果記錄 results = [] PreProcess = Training_Precesses(self.Image_Size) # 第一階段:使用Normal vs Others模型進行辨識 print("\n--- 第一階段:Normal vs Others辨識 ---\n") # 載入Normal vs Others模型 identification_Normal_step.Model.load_state_dict(torch.load(normal_vs_others_model_path)) identification_Normal_step.Model.eval() # 記錄需要進入第二階段的樣本 second_stage_samples = [] second_stage_prepare = [] second_save_classes = [] second_load_Classes = [] with torch.no_grad(): for i, (images, labels, file_names, file_classes) in enumerate(self.Test_Dataloader): # 將數據移到設備上 images = images.to(identification_Normal_step.device) # 進行預測 outputs = identification_Normal_step.Model(images) Output_Values, predicted = torch.max(outputs, dim=1) labels = np.argmax(labels.cpu().numpy(), axis=1) # 計算F1 score (這裡簡化為判斷是否需要進入第二階段) # 如果預測為Others(0)或預測為Normal(1)但置信度不高,進入第二階段 if predicted.item() == 0 or outputs[0][1].item() < 0.5: # Others或Normal但置信度低 second_stage_samples.append((images, labels, file_names, file_classes)) second_stage_indices.append(i) print(f"樣本 {file_names[0]} 需要進入第二階段 (預測={predicted.item()}, 置信度={outputs[0][predicted.item()].item():.4f})") second_save_classes.append(file_classes[0]) # Labels = torch.argmax(labels, dim=1) Normal_Accuracy = accuracy_score(labels, predicted.cpu().numpy()) print(f"Normal vs Others 辨識準確率: {Normal_Accuracy:.2f}") second_stage_prepare.append((images, labels, file_names, file_classes)) for Classes in second_save_classes: if Classes not in second_load_Classes: second_load_Classes.append(Classes) # 第二階段:使用分割模型產生mask,再使用CA vs Have_Question模型進行辨識 if second_stage_samples: print(f"\n--- 第二階段:分割模型產生mask並進行CA vs Have_Question辨識 ---\n") print(f"共有 {len(second_stage_samples)} 個樣本進入第二階段") # 載入分割模型 segmentation_step.Model.load_state_dict(torch.load(segmentation_model_path)) segmentation_step.Model.eval() # 載入CA vs Have_Question模型 identification_CA_step.Model.load_state_dict(torch.load(ca_vs_have_question_model_path)) identification_CA_step.Model.eval() with torch.no_grad(): for i, (images, labels, file_names, file_classes) in enumerate(second_stage_samples): # 將數據移到設備上 images = images.to(segmentation_step.device) # 產生mask並處理圖像 segmentation_step.Model_Branch(Input_Images = images, Mask_Ground_Truth_Image = None, running_loss = 0, Save_Dir = Save_Result_File_Config["Segument_Test_Bounding_Box_Image"], return_processed_image=True, file_names=f"bbox_image_{file_names}.png", Classes = second_save_classes) Load = Loding_Data_Root(second_load_Classes, Save_Result_File_Config["Segument_Test_Bounding_Box_Image"], None) CA_Test_Laod_Data_Dict = Load.process_main(False) Total_Size_List = [] Train_Size = 0 print("前處理後資料集總數") for labels in second_load_Classes: Train_Size += len(CA_Test_Laod_Data_Dict[labels]) Total_Size_List.append(len(CA_Test_Laod_Data_Dict[labels])) print(f"Labels: {labels}, 總數為: {len(CA_Test_Laod_Data_Dict[labels])}") print("總共有 " + str(Train_Size) + " 筆資料") # 做出跟資料相同數量的Label Classes = [] Encording_Label = np.array([[1, 0], [0, 1]]) i = 0 for encording in Encording_Label: Classes.append(make_label_list(Total_Size_List[i], encording)) i += 1 # 將資料做成Dict的資料型態 Prepare = Load_Data_Prepare() Merge = merge() Label_Length = len(second_load_Classes) Prepare.Set_Final_Dict_Data(second_load_Classes, CA_Test_Laod_Data_Dict, Classes, Label_Length) Final_Dict_Data = Prepare.Get_Final_Data_Dict() keys = list(Final_Dict_Data.keys()) Testing_Data = Merge.merge_all_image_data(Final_Dict_Data[keys[0]], Final_Dict_Data[keys[1]]) # 將訓練資料合併成一個list for i in range(2, Label_Length): Testing_Data = Merge.merge_all_image_data(Testing_Data, Final_Dict_Data[keys[i]]) # 將訓練資料合併成一個list Testing_Label = Merge.merge_all_image_data(Final_Dict_Data[keys[Label_Length]], Final_Dict_Data[keys[Label_Length + 1]]) #將訓練資料的label合併成一個label的list for i in range(Label_Length + 2, 2 * Label_Length): Testing_Label = Merge.merge_all_image_data(Testing_Label, Final_Dict_Data[keys[i]]) # 將訓練資料合併成一個list # 創建CA vs Have_Question的訓練數據集 ca_vs_have_question_test_dataset = PreProcess.Setting_DataSet( Testing_Data, Testing_Label, None, "Transform" ) Evalution_DataLoad = PreProcess.Dataloader_Sampler(ca_vs_have_question_test_dataset, 1, False) Running_Loss = 0.0 Losses, Accuracies = [], [] All_Predict_List, All_Label_List = [], [] # 使用處理後的圖像進行CA vs Have_Question模型辨識 with torch.no_grad(): for i, (inputs, labels, File_Name, File_Classes) in enumerate(Evalution_DataLoad): Evalution_Total_Loss, Running_Loss, All_Predict_List, All_Label_List, Predict_Indexs, Truth_Indexs = identification_CA_step.Model_Branch(inputs, labels, All_Predict_List, All_Label_List, Running_Loss) Losses, Accuracies, val_loss, val_accuracy = identification_CA_step.Calculate_Average_Scores(Evalution_DataLoad, Running_Loss, All_Predict_List, All_Label_List, Losses, Accuracies) print(f"CA vs Have Question 辨識準確率: {Accuracies:0.2f}\n")