Files
Stomach_Cancer_Pytorch/experiments/Model_All_Step.py

267 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torchmetrics.functional import auroc
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from all_models_tools.all_model_tools import call_back
from Model_Loss.Loss import Entropy_Loss
from merge_class.merge import merge
from Training_Tools.PreProcess import ListDataset
from Load_process.file_processing import Process_File
from draw_tools.draw import plot_history, draw_heatmap
from Load_process.file_processing import Process_File
import time
import torch.optim as optim
import numpy as np
import torch
import pandas as pd
class All_Step:
    """Driver for the full train/evaluate pipeline of one experiment run."""

    def __init__(self, PreProcess_Classes_Data, Batch, Model, Epoch, Number_Of_Classes, Model_Name, Experiment_Name):
        """Store run configuration and build the train/test DataLoaders.

        PreProcess_Classes_Data supplies the raw data and the
        Total_Data_Combine_To_DataLoader(Batch) factory used below.
        """
        # Prefer GPU when one is available.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Keep the preprocessing object itself; Training_Step re-reads its
        # raw Training_Datas / Training_Labels for the K-fold split.
        self.PreProcess_Classes_Data = PreProcess_Classes_Data
        self.Training_DataLoader, self.Test_Dataloader = self.PreProcess_Classes_Data.Total_Data_Combine_To_DataLoader(Batch)
        # Run configuration.
        self.Model = Model
        self.Epoch = Epoch
        self.Number_Of_Classes = Number_Of_Classes
        self.Model_Name = Model_Name
        self.Experiment_Name = Experiment_Name
