# Stomach_Cancer_Pytorch/experiments/Training/Segmentation_Block_Training.py

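"""K-fold training for the GastroSegNet segmentation block.

Trains GastroSegNet with 5-fold cross-validation, keeps the checkpoint with
the lowest validation loss, post-processes the segmentation output into
bounding-box-masked images for the downstream recognition stage, and can
evaluate the best model on a held-out test DataLoader.
"""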

from tqdm import tqdm
from torch import nn
from sklearn.model_selection import KFold
from skimage import measure
from all_models_tools.all_model_tools import call_back
from utils.Stomach_Config import Training_Config, Loading_Config, Save_Result_File_Config
from Load_process.LoadData import Loding_Data_Root
from Training_Tools.PreProcess import Training_Precesses
from ..Models.GastroSegNet_Model import GastroSegNet
from Model_Loss.Segmentation_Loss import Segmentation_Loss
from draw_tools.draw import plot_history
import time
import torch.optim as optim
import torch
import torch.nn.functional as F
import numpy as np
import cv2
import os


class Segmentation_Block_Training_Step(Loding_Data_Root, Training_Precesses):
    def __init__(self, Best_Model_Save_Root):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # Use the GPU when available, otherwise fall back to the CPU
        self.Model = self.Construct_Segment_Model_CUDA()  # Model instance
        self.train_subset = None  # Subset of the training dataset for the current fold
        self.val_subset = None  # Subset of the validation dataset for the current fold
        self.train_loader = None  # DataLoader over the training subset
        self.val_loader = None  # DataLoader over the validation subset
        self.Mask = None  # Mask produced by the RD Net
        self.Grad = None  # Gradients, used later for Grad-CAM
        self.model_name = Training_Config["Mask_Experiment_Name"]  # Experiment name identifying which model is used (a pretrained model or a custom design)
        self.epoch = Training_Config["Epoch"]  # Number of training epochs
        self.train_batch_size = Training_Config["Train_Batch_Size"]  # Training batch size
        self.Experiment_Name = Training_Config["Mask_Experiment_Name"]  # Experiment name identifying which model is used (a pretrained model or a custom design)
        self.Best_Model_Save_Root = Best_Model_Save_Root
        # Initialize the inherited components
        Training_Precesses.__init__(self, Training_Config["Image_Size"])
        Loding_Data_Root.__init__(self, Loading_Config["Training_Labels"], Loading_Config["Train_Data_Root"], Loading_Config["Test_Data_Root"])

    def Processing_Main(self, training_dataset, return_processed_images=False, test_dataloader=None):
        Best_Model_Path = None
        Best_Validation_Loss = float('inf')
        # K-fold cross-validation loop
        kf = KFold(n_splits=5, shuffle=True, random_state=42)
        Training_Data_Loader = self.Dataloader_Sampler(training_dataset, self.train_batch_size, True)
        for fold, (train_idx, val_idx) in enumerate(kf.split(range(len(training_dataset)))):
            print(f"\nStarting Fold {fold + 1}/5")
            # Create training and validation subsets for this fold
            self.train_subset = torch.utils.data.Subset(training_dataset, train_idx)
            self.val_subset = torch.utils.data.Subset(training_dataset, val_idx)
            # Wrap subsets in DataLoaders (use the same batch size as the original)
            self.train_loader = self.Dataloader_Sampler(self.train_subset, self.train_batch_size, True)
            self.val_loader = self.Dataloader_Sampler(self.val_subset, self.train_batch_size, True)
            # Train and validate the model for this fold
            model_path, Training_Losses, Validation_Losses, Total_Epoch, best_val_loss = self.Training_And_Validation(fold)
            Losses = [Training_Losses, Validation_Losses]
            if best_val_loss < Best_Validation_Loss:
                Best_Validation_Loss = best_val_loss
                Best_Model_Path = model_path
            # Plot the training history and hand the figure off for saving
            plot_history(Losses, None, f"{Save_Result_File_Config['Segument_Plot_Image']}/{self.Experiment_Name}", f"train-{str(fold)}")
        # Optionally return the processed images (used for the downstream recognition training)
        if return_processed_images:
            # Load the best model found across all folds
            self.Model = self.Construct_Segment_Model_CUDA()
            self.Model.load_state_dict(torch.load(Best_Model_Path))
            self.Model.eval()
            # Run inference over the training data
            with torch.no_grad():
                for Input_Images, Mask_Ground_Truth_Image, Labels, File_Name, File_Classes in Training_Data_Loader:
                    # Use Model_Branch to segment the images; pass the file names along so the bounding-box images can be saved
                    self.Model_Branch(Input_Images, Mask_Ground_Truth_Image, 0.0, Save_Result_File_Config["Segument_Bounding_Box_Image"], return_processed_image=True, file_names=File_Name, Classes=File_Classes)
            avg_test_loss = self.evaluate_on_test(Best_Model_Path, test_dataloader)
            return Best_Model_Path, avg_test_loss
        return Best_Model_Path

    def Training_And_Validation(self, Fold):
        self.Model = self.Construct_Segment_Model_CUDA()  # Initialize a fresh model for this fold
        Optimizer = optim.SGD(self.Model.parameters(), lr=0.045, momentum=0.9, weight_decay=0.01)  # Initialize the optimizer
        model_path, early_stopping, scheduler = call_back(f"{self.Best_Model_Save_Root}/{self.Experiment_Name}", f"fold{Fold}", Optimizer)  # Initialize the anti-overfitting callbacks (checkpointing, early stopping, LR scheduling)
        epoch = 0
        Training_Losses, Validation_Losses = [], []
        # Epoch loop
        for epoch in range(self.epoch):
            # Reset the running-loss accumulators at the start of every epoch
            Training_Running_Losses, Validation_Running_Losses = 0.0, 0.0
            self.Model.train()  # Start training
            # Progress bar for training batches
            epoch_iterator = tqdm(self.train_loader, desc=f"Fold {Fold + 1}/5, Epoch [{epoch + 1}/{self.epoch}]")
            Start_Time = time.time()
            for Input_Images, Mask_Ground_Truth_Image, Labels, File_Name, File_Classes in epoch_iterator:
                Optimizer.zero_grad()  # Zero the gradients so they do not accumulate across batches
                Training_Total_Losses, Training_Running_Losses = self.Model_Branch(Input_Images, Mask_Ground_Truth_Image, Training_Running_Losses)
                Training_Total_Losses.backward()
                Optimizer.step()
                epoch_iterator = self.Calculate_Progress_And_Timing(Input_Images, self.train_subset, Training_Total_Losses, epoch_iterator, Start_Time)
            Training_Losses, Training_Running_Losses = self.Calculate_Average_Scores(self.train_loader, Training_Running_Losses, Training_Losses)
            # Validation step
            self.Model.eval()
            epoch_iterator_Validation = tqdm(self.val_loader, desc=f"\tValidation-Fold {Fold + 1}/5, Epoch [{epoch + 1}/{self.epoch}]")
            # Restart the timer for the validation progress bar
            Start_Time = time.time()
            with torch.no_grad():
                for Input_Images, Mask_Ground_Truth_Image, Labels, File_Name, File_Classes in epoch_iterator_Validation:
                    Validation_Total_Losses, Validation_Running_Losses = self.Model_Branch(Input_Images, Mask_Ground_Truth_Image, Validation_Running_Losses)
                    epoch_iterator_Validation = self.Calculate_Progress_And_Timing(Input_Images, self.val_subset, Validation_Total_Losses, epoch_iterator_Validation, Start_Time)
            Validation_Losses, Validation_Running_Losses = self.Calculate_Average_Scores(self.val_loader, Validation_Running_Losses, Validation_Losses)
            print(f"Training Loss: {Training_Running_Losses:.4f}, Validation Loss: {Validation_Running_Losses:.4f}\n")
            # Early stopping
            early_stopping(Validation_Running_Losses, self.Model, model_path)
            if early_stopping.early_stop:
                print(f"Early stopping triggered in Fold {Fold + 1} at epoch {epoch + 1}")
                break
            # Scheduler step
            scheduler.step(Validation_Running_Losses)
        Total_Epoch = epoch + 1
        best_val_loss = early_stopping.best_loss
        return model_path, Training_Losses, Validation_Losses, Total_Epoch, best_val_loss

    def Construct_Segment_Model_CUDA(self):
        GaSeg = GastroSegNet()
        # Print a summary of the model
        print("\n==== GastroSegNet Model Summary ====\n")
        print(f"Input channels: {GaSeg.encoder[0].conv[0].in_channels}")
        print(f"Output channels: {GaSeg.final_conv.out_channels}")
        # Count the trainable parameters
        total_params = sum(p.numel() for p in GaSeg.parameters() if p.requires_grad)
        print(f"Total trainable parameters: {total_params:,}")
        # Show the model structure
        print("\nModel structure:")
        print(f"- Encoder stages: {len(GaSeg.encoder)}")
        print(f"- Decoder stages: {len(GaSeg.decoder)}")
        print("\nFeature channel configuration:")
        features_str = ", ".join([str(GaSeg.encoder[i].conv[0].out_channels) for i in range(len(GaSeg.encoder))])
        print(f" - Encoder feature channels: {features_str}")
        print(f" - Bottleneck feature channels: {GaSeg.bottleneck.conv[0].out_channels}")
        print("\n==== End of Summary ====\n")
        return self.Convert_Model_To_CUDA(GaSeg)

    def Convert_Model_To_CUDA(self, model):
        # Wrap the model for multi-GPU training when more than one GPU is available
        if torch.cuda.device_count() > 1:
            model = nn.DataParallel(model)
        model = model.to(self.device)
        return model

    def Model_Branch(self, Input_Images, Mask_Ground_Truth_Image, running_loss, Save_Dir=None, return_processed_image=False, file_names=None, Classes=None):
        # Move the tensor to the device directly; no need to recreate it
        Input_Images.requires_grad = False
        Input_Images = Input_Images.to(self.device)
        Segmentation_Output = self.Model(Input_Images)
        # If the processed images should be returned (inference stage)
        if return_processed_image:
            # Resize the model output to match the input images
            Segmentation_Output = self.Compare_Image_And_Resize_It(Segmentation_Output, Input_Images)
            # Post-process the segmentation output: pick the candidate box, black out pixels outside it, and save the bounding-box images
            return self.process_segmentation_output(Input_Images, Segmentation_Output, Save_Dir, save_bbox_images=True, file_names=file_names, Classes=Classes)
        Mask_Ground_Truth_Image = Mask_Ground_Truth_Image.to(self.device)
        # Resize the model output to match the ground-truth mask
        Segmentation_Output = self.Compare_Image_And_Resize_It(Segmentation_Output, Mask_Ground_Truth_Image)
        Losses = self.Losses(Segmentation_Output, Mask_Ground_Truth_Image)
        # No need to set requires_grad manually; the loss computation handles gradients automatically
        running_loss += Losses.item()
        return Losses, running_loss

    def Compare_Image_And_Resize_It(self, Image, Target):  # Resize Image to match Target's spatial size
        # Check the dimensionality of Target
        if Target.dim() < 3:
            # If Target is a 2D tensor, expand it to 4D [batch_size, channels, height, width]
            Target = Target.unsqueeze(0).unsqueeze(0)
        elif Target.dim() == 3:
            # If Target is a 3D tensor, expand it to 4D [batch_size, channels, height, width]
            Target = Target.unsqueeze(0)
        # Read the target spatial size
        target_height = Target.size(-2)  # Second-to-last dimension is the height
        target_width = Target.size(-1)  # Last dimension is the width
        # Resize Image to the target size
        Image = torch.nn.functional.interpolate(Image, size=(target_height, target_width), mode='nearest')
        # Dynamically adapt the channel dimension
        # (note: this 1x1 conv is freshly initialized on every call, so the channel projection is untrained)
        if Image.size(1) != Target.size(1) and Target.dim() >= 3:
            conv = torch.nn.Conv2d(Image.size(1), Target.size(1), kernel_size=1).to(self.device)
            Image = conv(Image)
        return Image

    def Losses(self, Segmentation_Output_Image, Segmentation_Mask_GroundTruth_Image):
        criterion = Segmentation_Loss()
        Loss = criterion(Segmentation_Output_Image, Segmentation_Mask_GroundTruth_Image)
        return Loss

    def Record_Average_Losses(self, DataLoader, Running_Loss=0.0):
        # Average the accumulated loss over the number of batches and record it
        # (the original divided a fresh 0.0, which always yielded 0; take the
        # accumulated loss as a parameter instead)
        losses = []
        Running_Loss /= len(DataLoader)
        losses.append(Running_Loss)
        return losses

    def Calculate_Progress_And_Timing(self, inputs, Subset, loss, epoch_iterator, Start_Time):
        # Track progress and timing for the progress bar
        total_samples = len(Subset)
        processed_samples = inputs.size(0)  # size(0) is the batch size
        elapsed_time = time.time() - Start_Time
        iterations_per_second = processed_samples / elapsed_time if elapsed_time > 0 else 0
        eta = (total_samples - processed_samples) / iterations_per_second if iterations_per_second > 0 else 0
        time_str = f"{int(elapsed_time//60):02d}:{int(elapsed_time%60):02d}<{int(eta//60):02d}:{int(eta%60):02d}"
        # Current batch segmentation loss
        batch_loss = loss.item()
        # Update the progress bar with the segmentation loss
        epoch_iterator.set_postfix_str(
            f"{processed_samples}/{total_samples} [{time_str}, {iterations_per_second:.2f}it/s, "
            f"loss={batch_loss:.3f}]"
        )
        return epoch_iterator

    def Calculate_Average_Scores(self, Data_Loader, Running_Losses, Losses):
        # Average the running loss over the number of batches and record it
        Running_Losses /= len(Data_Loader)
        Losses.append(Running_Losses)
        return Losses, Running_Losses

    def process_segmentation_output(self, input_images, segmentation_output, bbox_save_dir=None, save_bbox_images=True, file_names=None, Classes=None):
        """Post-process the segmentation output: keep the largest candidate region, black out pixels outside it, and save bounding-box images.

        Args:
            input_images: Original input images [B, C, H, W]
            segmentation_output: Segmentation model output [B, 1, H, W]
            bbox_save_dir: Root directory for the saved bounding-box images
            save_bbox_images: Whether to save the bounding-box images
            file_names: List of image file names, used when saving bounding-box images
            Classes: Per-image class names, used as subdirectories under bbox_save_dir
        Returns:
            processed_images: Processed images with out-of-box pixels blacked out [B, 3, H, W] (output is always 3-channel)
        """
        # Convert the output to a binary mask (threshold 0.5)
        binary_masks = (torch.sigmoid(segmentation_output) > 0.5).float()
        # Read the batch size, height and width
        Batch_Size_Of_Image, _, height, width = input_images.size()
        # Create a 3-channel output tensor regardless of the input channel count
        processed_images = torch.zeros(Batch_Size_Of_Image, 3, height, width, device=input_images.device)
        # Process each image in the batch
        for Batch_Size in range(Batch_Size_Of_Image):
            # Create the per-class directory for the bounding-box images
            new_bbox_save_dir = os.path.join(bbox_save_dir, Classes[Batch_Size])
            if save_bbox_images and not os.path.exists(new_bbox_save_dir):
                os.makedirs(new_bbox_save_dir, exist_ok=True)
                print(f"Created bounding-box image directory: {new_bbox_save_dir}")
            # Get the binary mask of the current image as a numpy array
            mask = binary_masks[Batch_Size, 0].cpu().numpy().astype(np.uint8)
            # Find all candidate regions with connected-component analysis
            labeled_mask, num_labels = measure.label(mask, return_num=True, connectivity=2)
            if num_labels > 0:
                # Compute the properties of every region
                regions = measure.regionprops(labeled_mask)
                # Sort the regions by area (largest first)
                regions.sort(key=lambda x: x.area, reverse=True)
                # Use the largest region as the final mask
                if len(regions) > 0:
                    # Build a new mask that contains only the largest region
                    final_mask = (labeled_mask == regions[0].label).astype(np.uint8)
                    # Bounding box of the largest region
                    bbox = regions[0].bbox  # (min_row, min_col, max_row, max_col)
                    # Convert the final mask back to a PyTorch tensor
                    final_mask_tensor = torch.from_numpy(final_mask).float().to(self.device)
                    # Make sure the mask matches the input image size
                    if final_mask_tensor.shape != input_images[Batch_Size, 0].shape:
                        # Resize the mask to match the input image
                        final_mask_tensor = torch.nn.functional.interpolate(
                            final_mask_tensor.unsqueeze(0).unsqueeze(0),
                            size=input_images[Batch_Size, 0].shape,
                            mode='nearest'
                        ).squeeze(0).squeeze(0)
                    # Apply the mask to the original image (keep in-box pixels, black out the rest),
                    # handling inputs with different channel counts
                    if input_images.size(1) == 1:  # Single-channel input
                        # Replicate the single channel into 3 channels
                        for Channel in range(3):
                            processed_images[Batch_Size, Channel] = input_images[Batch_Size, 0] * final_mask_tensor
                    elif input_images.size(1) == 3:  # Three-channel input
                        # Copy the three channels directly
                        for Channel in range(3):
                            processed_images[Batch_Size, Channel] = input_images[Batch_Size, Channel] * final_mask_tensor
                    else:  # Other channel counts (unlikely, but handled for robustness)
                        # Take the first three channels, or replicate the first channel
                        for Channel in range(3):
                            if Channel < input_images.size(1):
                                processed_images[Batch_Size, Channel] = input_images[Batch_Size, Channel] * final_mask_tensor
                            else:
                                processed_images[Batch_Size, Channel] = input_images[Batch_Size, 0] * final_mask_tensor
                    # Save the image with its bounding box drawn
                    if save_bbox_images:
                        # Convert the input image to a numpy array in a displayable layout
                        img_tensor = input_images[Batch_Size].clone().detach().cpu()
                        img_np = img_tensor.permute(1, 2, 0).numpy()
                        # Scale the image from [0, 1] to [0, 255]
                        img_np = (img_np * 255).astype(np.uint8)
                        # Make sure the array is contiguous in memory
                        img_np = np.ascontiguousarray(img_np)
                        # Convert single-channel images to three channels
                        if img_np.shape[2] == 1:
                            img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2BGR)
                        elif img_np.shape[2] == 3:
                            # Convert to BGR (OpenCV's default channel order)
                            img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
                        # Draw the bounding box
                        min_row, min_col, max_row, max_col = bbox
                        cv2.rectangle(img_np, (min_col, min_row), (max_col, max_row), (0, 255, 0), 2)
                        # Build the save path
                        if file_names is not None and Batch_Size < len(file_names):
                            # Use the provided file name
                            file_name = os.path.basename(file_names[Batch_Size])
                            save_path = os.path.join(new_bbox_save_dir, f"bbox_{file_name}.png")
                        else:
                            # Fall back to the batch index as the file name
                            save_path = os.path.join(new_bbox_save_dir, f"bbox_{Batch_Size}.png")
                        # Save the image (already in BGR, write it directly)
                        cv2.imwrite(save_path, img_np)
                        print(f"Saved bounding-box image: {save_path}")
                else:
                    # No region selected: keep the original image but make sure the output is 3-channel
                    if input_images.size(1) == 1:  # Single-channel input
                        # Replicate the single channel into 3 channels
                        for Channel in range(3):
                            processed_images[Batch_Size, Channel] = input_images[Batch_Size, 0]
                    elif input_images.size(1) == 3:  # Three-channel input
                        # Copy the three channels directly
                        processed_images[Batch_Size] = input_images[Batch_Size]
                    else:  # Other channel counts
                        # Take the first three channels, or replicate the first channel
                        for Channel in range(3):
                            if Channel < input_images.size(1):
                                processed_images[Batch_Size, Channel] = input_images[Batch_Size, Channel]
                            else:
                                processed_images[Batch_Size, Channel] = input_images[Batch_Size, 0]
            else:
                # No candidate region found: keep the original image but make sure the output is 3-channel
                if input_images.size(1) == 1:  # Single-channel input
                    # Replicate the single channel into 3 channels
                    for Channel in range(3):
                        processed_images[Batch_Size, Channel] = input_images[Batch_Size, 0]
                elif input_images.size(1) == 3:  # Three-channel input
                    # Copy the three channels directly
                    for Channel in range(3):
                        processed_images[Batch_Size, Channel] = input_images[Batch_Size, Channel]
                else:  # Other channel counts
                    # Take the first three channels, or replicate the first channel
                    for Channel in range(3):
                        if Channel < input_images.size(1):
                            processed_images[Batch_Size, Channel] = input_images[Batch_Size, Channel]
                        else:
                            processed_images[Batch_Size, Channel] = input_images[Batch_Size, 0]
                # Save the processed image even when no candidate region was found
                if save_bbox_images:
                    # Convert to a savable numpy format (same flow as the bounding-box images above)
                    img_tensor = processed_images[Batch_Size].clone().detach().cpu()
                    img_np = img_tensor.permute(1, 2, 0).numpy()
                    img_np = (img_np * 255).astype(np.uint8)
                    img_np = np.ascontiguousarray(img_np)
                    # Guarantee a three-channel BGR layout
                    if img_np.shape[2] == 1:
                        img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2BGR)
                    elif img_np.shape[2] == 3:
                        img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
                    # Build the save path (provided file name or batch index), marked as having no bounding box
                    if file_names is not None and isinstance(file_names, (list, tuple)) and Batch_Size < len(file_names):
                        file_name = os.path.basename(file_names[Batch_Size])
                        save_path = os.path.join(new_bbox_save_dir, f"no_bbox_{file_name}.png")
                    else:
                        base_name = file_names if isinstance(file_names, str) and len(file_names) > 0 else f"no_bbox_{Batch_Size}.png"
                        save_path = os.path.join(new_bbox_save_dir, base_name)
                    cv2.imwrite(save_path, img_np)
                    print(f"Saved image without bounding box: {save_path}")
        return processed_images

    def evaluate_on_test(self, model_path, test_dataloader):
        if test_dataloader is None:
            raise ValueError("Test dataloader is required for evaluation.")
        self.Model = self.Construct_Segment_Model_CUDA()
        self.Model.load_state_dict(torch.load(model_path))
        self.Model.eval()
        Test_Losses = []
        test_loss = 0.0
        with torch.no_grad():
            for Input_Images, Mask_Ground_Truth_Image, _, _, _ in test_dataloader:
                losses, test_loss = self.Model_Branch(Input_Images, Mask_Ground_Truth_Image, test_loss)
        # Average the accumulated loss over all test batches
        Test_Losses, test_loss = self.Calculate_Average_Scores(test_dataloader, test_loss, Test_Losses)
        print(f"Average Test Loss: {test_loss:.4f}")
        return test_loss
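

# A minimal usage sketch (not part of the original file). The dataset and
# test-loader objects named below are assumptions; adapt them to the actual
# Loding_Data_Root / Training_Precesses APIs. Kept commented out so importing
# this module stays side-effect free.
# if __name__ == "__main__":
#     trainer = Segmentation_Block_Training_Step("experiments/best_models")  # hypothetical save root
#     training_dataset = ...  # a dataset yielding (image, mask, label, file_name, file_class)
#     test_dataset = ...      # hypothetical held-out dataset with the same structure
#     test_loader = trainer.Dataloader_Sampler(test_dataset, trainer.train_batch_size, False)
#     best_path, avg_test_loss = trainer.Processing_Main(
#         training_dataset, return_processed_images=True, test_dataloader=test_loader)
#     print(f"Best model: {best_path}, average test loss: {avg_test_loss:.4f}")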