# Stomach_Cancer_Pytorch/experiments/experiment.py
#
# 376 lines / 19 KiB / Python
#
# NOTE(review): the lines above and the web-viewer chrome that followed
# ("Raw Permalink Blame History" and a warning about ambiguous Unicode
# characters) were pasted in as bare text, which made this module
# unimportable; they have been converted to comments.

from Training_Tools.PreProcess import Training_Precesses
from Load_process.Load_Indepentend import Load_Indepentend_Data
from _validation.ValidationTheEnterData import validation_the_enter_data
from utils.Stomach_Config import Training_Config, Loading_Config, Save_Result_File_Config
# from experiments.Training.Identification_Block_Training import Identification_Block_Training_Step
# from experiments.Training.Segmentation_Block_Training import Segmentation_Block_Training_Step
from experiments.Training.Xception_Identification_Test import Xception_Identification_Block_Training_Step
from Load_process.LoadData import Loding_Data_Root
from Load_process.LoadData import Load_Data_Prepare
from model_data_processing.processing import make_label_list
from merge_class.merge import merge
from Training_Tools.Tools import Tool
import torch
import time
import numpy as np
class experiments():
def __init__(self, Xception_Training_Data, Xception_Training_Label, Xception_Training_Mask_Data, status):
'''
# 實驗物件
## 說明:
* 用於開始訓練pytorch的物件裡面分為數個方法負責處理實驗過程的種種
## parmeter:
* cut_image: 呼叫切割影像物件
* merge: 合併的物件
* model_name: 模型名稱,告訴我我是用哪個模型(可能是預處理模型/自己設計的模型)
* experiment_name: 實驗名稱
* epoch: 訓練次數
* train_batch_size: 訓練資料的batch
* convolution_name: Grad-CAM的最後一層的名稱
* Number_Of_Classes: Label的類別
* Status: 選擇現在資料集的狀態
* device: 決定使用GPU或CPU
## Method:
* processing_main: 實驗物件的進入點
* construct_model: 決定實驗用的Model
* Training_Step: 訓練步驟,開始進行訓練驗證的部分
* Evaluate_Model: 驗證模型的準確度
* record_matrix_image: 劃出混淆矩陣(熱力圖)
* record_everyTime_test_result: 記錄我單次的訓練結果並將它輸出到檔案中
'''
self.model_name = Training_Config["Model_Name"] # 取名,告訴我我是用哪個模型(可能是預處理模型/自己設計的模型)
self.epoch = Training_Config["Epoch"]
self.train_batch_size = Training_Config["Train_Batch_Size"]
self.Image_Size = Training_Config["Image_Size"]
self.Xception_Training_Data = Xception_Training_Data
self.Xception_Training_Label = Xception_Training_Label
self.Xception_Training_Mask_Data = Xception_Training_Mask_Data
self.Grad = None
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.validation_obj = validation_the_enter_data() # 呼叫驗證物件
pass
def processing_main(self):
print(f"Testing Data Prepring!!!!")
tool = Tool()
Merge = merge()
Prepare = Load_Data_Prepare()
# 取得One-hot encording 的資料
tool.Set_OneHotEncording(Loading_Config["Training_Labels"])
Encording_Label = tool.Get_OneHot_Encording_Label()
Label_Length = len(Loading_Config["Training_Labels"])
start = time.time()
# self.cut_image.process_main(Loading_Config["Test_Data_Root"], Loading_Config["Annotation_Testing_Root"]) # 呼叫處理test Data與Validation Data的function
tmp_load = Loding_Data_Root(Loading_Config["Training_Labels"], Loading_Config['Test_Data_Root'], None)
Data_Dict_Data = tmp_load.process_main(False)
Total_Size_List = []
Train_Size = 0
print("前處理後資料集總數")
for label in Loading_Config["Training_Labels"]:
Train_Size += len(Data_Dict_Data[label])
Total_Size_List.append(len(Data_Dict_Data[label]))
print(f"Labels: {label}, 總數為: {len(Data_Dict_Data[label])}")
print("總共有 " + str(Train_Size) + " 筆資料")
# 做出跟資料相同數量的Label
Classes = []
i = 0
for encording in Encording_Label:
Classes.append(make_label_list(Total_Size_List[i], encording))
i += 1
# 將資料做成Dict的資料型態
Prepare.Set_Final_Dict_Data(Loading_Config["Training_Labels"], Data_Dict_Data, Classes, Label_Length)
Final_Dict_Data = Prepare.Get_Final_Data_Dict()
keys = list(Final_Dict_Data.keys())
Testing_Data = Merge.merge_all_image_data(Final_Dict_Data[keys[0]], Final_Dict_Data[keys[1]]) # 將訓練資料合併成一個list
for i in range(2, Label_Length):
Testing_Data = Merge.merge_all_image_data(Testing_Data, Final_Dict_Data[keys[i]]) # 將訓練資料合併成一個list
Testing_Label = Merge.merge_all_image_data(Final_Dict_Data[keys[Label_Length]], Final_Dict_Data[keys[Label_Length + 1]]) #將訓練資料的label合併成一個label的list
for i in range(Label_Length + 2, 2 * Label_Length):
Testing_Label = Merge.merge_all_image_data(Testing_Label, Final_Dict_Data[keys[i]]) # 將訓練資料合併成一個list
end = time.time()
print("讀取testing與validation資料(154)執行時間:%f\n" % (end - start))
# 將處理好的test Data 與 Validation Data 丟給這個物件的變數
print("Testing Data is Prepared finish!!!!")
PreProcess = Training_Precesses(self.Image_Size)
Test_Dataset = PreProcess.Setting_DataSet(
Testing_Data,
Testing_Label,
None,
"Transform"
)
self.Test_Dataloader = PreProcess.Dataloader_Sampler(Test_Dataset, 1, False)
# 創建Normal vs Others的訓練數據集
Training_Data = PreProcess.Setting_DataSet(
self.Xception_Training_Data,
self.Xception_Training_Label,
None,
"Transform"
)
print("Training is started!!\n")
# 創建訓練步驟物件
identification_Normal_step = Xception_Identification_Block_Training_Step(Training_Config["Three_Classes_Experiment_Name"], Save_Result_File_Config["Three_Classes_Identification_Best_Model"])
# 訓練Normal vs Others分類模型
Best_Model_Path = identification_Normal_step.Processing_Main(Training_Data, self.Test_Dataloader)
# # 分類正常跟其他資料集的測試資料
# Normal_And_Other_Test_Data = self.cut_image.test.copy()
# normal_vs_others_Test_labels = []
# # 將標籤轉換為二分類Normal(1) vs Others(0)
# for label in self.cut_image.test_label:
# if np.argmax(label) == 1: # Normal_Crop
# # Normal類別標籤為[0, 1]
# normal_vs_others_Test_labels.append(np.array([0, 1]))
# else:
# # 其他類別標籤為[1, 0]
# normal_vs_others_Test_labels.append(np.array([1, 0]))
# # 創建Normal vs Others的測試數據集
# normal_vs_others_test_dataset = PreProcess.Setting_DataSet(
# Normal_And_Other_Test_Data,
# normal_vs_others_Test_labels,
# None,
# "Transform"
# )
# normal_vs_others_test_dataloader = PreProcess.Dataloader_Sampler(normal_vs_others_test_dataset, 1, False)
# # =========================================================================================================================================================================================
# # 分類分割模型的測試資料
# # 使用CA資料和Have_Question資料訓練分割模型
# ca_have_question_test_data = []
# ca_have_question_test_labels = []
# # 篩選CA和Have_Question資料
# for i, label in enumerate(self.cut_image.test_label):
# # 檢查是否為CA或Have_Question類別
# if np.argmax(label) == 0 or np.argmax(label) == 2: # stomach_cancer_Crop或Have_Question_Crop
# ca_have_question_test_data.append(self.cut_image.test[i])
# ca_have_question_test_labels.append(self.cut_image.test_label[i])
# print(f"CA and Have_Question Test Data Count: {len(ca_have_question_test_data)}")
# print(f"CA and Have_Question Test Mask Count: {len(self.cut_image.test_mask)}")
# # 創建CA和Have_Question的訓練數據集
# segumentation_test_dataset = PreProcess.Setting_DataSet(
# ca_have_question_test_data,
# ca_have_question_test_labels,
# self.cut_image.test_mask,
# "Transform"
# )
# Segumentation_test_dataloader = PreProcess.Dataloader_Sampler(segumentation_test_dataset, 1, False)
# # =========================================================================================================================================================================================
# # 非胃癌有病與胃癌資料的分類測試資料
# # 準備CA vs Have_Question的訓練數據
# ca_vs_have_question_test_data = []
# ca_vs_have_question_test_labels = []
# # 篩選CA和Have_Question資料
# for i, label in enumerate(self.cut_image.test_label):
# if np.argmax(label) == 0: # stomach_cancer_Crop
# ca_vs_have_question_test_data.append(self.cut_image.test[i])
# ca_vs_have_question_test_labels.append(np.array([1, 0])) # CA類別標籤為[1, 0]
# elif np.argmax(label) == 2: # Have_Question_Crop
# ca_vs_have_question_test_data.append(self.cut_image.test[i])
# ca_vs_have_question_test_labels.append(np.array([0, 1])) # Have_Question類別標籤為[0, 1]
# # 創建CA vs Have_Question的測試數據集
# ca_vs_have_question_test_dataset = PreProcess.Setting_DataSet(
# ca_vs_have_question_test_data,
# ca_vs_have_question_test_labels,
# None,
# "Transform"
# )
# ca_vs_have_question_test_dataloader = PreProcess.Dataloader_Sampler(ca_vs_have_question_test_dataset, 1, False)
# # =========================================================================================================================================================================================
# 建立最終測試資料(不含遮罩)
# Testing_Data, Testing_Label = self.cut_image.test.copy(), self.cut_image.test_label.copy()
# Test_Dataset = PreProcess.Setting_DataSet(
# Testing_Data,
# Testing_Label,
# None,
# "Transform"
# )
# self.Test_Dataloader = PreProcess.Dataloader_Sampler(Test_Dataset, 1, False)
# =========================================================================================================================================================================================
# identification_Normal_step = Identification_Block_Training_Step(Training_Config["Normal_Experiment_Name"], Save_Result_File_Config["Normal_Identification_Best_Model"])
# identification_CA_step = Identification_Block_Training_Step(Training_Config["CA_Experiment_Name"], Save_Result_File_Config["CA_Identification_Best_Model"])
# segmentation_step = Segmentation_Block_Training_Step(Save_Result_File_Config["Segmentation_Best_Model"])
# print("\n=== 第一階段:訓練正常資料分類模型 ===\n")
# # 第一組訓練Normal資料和其他資料的分類模型
# print("\n--- Normal vs Others分類模型 ---\n")
# # 準備Normal vs Others的訓練數據
# # 分類testing的資料
# normal_vs_others_data = self.Xception_Training_Data.copy()
# normal_vs_others_labels = []
# # 將標籤轉換為二分類Normal(1) vs Others(0)
# for label in self.Xception_Training_Label:
# if np.argmax(label) == 1: # Normal_Crop
# # Normal類別標籤為[0, 1]
# normal_vs_others_labels.append(np.array([0, 1]))
# else:
# # 其他類別標籤為[1, 0]
# normal_vs_others_labels.append(np.array([1, 0]))
# # 創建Normal vs Others的訓練數據集
# normal_vs_others_dataset = PreProcess.Setting_DataSet(
# normal_vs_others_data,
# normal_vs_others_labels,
# None,
# "Transform"
# )
# # 訓練Normal vs Others分類模型
# Best_Normal_Model_Path, Normal_Calculate_Process, Normal_Calculate_Tool = identification_Normal_step.Processing_Main(normal_vs_others_dataset, normal_vs_others_test_dataloader)
# # 訓練流程:先訓練分割模型,再訓練分類模型
# print("\n=== 第二階段:訓練分割模型 ===\n")
# # 使用CA資料和Have_Question資料訓練分割模型
# ca_have_question_data = []
# ca_have_question_labels = []
# # 篩選CA和Have_Question資料
# for i, label in enumerate(self.Xception_Training_Label):
# # 檢查是否為CA或Have_Question類別
# if np.argmax(label) == 0 or np.argmax(label) == 2: # stomach_cancer_Crop或Have_Question_Crop
# ca_have_question_data.append(self.Xception_Training_Data[i])
# ca_have_question_labels.append(self.Xception_Training_Label[i])
# # 創建CA和Have_Question的訓練數據集
# ca_have_question_dataset = PreProcess.Setting_DataSet(
# ca_have_question_data,
# ca_have_question_labels,
# self.Xception_Training_Mask_Data,
# "Transform"
# )
# # 執行分割模型訓練,並獲取處理後的圖像
# segmentation_best_model_path, avg_test_loss = segmentation_step.Processing_Main(
# ca_have_question_dataset,
# return_processed_images=True,
# test_dataloader=Segumentation_test_dataloader
# )
# print(f"分割模型訓練完成,模型路徑: {segmentation_best_model_path}")
# # 將處理後的圖像保存起來,用於後續分析或可視化
# # 這裡可以添加保存處理後圖像的代碼例如使用torchvision.utils.save_image
# print("\n=== 第三階段訓練CA資料分類模型 ===\n")
# # 第二組訓練CA資料和Have_Question資料的分類模型
# print("\n--- 訓練CA vs Have_Question分類模型 ---\n")
# Load = Loding_Data_Root(Loading_Config["XML_Loading_Label"], Save_Result_File_Config["Segument_Bounding_Box_Image"], None)
# CA_Laod_Data_Dict = Load.process_main(False)
# Total_Size_List = []
# Train_Size = 0
# print("前處理後資料集總數")
# for label in Loading_Config["XML_Loading_Label"]:
# Train_Size += len(CA_Laod_Data_Dict[label])
# Total_Size_List.append(len(CA_Laod_Data_Dict[label]))
# print(f"Labels: {label}, 總數為: {len(CA_Laod_Data_Dict[label])}")
# print("總共有 " + str(Train_Size) + " 筆資料")
# # 做出跟資料相同數量的Label
# Classes = []
# Encording_Label = np.array([[1, 0], [0, 1]])
# i = 0
# for encording in Encording_Label:
# Classes.append(make_label_list(Total_Size_List[i], encording))
# i += 1
# # 將資料做成Dict的資料型態
# Prepare = Load_Data_Prepare()
# Merge = merge()
# Label_Length = len(Loading_Config["XML_Loading_Label"])
# Prepare.Set_Final_Dict_Data(Loading_Config["XML_Loading_Label"], CA_Laod_Data_Dict, Classes, Label_Length)
# Final_Dict_Data = Prepare.Get_Final_Data_Dict()
# keys = list(Final_Dict_Data.keys())
# Training_Data = Merge.merge_all_image_data(Final_Dict_Data[keys[0]], Final_Dict_Data[keys[1]]) # 將訓練資料合併成一個list
# for i in range(2, Label_Length):
# Training_Data = Merge.merge_all_image_data(Training_Data, Final_Dict_Data[keys[i]]) # 將訓練資料合併成一個list
# Training_Label = Merge.merge_all_image_data(Final_Dict_Data[keys[Label_Length]], Final_Dict_Data[keys[Label_Length + 1]]) #將訓練資料的label合併成一個label的list
# for i in range(Label_Length + 2, 2 * Label_Length):
# Training_Label = Merge.merge_all_image_data(Training_Label, Final_Dict_Data[keys[i]]) # 將訓練資料合併成一個list
# # 創建CA vs Have_Question的訓練數據集
# ca_vs_have_question_dataset = PreProcess.Setting_DataSet(
# Training_Data,
# Training_Label,
# None,
# "Transform"
# )
# # 訓練CA vs Have_Question分類模型
# Best_CA_Model_Path, CA_Calculate_Process, CA_Calculate_Tool = identification_CA_step.Processing_Main(ca_vs_have_question_dataset, ca_vs_have_question_test_dataloader)
# # 顯示訓練完成的指標平均值
# print("\n=== Normal and another的指標平均值 ===\n")
# print(f"Normal and another identification result is \n {Normal_Calculate_Process.Output_Style()}\n")
# print("\n=== Normal and another各類別的指標平均值 ===\n")
# for Calculate_Every_Class in Normal_Calculate_Tool:
# print(f"\nNormal and another identification result is \n {Calculate_Every_Class.Output_Style()}\n")
# print("\n\n")
# # 顯示訓練完成的指標平均值
# print("\n=== CA and Have Question的指標平均值 ===\n")
# print(f"CA and Have Question identification result is \n {CA_Calculate_Process.Output_Style()}\n")
# print("\n=== CA and Have Question各類別的指標平均值 ===\n")
# for Calculate_Every_Class in CA_Calculate_Tool:
# print(f"\nCA vs Have_Question identification result is \n {Calculate_Every_Class.Output_Style()}\n")
# print("\n")
# evaluator = ModelEvaluator(Save_Result_File_Config["Normal_Identification_Best_Model"], Save_Result_File_Config["CA_Identification_Best_Model"])
# metrics, results_dir = evaluator.run_evaluation()
# if metrics:
# print(f"\n最終結果摘要:")
# print(f"準確率: {metrics['accuracy']:.4f}")
# print(f"精確率: {metrics['precision_macro']:.4f}")
# print(f"召回率: {metrics['recall_macro']:.4f}")
# print(f"F1分數: {metrics['f1_macro']:.4f}")
# print(f"結果保存位置: {results_dir}")
# return Best_Normal_Model_Path, Best_CA_Model_Path, segmentation_best_model_path