def Training_Step(self, model_name, counter):
    """Train self.Model with 5-fold cross-validation.

    Parameters:
        model_name: forwarded to call_back() to build the checkpoint path.
        counter: run index; combined with the fold number in checkpoint names.

    Side effects: saves per-fold checkpoints (via early stopping), plots each
    fold's loss/accuracy curves, and writes averaged results to a text file.
    Returns None.
    """
    # Full per-epoch history of every fold; only the last epoch of each
    # fold is used for the cross-fold averages at the bottom.
    all_fold_train_losses = []
    all_fold_val_losses = []
    all_fold_train_accuracies = []
    all_fold_val_accuracies = []
    # Fixed seed so the fold split is reproducible across runs.
    K_Fold = KFold(n_splits=5, shuffle=True, random_state=42)
    File = Process_File()
    # Rebuild an indexable dataset from the raw training data so KFold and
    # Subset can address individual samples.
    training_dataset = ListDataset(data_list = self.PreProcess_Classes_Data.Training_Datas, labels_list = self.PreProcess_Classes_Data.Training_Labels, status = True)
    # K-Fold loop
    for fold, (train_idx, val_idx) in enumerate(K_Fold.split(training_dataset)):
        print(f"\nStarting Fold {fold + 1}/5")
        # Create training and validation subsets for this fold
        train_subset = torch.utils.data.Subset(training_dataset, train_idx)
        val_subset = torch.utils.data.Subset(training_dataset, val_idx)
        # Reuse the batch size of the original training DataLoader.
        batch_size = self.Training_DataLoader.batch_size
        train_loader = torch.utils.data.DataLoader(train_subset, batch_size=batch_size, shuffle=True)
        val_loader = torch.utils.data.DataLoader(val_subset, batch_size=batch_size, shuffle=False)
        # Fresh model and optimizer per fold so folds do not share weights.
        # NOTE(review): assumes the model class __init__ takes exactly one
        # argument (the number of classes) — confirm for every model used here.
        self.Model = self.Model.__class__(self.Number_Of_Classes).to(self.device)
        # NOTE(review): weight_decay=0.1 is unusually strong for SGD — confirm intentional.
        Optimizer = optim.SGD(self.Model.parameters(), lr=0.045, momentum=0.9, weight_decay=0.1)
        model_path, early_stopping, scheduler = call_back(model_name, str(counter) + f"_fold{fold}", Optimizer)
        criterion = Entropy_Loss() # Custom loss function
        Merge_Function = merge()
        # Per-epoch history for this fold only.
        train_losses = []
        val_losses = []
        train_accuracies = []
        val_accuracies = []
        # Epoch loop
        for epoch in range(self.Epoch):
            self.Model.train() # Start training
            running_loss = 0.0
            all_train_preds = []
            all_train_labels = []
            processed_samples = 0
            # Wall-clock start, used for the hand-rolled ETA in the bar text.
            start_time = time.time()
            total_samples = len(train_subset) # Total samples in subset, not DataLoader
            # Progress bar for training batches
            epoch_iterator = tqdm(train_loader, desc=f"Fold {fold + 1}/5, Epoch [{epoch + 1}/{self.Epoch}]")
            for inputs, labels in epoch_iterator:
                inputs, labels = inputs.to(self.device), labels.to(self.device) # Already tensors from DataLoader
                Optimizer.zero_grad()
                outputs = self.Model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                Optimizer.step()
                running_loss += loss.item()
                # Labels are one-hot along dim 1 (the argmax below recovers
                # integer class ids); predictions come from the output argmax.
                Output_Values, Output_Indexs = torch.max(outputs, dim=1)
                True_Indexs = np.argmax(labels.cpu().numpy(), axis=1)
                all_train_preds.append(Output_Indexs.cpu().numpy())
                all_train_labels.append(True_Indexs)
                processed_samples += inputs.size(0) # Use size(0) for batch size
                # Calculate progress and timing
                progress = (processed_samples / total_samples) * 100
                elapsed_time = time.time() - start_time
                iterations_per_second = processed_samples / elapsed_time if elapsed_time > 0 else 0
                eta = (total_samples - processed_samples) / iterations_per_second if iterations_per_second > 0 else 0
                time_str = f"{int(elapsed_time//60):02d}:{int(elapsed_time%60):02d}<{int(eta//60):02d}:{int(eta%60):02d}"
                # Calculate batch accuracy
                batch_accuracy = (Output_Indexs.cpu().numpy() == True_Indexs).mean()
                # Update progress bar
                epoch_iterator.set_postfix_str(
                    f"{processed_samples}/{total_samples} [{time_str}, {iterations_per_second:.2f}it/s, "
                    f"acc={batch_accuracy:.3f}, loss={loss.item():.3f}]"
                )
            epoch_iterator.close()
            # Flatten per-batch arrays into flat per-sample sequences.
            # NOTE(review): merge_data_main presumably concatenates the list
            # of per-batch arrays — verify against merge_class.merge.
            all_train_preds = Merge_Function.merge_data_main(all_train_preds, 0, len(all_train_preds))
            all_train_labels = Merge_Function.merge_data_main(all_train_labels, 0, len(all_train_labels))
            Training_Loss = running_loss / len(train_loader)
            train_accuracy = accuracy_score(all_train_labels, all_train_preds)
            train_losses.append(Training_Loss)
            train_accuracies.append(train_accuracy)
            # Validation step
            self.Model.eval()
            val_loss = 0.0
            all_val_preds = []
            all_val_labels = []
            with torch.no_grad():
                for inputs, labels in val_loader:
                    inputs, labels = inputs.to(self.device), labels.to(self.device)
                    outputs = self.Model(inputs)
                    loss = criterion(outputs, labels)
                    val_loss += loss.item()
                    # Collect validation predictions and labels
                    Output_Values, Output_Indexs = torch.max(outputs, dim=1)
                    True_Indexs = np.argmax(labels.cpu().numpy(), axis=1)
                    all_val_preds.append(Output_Indexs.cpu().numpy())
                    all_val_labels.append(True_Indexs)
            # Merge predictions and labels
            all_val_preds = Merge_Function.merge_data_main(all_val_preds, 0, len(all_val_preds))
            all_val_labels = Merge_Function.merge_data_main(all_val_labels, 0, len(all_val_labels))
            val_loss /= len(val_loader)
            val_accuracy = accuracy_score(all_val_labels, all_val_preds)
            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)
            # Early stopping; the callback also checkpoints the best model
            # to model_path when val_loss improves.
            early_stopping(val_loss, self.Model, model_path)
            if early_stopping.early_stop:
                print(f"Early stopping triggered in Fold {fold + 1} at epoch {epoch + 1}")
                Total_Epoch = epoch + 1
                break
            # Validation-loss-driven learning-rate adjustment (skipped on the
            # early-stop epoch because of the break above).
            scheduler.step(val_loss)
        else: # for-else: runs only when the epoch loop finished without break (no early stopping)
            Total_Epoch = self.Epoch
        # Store fold results
        all_fold_train_losses.append(train_losses)
        all_fold_val_losses.append(val_losses)
        all_fold_train_accuracies.append(train_accuracies)
        all_fold_val_accuracies.append(val_accuracies)
        Losses = [train_losses, val_losses]
        Accuracies = [train_accuracies, val_accuracies]
        plot_history(Total_Epoch, Losses, Accuracies, "train" + str(fold), self.Experiment_Name) # Plot this fold's training curves and save the figure
    # Aggregate results across folds (final-epoch value of each fold).
    avg_train_losses = np.mean([losses[-1] for losses in all_fold_train_losses])
    avg_val_losses = np.mean([losses[-1] for losses in all_fold_val_losses])
    avg_train_accuracies = np.mean([acc[-1] for acc in all_fold_train_accuracies])
    avg_val_accuracies = np.mean([acc[-1] for acc in all_fold_val_accuracies])
    print(f"\nCross-Validation Results:")
    print(f"Avg Train Loss: {avg_train_losses:.4f}, Avg Val Loss: {avg_val_losses:.4f}")
    print(f"Avg Train Acc: {avg_train_accuracies:.4f}, Avg Val Acc: {avg_val_accuracies:.4f}")
    File.Save_TXT_File(content = f"\nCross-Validation Results:\nAvg Train Loss: {avg_train_losses:.4f}, Avg Val Loss: {avg_val_losses:.4f}\nAvg Train Acc: {avg_train_accuracies:.4f}, Avg Val Acc: {avg_val_accuracies:.4f}\n", File_Name = "Training_Average_Result")
    pass
