from Training_Tools.PreProcess import Training_Precesses
from Load_process.Load_Indepentend import Load_Indepentend_Data
from _validation.ValidationTheEnterData import validation_the_enter_data
from utils.Stomach_Config import Training_Config, Loading_Config, Save_Result_File_Config
from experiments.Training.Xception_Identification_Test import Xception_Identification_Block_Training_Step
from Load_process.LoadData import Loding_Data_Root
from Load_process.LoadData import Load_Data_Prepare
from model_data_processing.processing import make_label_list
from merge_class.merge import merge
from Training_Tools.Tools import Tool
import torch
import time
import numpy as np


class experiments():

    def __init__(self, Xception_Training_Data, Xception_Training_Label, Xception_Training_Mask_Data, status):
        '''
        Experiment driver for the Xception identification training run.

        Holds the training configuration (read from ``Training_Config``), the
        pre-loaded training tensors/labels/masks handed in by the caller, and
        the torch device selection.  The actual work happens in
        :meth:`processing_main`.

        Parameters:
            * Xception_Training_Data: training images for the Xception
              identification model.
            * Xception_Training_Label: one-hot labels matching the training
              images.
            * Xception_Training_Mask_Data: segmentation masks for the training
              images (kept on the instance; not used by the currently active
              pipeline).
            * status: dataset-state selector.  NOTE(review): this argument is
              accepted but never stored or read anywhere in this file —
              confirm with callers whether it can be removed.

        Attributes:
            * model_name: model identifier from ``Training_Config`` (may be a
              pretrained or custom model name).
            * epoch / train_batch_size / Image_Size: training hyperparameters
              from ``Training_Config``.
            * Grad: placeholder for a Grad-CAM hook; initialised to ``None``.
            * device: ``cuda`` when available, otherwise ``cpu``.
            * validation_obj: input-validation helper object.
        '''
        self.model_name = Training_Config["Model_Name"]
        self.epoch = Training_Config["Epoch"]
        self.train_batch_size = Training_Config["Train_Batch_Size"]
        self.Image_Size = Training_Config["Image_Size"]

        self.Xception_Training_Data = Xception_Training_Data
        self.Xception_Training_Label = Xception_Training_Label
        self.Xception_Training_Mask_Data = Xception_Training_Mask_Data

        self.Grad = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.validation_obj = validation_the_enter_data()  # input-validation helper

    def processing_main(self):
        '''
        Entry point of the experiment.

        Loads the test dataset from ``Loading_Config["Test_Data_Root"]``,
        builds one-hot label lists sized to each class, merges the per-class
        data/label lists into flat test lists, wraps them in a DataLoader,
        then trains the three-class Xception identification model on the
        training data supplied at construction time.

        Returns:
            The filesystem path of the best model produced by
            ``Xception_Identification_Block_Training_Step.Processing_Main``
            (previously computed but discarded).
        '''
        print(f"Testing Data Prepring!!!!")
        tool = Tool()
        Merge = merge()
        Prepare = Load_Data_Prepare()

        # Obtain the one-hot encoding for every configured training label.
        tool.Set_OneHotEncording(Loading_Config["Training_Labels"])
        Encording_Label = tool.Get_OneHot_Encording_Label()
        Label_Length = len(Loading_Config["Training_Labels"])

        start = time.time()

        # Load the test data grouped per label into a dict keyed by label name.
        tmp_load = Loding_Data_Root(Loading_Config["Training_Labels"], Loading_Config['Test_Data_Root'], None)
        Data_Dict_Data = tmp_load.process_main(False)

        # Count the per-class and total dataset sizes (printed for operator visibility).
        Total_Size_List = []
        Train_Size = 0
        print("前處理後資料集總數")
        for label in Loading_Config["Training_Labels"]:
            class_size = len(Data_Dict_Data[label])
            Train_Size += class_size
            Total_Size_List.append(class_size)
            print(f"Labels: {label}, 總數為: {len(Data_Dict_Data[label])}")
        print("總共有 " + str(Train_Size) + " 筆資料")

        # Build a label list per class, one entry per sample of that class.
        Classes = []
        for i, encording in enumerate(Encording_Label):
            Classes.append(make_label_list(Total_Size_List[i], encording))

        # Re-shape everything into the project's final dict layout:
        # the first Label_Length keys hold data lists, the next Label_Length
        # keys hold the corresponding label lists.
        Prepare.Set_Final_Dict_Data(Loading_Config["Training_Labels"], Data_Dict_Data, Classes, Label_Length)
        Final_Dict_Data = Prepare.Get_Final_Data_Dict()
        keys = list(Final_Dict_Data.keys())

        # Merge every per-class data list into one flat test-data list.
        Testing_Data = Merge.merge_all_image_data(Final_Dict_Data[keys[0]], Final_Dict_Data[keys[1]])
        for i in range(2, Label_Length):
            Testing_Data = Merge.merge_all_image_data(Testing_Data, Final_Dict_Data[keys[i]])

        # Merge every per-class label list into one flat test-label list.
        Testing_Label = Merge.merge_all_image_data(Final_Dict_Data[keys[Label_Length]], Final_Dict_Data[keys[Label_Length + 1]])
        for i in range(Label_Length + 2, 2 * Label_Length):
            Testing_Label = Merge.merge_all_image_data(Testing_Label, Final_Dict_Data[keys[i]])

        end = time.time()
        print("讀取testing與validation資料(154)執行時間:%f 秒\n" % (end - start))
        print("Testing Data is Prepared finish!!!!")

        # Wrap the merged test data in a dataset/dataloader (batch size 1, no shuffle).
        PreProcess = Training_Precesses(self.Image_Size)
        Test_Dataset = PreProcess.Setting_DataSet(
            Testing_Data,
            Testing_Label,
            None,
            "Transform"
        )
        self.Test_Dataloader = PreProcess.Dataloader_Sampler(Test_Dataset, 1, False)

        # Build the training dataset from the data handed in at construction.
        Training_Data = PreProcess.Setting_DataSet(
            self.Xception_Training_Data,
            self.Xception_Training_Label,
            None,
            "Transform"
        )

        print("Training is started!!\n")

        # Train the three-class Xception identification model and keep the
        # path of the best checkpoint it produced.
        identification_Normal_step = Xception_Identification_Block_Training_Step(Training_Config["Three_Classes_Experiment_Name"], Save_Result_File_Config["Three_Classes_Identification_Best_Model"])
        Best_Model_Path = identification_Normal_step.Processing_Main(Training_Data, self.Test_Dataloader)

        # NOTE(review): a large multi-stage pipeline (Normal-vs-Others
        # classifier, segmentation model, CA-vs-Have_Question classifier and
        # a final evaluation/report step) used to live here as commented-out
        # code.  It has been removed for readability — recover it from
        # version control if it needs to be reinstated.

        return Best_Model_Path