from tqdm import tqdm
from torch import nn
from sklearn.model_selection import KFold
from skimage import measure
from all_models_tools.all_model_tools import call_back
from utils.Stomach_Config import Training_Config, Loading_Config, Save_Result_File_Config
from Load_process.LoadData import Loding_Data_Root
from Training_Tools.PreProcess import Training_Precesses
from ..Models.GastroSegNet_Model import GastroSegNet
from Model_Loss.Segmentation_Loss import Segmentation_Loss
from draw_tools.draw import plot_history
import time
import torch.optim as optim
import torch
import torch.nn.functional as F
import numpy as np
import cv2
import os


class Segmentation_Block_Training_Step(Loding_Data_Root, Training_Precesses):
    """Train, validate and apply the GastroSegNet segmentation model.

    The class runs a 5-fold cross-validation over a training dataset, keeps
    the checkpoint path of the fold with the lowest validation loss, and can
    afterwards (a) run that checkpoint over the whole training set to write
    bounding-box preview images and (b) evaluate it on a test dataloader.

    Every dataloader consumed here is expected to yield 5-tuples of
    ``(images, mask_ground_truth, labels, file_names, file_classes)``, as
    that is how the batches are unpacked throughout this class.
    """

    def __init__(self, Best_Model_Save_Root):
        """Build the model and read the run settings from ``Training_Config``.

        Args:
            Best_Model_Save_Root: Location handed to ``call_back`` for each
                fold; the checkpoint path used later is whatever ``call_back``
                returns for it.
        """
        # Use the GPU when one is available, otherwise fall back to the CPU.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.Model = self.Construct_Segment_Model_CUDA()  # segmentation model
        self.train_subset = None  # training subset of the current fold
        self.val_subset = None  # validation subset of the current fold
        self.train_loader = None  # dataloader over train_subset
        self.val_loader = None  # dataloader over val_subset
        # Original author's notes: Mask is meant to receive a mask produced by
        # another network and Grad is meant for Grad-CAM. Neither attribute is
        # read anywhere in this class.
        self.Mask = None
        self.Grad = None

        self.model_name = Training_Config["Mask_Experiment_Name"]
        self.epoch = Training_Config["Epoch"]  # maximum epochs per fold
        self.train_batch_size = Training_Config["Train_Batch_Size"]
        self.Experiment_Name = Training_Config["Mask_Experiment_Name"]
        self.Best_Model_Save_Root = Best_Model_Save_Root

        # Initialise both parent classes explicitly.
        Training_Precesses.__init__(self, Training_Config["Image_Size"])
        Loding_Data_Root.__init__(
            self,
            Loading_Config["Training_Labels"],
            Loading_Config["Train_Data_Root"],
            Loading_Config["Test_Data_Root"],
        )

    def Processing_Main(self, training_dataset, return_processed_images=False, test_dataloader=None):
        """Run 5-fold training and optionally post-process / evaluate.

        Args:
            training_dataset: Indexable dataset; it is split by index with
                ``KFold(n_splits=5, shuffle=True, random_state=42)``.
            return_processed_images: When truthy, the best checkpoint is
                reloaded and run over the whole training set so that
                bounding-box images are written to
                ``Save_Result_File_Config["Segument_Bounding_Box_Image"]``.
            test_dataloader: When given, the best checkpoint is evaluated on
                it and the average test loss is returned as well.

        Returns:
            ``(Best_Model_Path, avg_test_loss)`` when ``test_dataloader`` is
            provided, otherwise just ``Best_Model_Path``.

        Raises:
            RuntimeError: If post-processing or evaluation was requested but
                no fold produced a best checkpoint.
        """
        Best_Model_Path = None
        Best_Validation_Loss = float("inf")

        kf = KFold(n_splits=5, shuffle=True, random_state=42)
        # Loader over the full training set, used only for the optional
        # post-processing pass after training.
        Training_Data_Loader = self.Dataloader_Sampler(training_dataset, self.train_batch_size, True)

        for fold, (train_idx, val_idx) in enumerate(kf.split(range(len(training_dataset)))):
            print(f"\nStarting Fold {fold + 1}/5")

            # Build this fold's subsets and wrap them in dataloaders
            # (validation uses the same batch size as training).
            self.train_subset = torch.utils.data.Subset(training_dataset, train_idx)
            self.val_subset = torch.utils.data.Subset(training_dataset, val_idx)
            self.train_loader = self.Dataloader_Sampler(self.train_subset, self.train_batch_size, True)
            self.val_loader = self.Dataloader_Sampler(self.val_subset, self.train_batch_size, True)

            model_path, Training_Losses, Validation_Losses, Total_Epoch, best_val_loss = \
                self.Training_And_Validation(fold)
            Losses = [Training_Losses, Validation_Losses]

            # Remember the checkpoint of the best fold seen so far.
            if best_val_loss is not None and best_val_loss < Best_Validation_Loss:
                Best_Validation_Loss = best_val_loss
                Best_Model_Path = model_path

            # Plot this fold's loss curves and let plot_history store them.
            plot_history(
                Losses,
                None,
                f"{Save_Result_File_Config['Segument_Plot_Image']}/{self.Experiment_Name}",
                f"train-{str(fold)}",
            )

        wants_evaluation = test_dataloader is not None
        if Best_Model_Path is None and (return_processed_images or wants_evaluation):
            raise RuntimeError("No fold produced a best model checkpoint; cannot post-process or evaluate.")

        # The flag is a boolean, so test its truthiness. (The previous
        # `is not None` check was true for the default value False as well.)
        if return_processed_images:
            self.Model = self.Construct_Segment_Model_CUDA()
            self.Model.load_state_dict(torch.load(Best_Model_Path, map_location=self.device))
            self.Model.eval()

            with torch.no_grad():
                for Input_Images, Mask_Ground_Truth_Image, Labels, File_Name, File_Classes in Training_Data_Loader:
                    # Segment the batch and save bounding-box previews named
                    # after the source files, grouped by class folder.
                    self.Model_Branch(
                        Input_Images,
                        Mask_Ground_Truth_Image,
                        0.0,
                        Save_Result_File_Config["Segument_Bounding_Box_Image"],
                        return_processed_image=True,
                        file_names=File_Name,
                        Classes=File_Classes,
                    )

        if wants_evaluation:
            avg_test_loss = self.evaluate_on_test(Best_Model_Path, test_dataloader)
            return Best_Model_Path, avg_test_loss

        return Best_Model_Path

    def Training_And_Validation(self, Fold):
        """Train a fresh model on the current fold with early stopping.

        Uses ``self.train_loader`` / ``self.val_loader`` prepared by
        ``Processing_Main``.

        Args:
            Fold: Zero-based fold index (used for naming and log messages).

        Returns:
            ``(model_path, Training_Losses, Validation_Losses, Total_Epoch,
            best_val_loss)`` where the loss lists hold one average per epoch
            and ``best_val_loss`` is read from the early-stopping helper.
        """
        self.Model = self.Construct_Segment_Model_CUDA()
        Optimizer = optim.SGD(self.Model.parameters(), lr=0.045, momentum=0.9, weight_decay=0.01)
        # call_back supplies the checkpoint path, early-stopping helper and
        # LR scheduler for this fold.
        model_path, early_stopping, scheduler = call_back(self.Best_Model_Save_Root, f"fold{Fold}", Optimizer)

        epoch = 0
        Training_Losses, Validation_Losses = [], []

        for epoch in range(self.epoch):
            # Reset the accumulators every epoch; otherwise the previous
            # epoch's average leaks into the next epoch's sum.
            Training_Running_Losses, Validation_Running_Losses = 0.0, 0.0

            # ---- training pass ----
            self.Model.train()
            epoch_iterator = tqdm(self.train_loader, desc=f"Fold {Fold + 1}/5, Epoch [{epoch + 1}/{self.epoch}]")
            Start_Time = time.time()
            Processed_Training_Samples = 0

            for Input_Images, Mask_Ground_Truth_Image, Labels, File_Name, File_Classes in epoch_iterator:
                Optimizer.zero_grad()  # clear gradients from the previous step
                Training_Total_Losses, Training_Running_Losses = self.Model_Branch(
                    Input_Images, Mask_Ground_Truth_Image, Training_Running_Losses
                )
                Training_Total_Losses.backward()
                Optimizer.step()

                Processed_Training_Samples += Input_Images.size(0)
                epoch_iterator = self.Calculate_Progress_And_Timing(
                    Input_Images, self.train_subset, Training_Total_Losses,
                    epoch_iterator, Start_Time, Processed_Training_Samples
                )

            Training_Losses, Training_Running_Losses = self.Calculate_Average_Scores(
                self.train_loader, Training_Running_Losses, Training_Losses
            )

            # ---- validation pass ----
            self.Model.eval()
            epoch_iterator_Validation = tqdm(
                self.val_loader,
                desc=f"\tValidation-Fold {Fold + 1}/5, Epoch [{epoch + 1}/{self.epoch}]",
            )
            # Take the timestamp once, before the loop, so the elapsed time
            # and throughput shown in the progress bar are meaningful.
            Start_Time = time.time()
            Processed_Validation_Samples = 0

            with torch.no_grad():
                for Input_Images, Mask_Ground_Truth_Image, Labels, File_Name, File_Classes in epoch_iterator_Validation:
                    Validation_Total_Losses, Validation_Running_Losses = self.Model_Branch(
                        Input_Images, Mask_Ground_Truth_Image, Validation_Running_Losses
                    )

                    Processed_Validation_Samples += Input_Images.size(0)
                    epoch_iterator_Validation = self.Calculate_Progress_And_Timing(
                        Input_Images, self.val_subset, Validation_Total_Losses,
                        epoch_iterator_Validation, Start_Time, Processed_Validation_Samples
                    )

            Validation_Losses, Validation_Running_Losses = self.Calculate_Average_Scores(
                self.val_loader, Validation_Running_Losses, Validation_Losses
            )
            print(f"Traini Loss: {Training_Running_Losses:.4f}, Validation Loss: {Validation_Running_Losses:.4f}\n")

            # Early stopping is driven by the averaged validation loss.
            early_stopping(Validation_Running_Losses, self.Model, model_path)
            if early_stopping.early_stop:
                print(f"Early stopping triggered in Fold {Fold + 1} at epoch {epoch + 1}")
                break

            scheduler.step(Validation_Running_Losses)

        Total_Epoch = epoch + 1
        best_val_loss = early_stopping.best_loss

        return model_path, Training_Losses, Validation_Losses, Total_Epoch, best_val_loss

    def Construct_Segment_Model_CUDA(self):
        """Create a new GastroSegNet, print a short summary, move it to the device.

        Returns:
            The model placed on ``self.device`` (wrapped in ``nn.DataParallel``
            when more than one CUDA device is visible).
        """
        GaSeg = GastroSegNet()

        # Print a human-readable model summary.
        print("\n==== GastroSegNet 模型摘要 ====\n")
        print(f"輸入通道數: {GaSeg.encoder[0].conv[0].in_channels}")
        print(f"輸出通道數: {GaSeg.final_conv.out_channels}")

        # Count trainable parameters only.
        total_params = sum(p.numel() for p in GaSeg.parameters() if p.requires_grad)
        print(f"可訓練參數總量: {total_params:,}")

        print("\n模型結構:")
        print(f"- 編碼器層數: {len(GaSeg.encoder)}")
        print(f"- 解碼器層數: {len(GaSeg.decoder)}")

        print("\n特徵通道配置:")
        features_str = ", ".join(
            str(GaSeg.encoder[i].conv[0].out_channels) for i in range(len(GaSeg.encoder))
        )
        print(f" - 編碼器特徵通道: {features_str}")
        print(f" - 瓶頸層特徵通道: {GaSeg.bottleneck.conv[0].out_channels}")
        print("\n==== 摘要結束 ====\n")

        return self.Convert_Model_To_CUDA(GaSeg)

    def Convert_Model_To_CUDA(self, model):
        """Move ``model`` to ``self.device``, using DataParallel on multi-GPU hosts."""
        if torch.cuda.device_count() > 1:
            model = nn.DataParallel(model)

        model = model.to(self.device)
        return model

    def Model_Branch(self, Input_Images, Mask_Ground_Truth_Image, running_loss, Save_Dir=None,
                     return_processed_image=False, file_names=None, Classes=None):
        """Run one forward pass and either compute the loss or post-process.

        Args:
            Input_Images: Batch of input images (moved to ``self.device`` here).
            Mask_Ground_Truth_Image: Ground-truth masks; only used on the
                loss path.
            running_loss: Accumulated loss so far; the batch loss is added.
            Save_Dir: Root directory for bounding-box images (inference path).
            return_processed_image: When true, skip the loss and return the
                result of ``process_segmentation_output`` instead.
            file_names: Per-sample file names for the saved images.
            Classes: Per-sample class names used as sub-directories.

        Returns:
            ``(loss_tensor, running_loss)`` on the loss path, or the processed
            image batch on the inference path.
        """
        # The tensor is moved as-is; inputs never need gradients.
        Input_Images.requires_grad = False
        Input_Images = Input_Images.to(self.device)

        Segmentation_Output = self.Model(Input_Images)

        if return_processed_image:
            # Only match the spatial size of the input. The logits must NOT be
            # pushed through a channel projection before thresholding.
            Segmentation_Output = self.Compare_Image_And_Resize_It(
                Segmentation_Output, Input_Images, match_channels=False
            )
            return self.process_segmentation_output(
                Input_Images, Segmentation_Output, Save_Dir,
                save_bbox_images=True, file_names=file_names, Classes=Classes
            )

        Mask_Ground_Truth_Image = Mask_Ground_Truth_Image.to(self.device)

        # Resize the prediction to the ground-truth mask before the loss.
        Segmentation_Output = self.Compare_Image_And_Resize_It(Segmentation_Output, Mask_Ground_Truth_Image)
        Losses = self.Losses(Segmentation_Output, Mask_Ground_Truth_Image)

        running_loss += Losses.item()
        return Losses, running_loss

    def Compare_Image_And_Resize_It(self, Image, Target, match_channels=True):
        """Resize ``Image`` to the spatial size of ``Target``.

        Args:
            Image: 4-D tensor to resize (nearest-neighbour interpolation).
            Target: Reference tensor; its last two dimensions give the
                output height and width.
            match_channels: When true (the default, matching the previous
                behaviour) and the channel counts differ, ``Image`` is passed
                through a 1x1 convolution to get ``Target``'s channel count.

        Returns:
            The resized (and possibly channel-projected) tensor.
        """
        # Bring Target up to 4 dimensions so size(1) can be compared below.
        # NOTE(review): a 3-D target is treated as [C, H, W]; if callers pass
        # [B, H, W] masks the "channel" count below is really the batch size.
        if Target.dim() < 3:
            Target = Target.unsqueeze(0).unsqueeze(0)
        elif Target.dim() == 3:
            Target = Target.unsqueeze(0)

        target_height = Target.size(-2)
        target_width = Target.size(-1)

        Image = torch.nn.functional.interpolate(Image, size=(target_height, target_width), mode='nearest')

        if match_channels and Image.size(1) != Target.size(1) and Target.dim() >= 3:
            # NOTE(review): this layer is created on every call with fresh
            # random weights and is not part of the optimised model, so its
            # output is an untrained projection. Kept for the loss path to
            # preserve existing behaviour; confirm whether it is intended.
            conv = torch.nn.Conv2d(Image.size(1), Target.size(1), kernel_size=1).to(self.device)
            Image = conv(Image)

        return Image

    def Losses(self, Segmentation_Output_Image, Segmentation_Mask_GroundTruth_Image):
        """Return ``Segmentation_Loss`` between the prediction and the mask."""
        criterion = Segmentation_Loss()
        Loss = criterion(Segmentation_Output_Image, Segmentation_Mask_GroundTruth_Image)

        return Loss

    def Record_Average_Losses(self, DataLoader):
        """Return a one-element list holding ``0.0 / len(DataLoader)``.

        The local loss starts at zero and nothing is accumulated, so the
        result is always ``[0.0]``. It is not called within this class.
        """
        loss = 0.0
        losses = []

        loss /= len(DataLoader)
        losses.append(loss)
        return losses

    def Calculate_Progress_And_Timing(self, inputs, Subset, loss, epoch_iterator, Start_Time,
                                      Processed_Samples=None):
        """Update a tqdm bar's postfix with progress, timing and batch loss.

        Args:
            inputs: Current batch; ``size(0)`` is its sample count.
            Subset: Dataset being iterated; ``len`` gives the total samples.
            loss: Loss tensor of the current batch.
            epoch_iterator: The tqdm iterator to annotate.
            Start_Time: ``time.time()`` value taken when the pass started.
            Processed_Samples: Cumulative number of samples processed so far,
                including this batch. When omitted, only the current batch
                size is shown (the previous behaviour).

        Returns:
            The same ``epoch_iterator`` object.
        """
        total_samples = len(Subset)
        processed_samples = inputs.size(0) if Processed_Samples is None else Processed_Samples

        elapsed_time = time.time() - Start_Time
        iterations_per_second = processed_samples / elapsed_time if elapsed_time > 0 else 0
        eta = (total_samples - processed_samples) / iterations_per_second if iterations_per_second > 0 else 0
        time_str = f"{int(elapsed_time//60):02d}:{int(elapsed_time%60):02d}<{int(eta//60):02d}:{int(eta%60):02d}"

        batch_loss = loss.item()

        epoch_iterator.set_postfix_str(
            f"{processed_samples}/{total_samples} [{time_str}, {iterations_per_second:.2f}it/s, "
            f"loss={batch_loss:.3f}]"
        )

        return epoch_iterator

    def Calculate_Average_Scores(self, Data_Loader, Running_Losses, Losses):
        """Average the accumulated loss over the loader's batches and record it.

        Returns:
            ``(Losses, average)`` where ``Losses`` is the same list with the
            new average appended.
        """
        Running_Losses /= len(Data_Loader)
        Losses.append(Running_Losses)
        return Losses, Running_Losses

    @staticmethod
    def _to_three_channels(image):
        """Return a 3-channel copy of a ``[C, H, W]`` image.

        Channels beyond the third are dropped; missing channels are filled
        with channel 0 (so a single-channel image is replicated three times).
        """
        channel_count = image.size(0)
        indices = [channel if channel < channel_count else 0 for channel in range(3)]
        return image[indices]

    @staticmethod
    def _tensor_to_bgr_uint8(image):
        """Convert a ``[C, H, W]`` tensor to a contiguous HWC uint8 array for OpenCV.

        Values are scaled by 255 (the original code treats the image as being
        in the [0, 1] range) and clipped so out-of-range values do not wrap.
        """
        img_np = image.clone().detach().cpu().permute(1, 2, 0).numpy()
        img_np = np.clip(img_np * 255, 0, 255).astype(np.uint8)
        img_np = np.ascontiguousarray(img_np)

        # OpenCV writes BGR; expand grayscale or swap RGB accordingly.
        if img_np.shape[2] == 1:
            img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2BGR)
        elif img_np.shape[2] == 3:
            img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
        return img_np

    @staticmethod
    def _build_save_path(save_dir, file_names, index, prefix):
        """Build the output path for sample ``index``.

        Uses the sample's own file name when a list/tuple of names is given,
        a single string name as-is, and otherwise falls back to the index.
        """
        if isinstance(file_names, (list, tuple)) and index < len(file_names):
            file_name = os.path.basename(file_names[index])
            return os.path.join(save_dir, f"{prefix}_{file_name}.png")
        if isinstance(file_names, str) and len(file_names) > 0:
            return os.path.join(save_dir, file_names)
        return os.path.join(save_dir, f"{prefix}_{index}.png")

    def process_segmentation_output(self, input_images, segmentation_output, bbox_save_dir=None,
                                    save_bbox_images=True, file_names=None, Classes=None):
        """Keep only the largest segmented region of each image and save previews.

        For every image the sigmoid of the segmentation output is thresholded
        at 0.5, the largest 8-connected region is selected, and every pixel
        outside that region is set to zero. When no region is found the image
        is passed through unchanged.

        Args:
            input_images: Original input images ``[B, C, H, W]``.
            segmentation_output: Segmentation logits; channel 0 is used.
            bbox_save_dir: Root directory for saved images. Nothing is saved
                when this is ``None``.
            save_bbox_images: Whether to save preview images at all.
            file_names: Per-sample file names used to name saved images.
            Classes: Per-sample class names; each is used as a sub-directory
                of ``bbox_save_dir``.

        Returns:
            Tensor ``[B, 3, H, W]`` on the same device as ``input_images``;
            the output always has three channels.
        """
        # Binary masks from the logits (threshold 0.5 after sigmoid).
        binary_masks = (torch.sigmoid(segmentation_output) > 0.5).float()

        Batch_Size_Of_Image, _, height, width = input_images.size()
        processed_images = torch.zeros(Batch_Size_Of_Image, 3, height, width, device=input_images.device)

        # Saving is only possible when a destination directory was given.
        should_save = bool(save_bbox_images) and bbox_save_dir is not None

        for Batch_Size in range(Batch_Size_Of_Image):
            new_bbox_save_dir = None
            if should_save:
                new_bbox_save_dir = bbox_save_dir
                if Classes is not None and Batch_Size < len(Classes):
                    new_bbox_save_dir = os.path.join(bbox_save_dir, Classes[Batch_Size])
                if not os.path.exists(new_bbox_save_dir):
                    os.makedirs(new_bbox_save_dir, exist_ok=True)
                    print(f"創建邊界框圖像保存目錄: {new_bbox_save_dir}")

            # Connected-component analysis on the current image's mask.
            mask = binary_masks[Batch_Size, 0].cpu().numpy().astype(np.uint8)
            labeled_mask, num_labels = measure.label(mask, return_num=True, connectivity=2)
            regions = measure.regionprops(labeled_mask) if num_labels > 0 else []

            three_channel_input = self._to_three_channels(input_images[Batch_Size])

            if regions:
                # Largest region by area; max() keeps the first one on ties,
                # the same choice a stable descending sort would make.
                largest_region = max(regions, key=lambda region: region.area)
                final_mask = (labeled_mask == largest_region.label).astype(mask.dtype)
                bbox = largest_region.bbox  # (min_row, min_col, max_row, max_col), max exclusive

                final_mask_tensor = torch.from_numpy(final_mask).float().to(input_images.device)

                # Make sure the mask matches the input's spatial size.
                if final_mask_tensor.shape != input_images[Batch_Size, 0].shape:
                    final_mask_tensor = torch.nn.functional.interpolate(
                        final_mask_tensor.unsqueeze(0).unsqueeze(0),
                        size=input_images[Batch_Size, 0].shape,
                        mode='nearest'
                    ).squeeze(0).squeeze(0)

                # Zero every pixel outside the selected region.
                processed_images[Batch_Size] = three_channel_input * final_mask_tensor

                if should_save:
                    img_np = self._tensor_to_bgr_uint8(input_images[Batch_Size])

                    # skimage's bbox is half-open, cv2.rectangle takes
                    # inclusive corners, hence the -1 on the max side.
                    min_row, min_col, max_row, max_col = bbox
                    cv2.rectangle(img_np, (min_col, min_row), (max_col - 1, max_row - 1), (0, 255, 0), 2)

                    save_path = self._build_save_path(new_bbox_save_dir, file_names, Batch_Size, "bbox")
                    cv2.imwrite(save_path, img_np)
                    print(f"已保存邊界框圖像: {save_path}")
            else:
                # No region found: keep the original image (as 3 channels).
                processed_images[Batch_Size] = three_channel_input

                if should_save:
                    img_np = self._tensor_to_bgr_uint8(processed_images[Batch_Size])
                    save_path = self._build_save_path(new_bbox_save_dir, file_names, Batch_Size, "no_bbox")
                    cv2.imwrite(save_path, img_np)
                    print(f"已保存無邊界框圖像: {save_path}")

        return processed_images

    def evaluate_on_test(self, model_path, test_dataloader):
        """Load a checkpoint and return its average loss over a test loader.

        Args:
            model_path: Path of the checkpoint (a state dict) to evaluate.
            test_dataloader: Loader yielding the same 5-tuples as training.

        Returns:
            The loss summed over batches divided by the number of batches.

        Raises:
            ValueError: If ``test_dataloader`` is ``None``.
        """
        if test_dataloader is None:
            raise ValueError("Test dataloader is required for evaluation.")

        self.Model = self.Construct_Segment_Model_CUDA()
        # map_location lets a checkpoint saved on GPU be loaded on a CPU host.
        self.Model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.Model.eval()

        Test_Losses = []
        test_loss = 0.0

        with torch.no_grad():
            for Input_Images, Mask_Ground_Truth_Image, _, _, _ in test_dataloader:
                losses, test_loss = self.Model_Branch(Input_Images, Mask_Ground_Truth_Image, test_loss)

        losses, test_loss = self.Calculate_Average_Scores(test_dataloader, test_loss, Test_Losses)
        print(f"Average Test Loss: {test_loss}")
        return test_loss