def Evaluate_Model(self, cnn_model, Model_Name, counter):
    """Evaluate cnn_model on the held-out test DataLoader and record metrics.

    Fixes vs. the previous version:
    - True_Label / Predict_Label were swapped (model predictions were stored
      as the ground truth and vice versa), which inverted precision/recall
      and transposed the confusion matrix.
    - The one-hot score/target lists kept only row [0] of each batch while
      the per-sample prediction lists kept the whole batch, so AUC ignored
      most test samples (and shapes mismatched for batch sizes > 1). The
      full batch is now collected and flattened with np.concatenate.
    - The DataLoader already yields tensors; re-wrapping them with
      torch.tensor() copied data and emitted warnings.

    Side effects: draws a confusion-matrix heatmap and appends one CSV row
    via record_everyTime_test_result. Returns None.
    """
    cnn_model.eval()
    True_Label, Predict_Label = [], []
    True_Label_OneHot, Predict_Label_OneHot = [], []
    # NOTE(review): no criterion is applied below, so the reported loss is
    # always 0 — kept as-is to preserve the CSV schema; confirm if a real
    # test loss is wanted here.
    loss = 0.0
    with torch.no_grad():
        for images, labels in self.Test_Dataloader:
            images, labels = images.to(self.device), labels.to(self.device)
            outputs = cnn_model(images)
            Output_Values, Output_Indexs = torch.max(outputs, 1)
            # Labels arrive one-hot; argmax recovers integer class ids.
            True_Indexs = np.argmax(labels.cpu().numpy(), 1)
            True_Label.append(True_Indexs)                     # ground truth
            Predict_Label.append(Output_Indexs.cpu().numpy())  # predictions
            # Keep the entire batch of scores/targets for AUC.
            Predict_Label_OneHot.append(outputs.detach().cpu().numpy())
            True_Label_OneHot.append(labels.cpu().numpy())
    loss /= len(self.Test_Dataloader)
    # Flatten the per-batch arrays into flat per-sample arrays.
    True_Label = np.concatenate(True_Label)
    Predict_Label = np.concatenate(Predict_Label)
    True_Label_OneHot = torch.tensor(np.concatenate(True_Label_OneHot), dtype=torch.int)
    Predict_Label_OneHot = torch.tensor(np.concatenate(Predict_Label_OneHot), dtype=torch.float32)
    accuracy = accuracy_score(True_Label, Predict_Label)
    precision = precision_score(True_Label, Predict_Label, average="macro")
    recall = recall_score(True_Label, Predict_Label, average="macro")
    AUC = auroc(Predict_Label_OneHot, True_Label_OneHot, num_labels=self.Number_Of_Classes, task="multilabel", average="macro")
    f1 = f1_score(True_Label, Predict_Label, average="macro")
    Matrix = self.record_matrix_image(True_Label, Predict_Label, Model_Name, counter)
    print(self.record_everyTime_test_result(loss, accuracy, precision, recall, AUC, f1, counter, self.Experiment_Name, Matrix)) # Record this run's test metrics and export them as CSV
    pass
def record_matrix_image(self, True_Labels, Predict_Labels, model_name, index):
    '''Draw the confusion matrix (heatmap) for one evaluation run.'''
    # Compute the confusion matrix, hand it to the heatmap plotter, and
    # return it so the caller can log it alongside the other metrics.
    cm = confusion_matrix(True_Labels, Predict_Labels)
    draw_heatmap(cm, model_name, index)
    return cm
def record_everyTime_test_result(self, loss, accuracy, precision, recall, auc, f, indexs, model_name, Matrix):
    '''Record a single test run's metrics, export them to CSV, and return the row.'''
    # Shared formatter: ratio in [0, 1] -> "NN.NN%" string.
    def as_percent(value):
        return "{:.2f}%".format(value * 100)

    row = {
        "model_name": str(model_name),
        "loss": "{:.2f}".format(loss),
        "precision": as_percent(precision),
        "recall": as_percent(recall),
        "accuracy": as_percent(accuracy),
        "f": as_percent(f),
        "AUC": as_percent(auc),
    }
    # One-row frame indexed by the run counter, appended to "train_result".
    result_frame = pd.DataFrame(row, index=[indexs])
    file_handler = Process_File()
    file_handler.Save_CSV_File("train_result", result_frame)
    return result_frame