Modification to PyTorch (not finished)

This commit is contained in:
2024-12-15 03:55:05 +08:00
parent 4bd0ae023d
commit bd4310f0dc
24 changed files with 284 additions and 146 deletions

Binary file not shown.

Binary file not shown.

Model_Loss/Loss.py Normal file
View File

@@ -0,0 +1,15 @@
from torch import nn
from torch.nn import functional

class Entropy_Loss(nn.Module):
    def __init__(self):
        super(Entropy_Loss, self).__init__()

    def forward(self, outputs, labels):
        # Binary cross-entropy between the model outputs and the labels.
        # Note: binary_cross_entropy expects outputs already in [0, 1]
        # (e.g. after sigmoid/softmax) and float labels of matching shape.
        # outputs = torch.argmax(outputs, 1)
        # outputs = outputs.float()
        labels = labels.float()
        loss = functional.binary_cross_entropy(outputs, labels)
        return loss
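
A minimal usage sketch of Entropy_Loss, assuming softmax outputs and two classes; the tensors here are illustrative:

import torch
from torch.nn import functional
from Model_Loss.Loss import Entropy_Loss

criterion = Entropy_Loss()
outputs = torch.softmax(torch.randn(4, 2), dim=1)           # probabilities in [0, 1], as binary_cross_entropy requires
labels = functional.one_hot(torch.tensor([0, 1, 1, 0]), 2)  # one-hot labels; forward() casts them to float
loss = criterion(outputs, labels)
print(loss.item())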

View File

@@ -1,5 +1,6 @@
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
from torch.nn import functional

class Tool:
    def __init__(self) -> None:
@@ -19,6 +20,10 @@ class Tool:
        self.__Normal_ImageGenerator_Data_Root = ""
        self.__Comprehensive_Generator_Root = ""
        self.Training_Zip = ""
        self.Validation_Zip = ""
        self.Testing_Zip = ""
        self.__Labels = []
        self.__OneHot_Encording = []
        pass
@@ -43,12 +48,17 @@ class Tool:
        self.__Normal_ImageGenerator_Data_Root = "../Dataset/Training/Normal_ImageGenerator"
        self.__Comprehensive_Generator_Root = "../Dataset/Training/Comprehensive_ImageGenerator"

    def Set_OneHotEncording(self, Content):
        Array_To_DataFrame = pd.DataFrame(Content)
        onehotencoder = OneHotEncoder()
        onehot = onehotencoder.fit_transform(Array_To_DataFrame).toarray()
        self.__OneHot_Encording = onehot

    def Set_OneHotEncording(self, content, Number_Of_Classes):
        OneHot_labels = functional.one_hot(content, Number_Of_Classes)
        return OneHot_labels

    def Set_Zips(self, Datas, Labels, Address_Name):
        if Address_Name == "Training":
            self.Training_Zip = zip(Datas, Labels)
        if Address_Name == "Validation":
            self.Validation_Zip = zip(Datas, Labels)
        if Address_Name == "Testing":
            self.Testing_Zip = zip(Datas, Labels)

    def Get_Data_Label(self):
        '''
@@ -80,4 +90,7 @@ class Tool:
        return self.__Comprehensive_Generator_Root

    def Get_OneHot_Encording_Label(self):
        return self.__OneHot_Encording
        return self.__OneHot_Encording

    def Get_Zip(self):
        return self.Training_Zip, self.Testing_Zip, self.Validation_Zip
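
One caveat on Set_Zips worth flagging: zip() returns a one-shot iterator, so a stored Training_Zip is exhausted after its first full pass, and the multi-epoch loop in Model_All_Step would see an empty sequence from the second epoch on (and len() on a zip raises TypeError). A minimal sketch of a fix, materializing the pairs as a list; this is an editor's suggestion, not part of the commit:

    def Set_Zips(self, Datas, Labels, Address_Name):
        Pairs = list(zip(Datas, Labels))  # a list survives repeated iteration and supports len()
        if Address_Name == "Training":
            self.Training_Zip = Pairs
        if Address_Name == "Validation":
            self.Validation_Zip = Pairs
        if Address_Name == "Testing":
            self.Testing_Zip = Pairs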

View File

@@ -1,40 +1,68 @@
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from keras.layers import GlobalAveragePooling2D, Dense, Reshape, Multiply
from Load_process.file_processing import Process_File
import datetime
import torch

def attention_block(input):
    channel = input.shape[-1]
# def attention_block(input):
#     channel = input.shape[-1]
    GAP = GlobalAveragePooling2D()(input)
#     GAP = GlobalAveragePooling2D()(input)
    block = Dense(units = channel // 16, activation = "relu")(GAP)
    block = Dense(units = channel, activation = "sigmoid")(block)
    block = Reshape((1, 1, channel))(block)
#     block = Dense(units = channel // 16, activation = "relu")(GAP)
#     block = Dense(units = channel, activation = "sigmoid")(block)
#     block = Reshape((1, 1, channel))(block)
    block = Multiply()([input, block])
#     block = Multiply()([input, block])
    return block
#     return block

def call_back(model_name, index):
class EarlyStopping:
    def __init__(self, patience=74, verbose=False, delta=0):
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss, model, save_path):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.save_checkpoint(val_loss, model, save_path)
        elif val_loss > self.best_loss + self.delta:
            self.counter += 1
            if self.verbose:
                print(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            # Save before updating best_loss so the log below shows the previous best
            self.save_checkpoint(val_loss, model, save_path)
            self.best_loss = val_loss
            self.counter = 0

    def save_checkpoint(self, val_loss, model, save_path):
        torch.save(model.state_dict(), save_path)
        if self.verbose:
            print(f"Validation loss decreased ({self.best_loss:.6f} --> {val_loss:.6f}). Saving model to {save_path}")

def call_back(model_name, index, optimizer):
    File = Process_File()
    model_dir = '../Result/save_the_best_model/' + model_name
    File.JudgeRoot_MakeDir(model_dir)
    modelfiles = File.Make_Save_Root('best_model( ' + str(datetime.date.today()) + " )-" + str(index) + ".weights.h5", model_dir)
    model_mckp = ModelCheckpoint(modelfiles, monitor='val_loss', save_best_only=True, save_weights_only = True, mode='auto')
    # model_mckp = ModelCheckpoint(modelfiles, monitor='val_loss', save_best_only=True, save_weights_only = True, mode='auto')
    earlystop = EarlyStopping(monitor='val_loss', patience=74, verbose=1) # early stopping
    earlystop = EarlyStopping(patience=74, verbose=True) # early stopping
    reduce_lr = ReduceLROnPlateau(
        monitor = 'val_loss',
    reduce_lr = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer,
        factor = 0.94, # factor by which the learning rate is reduced: new_lr = lr * factor
        patience = 2, # number of epochs with no improvement after which the learning rate is reduced
        verbose = 0,
        mode = 'auto',
        mode = 'min',
        min_lr = 0 # lower bound on the learning rate
    )
    callbacks_list = [model_mckp, earlystop, reduce_lr]
    return callbacks_list
    return modelfiles, earlystop, reduce_lr
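
A short sketch of how the returned triple is meant to be consumed in a training loop; run_one_epoch and the other names here are hypothetical:

model_path, early_stopping, scheduler = call_back("Xception", 0, optimizer)
for epoch in range(max_epochs):
    val_loss = run_one_epoch(model, optimizer)   # hypothetical helper returning the validation loss
    early_stopping(val_loss, model, model_path)  # saves the best weights as a side effect
    if early_stopping.early_stop:
        break
    scheduler.step(val_loss)                     # ReduceLROnPlateau steps on the monitored metric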

Binary file not shown.

View File

@@ -5,21 +5,21 @@ import matplotlib.figure as figure
import matplotlib.backends.backend_agg as agg
from Load_process.file_processing import Process_File
def plot_history(history_value, file_name, model_name):
def plot_history(Epochs, Losses, Accuracys, file_name, model_name):
    File = Process_File()
    plt.figure(figsize=(16,4))
    plt.subplot(1,2,1)
    plt.plot(history_value.history['accuracy'])
    plt.plot(history_value.history['val_accuracy'])
    # Accuracy curves belong in the accuracy subplot (they were swapped with the
    # losses); plotting against the series length also keeps the call valid when
    # early stopping ends training before Epochs.
    plt.plot(range(1, len(Accuracys[0]) + 1), Accuracys[0])
    plt.plot(range(1, len(Accuracys[1]) + 1), Accuracys[1])
    plt.ylabel('Accuracy')
    plt.xlabel('epoch')
    plt.legend(['Train','Validation'], loc='upper left')
    plt.title('Model Accuracy')
    plt.subplot(1,2,2)
    plt.plot(history_value.history['loss'])
    plt.plot(history_value.history['val_loss'])
    plt.plot(range(1, len(Losses[0]) + 1), Losses[0])
    plt.plot(range(1, len(Losses[1]) + 1), Losses[1])
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['Train','Validation'], loc='upper left')
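
A call sketch matching the new signature, assuming the four curve lists returned by the training step (names illustrative):

Losses = [train_losses, val_losses]
Accuracys = [train_accuracies, val_accuracies]
plot_history(len(train_losses), Losses, Accuracys, "train0", "Xception")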

View File

@@ -0,0 +1,131 @@
from tqdm import tqdm
from torch.nn import functional
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torchmetrics.functional import auroc
import torch.optim as optim
from all_models_tools.all_model_tools import call_back
from Model_Loss.Loss import Entropy_Loss

class All_Step:
    def __init__(self, Training_Data_And_Label, Test_Data_And_Label, Validation_Data_And_Label, Model, Epoch, Number_Of_Classes):
        self.Training_Data_And_Label = Training_Data_And_Label
        self.Test_Data_And_Label = Test_Data_And_Label
        self.Validation_Data_And_Label = Validation_Data_And_Label
        self.Model = Model
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.Epoch = Epoch
        self.Number_Of_Classes = Number_Of_Classes
        pass

    def Training_Step(self, model_name, counter):
        # Define the optimizer; weight_decay adds L2 regularization
        Optimizer = optim.SGD(self.Model.parameters(), lr=0.045, momentum = 0.9, weight_decay=0.1)
        model_path, early_stopping, scheduler = call_back(model_name, counter, Optimizer)
        criterion = Entropy_Loss() # use the custom loss function
        train_losses = []
        val_losses = []
        train_accuracies = []
        val_accuracies = []
        for epoch in range(self.Epoch):
            self.Model.train()
            running_loss = 0.0
            all_train_preds = []
            all_train_labels = []
            # Assumes the data is a sized, re-iterable sequence of (inputs, labels)
            # batches (e.g. a list), not a one-shot zip iterator.
            epoch_iterator = tqdm(self.Training_Data_And_Label, desc= "Training (Epoch %d)" % epoch)
            for inputs, labels in epoch_iterator:
                # labels = np.reshape(labels, (int(labels.shape[0]), 1))
                # labels are assumed to be int64 class indices; one-hot encode them
                # (as Tool.Set_OneHotEncording does) for binary_cross_entropy
                OneHot_labels = functional.one_hot(labels, self.Number_Of_Classes).float()
                inputs, OneHot_labels = inputs.to(self.device), OneHot_labels.to(self.device)
                # inputs, labels = inputs.cuda(), labels.cuda()
                Optimizer.zero_grad()
                outputs = self.Model(inputs)
                loss = criterion(outputs, OneHot_labels)
                loss.backward()
                Optimizer.step()
                running_loss += loss.item()
                # Collect training predictions and labels
                _, preds = torch.max(outputs, 1)
                all_train_preds.extend(preds.cpu().numpy())
                all_train_labels.extend(labels.cpu().numpy())
            Training_Loss = running_loss/len(self.Training_Data_And_Label)
            # all_train_labels = torch.FloatTensor(all_train_labels)
            # all_train_labels = torch.argmax(all_train_labels, 1)
            train_accuracy = accuracy_score(all_train_labels, all_train_preds)
            train_losses.append(Training_Loss)
            train_accuracies.append(train_accuracy)
            print(f"Epoch [{epoch+1}/{self.Epoch}], Loss: {Training_Loss:.4f}, Accuracy: {train_accuracy:0.2f}", end = ' ')
            self.Model.eval()
            val_loss = 0.0
            all_val_preds = []
            all_val_labels = []
            with torch.no_grad():
                for inputs, labels in self.Validation_Data_And_Label:
                    # One-hot encode here too; the raw integer labels are not valid BCE targets
                    OneHot_labels = functional.one_hot(labels, self.Number_Of_Classes).float()
                    inputs, OneHot_labels = inputs.to(self.device), OneHot_labels.to(self.device)
                    outputs = self.Model(inputs)
                    loss = criterion(outputs, OneHot_labels)
                    val_loss += loss.item()
                    # Validation predictions and labels
                    _, preds = torch.max(outputs, 1)
                    all_val_preds.extend(preds.cpu().numpy())
                    all_val_labels.extend(labels.cpu().numpy())
            # Compute the validation loss and accuracy
            val_loss /= len(list(self.Validation_Data_And_Label))
            val_accuracy = accuracy_score(all_val_labels, all_val_preds)
            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)
            print(f"Epoch [{epoch+1}/{self.Epoch}], Loss: {val_loss:.4f}, Accuracy: {val_accuracy:0.2f}")
            early_stopping(val_loss, self.Model, model_path)
            if early_stopping.early_stop:
                print("Early stopping triggered. Training stopped.")
                break
            # Learning-rate adjustment
            scheduler.step(val_loss)
        return train_losses, val_losses, train_accuracies, val_accuracies

    def Evaluate_Model(self, cnn_model):
        # Test the model
        cnn_model.eval()
        criterion = Entropy_Loss() # needed so the test loss is actually accumulated
        True_Label, Predict_Label = [], []
        loss = 0.0
        with torch.no_grad():
            for images, labels in self.Test_Data_And_Label:
                OneHot_labels = functional.one_hot(labels, self.Number_Of_Classes).float()
                images, OneHot_labels = images.to(self.device), OneHot_labels.to(self.device)
                outputs = cnn_model(images)
                loss += criterion(outputs, OneHot_labels).item()
                _, predicted = torch.max(outputs, 1)
                Predict_Label.extend(predicted.cpu().numpy())
                True_Label.extend(labels.cpu().numpy())
        loss /= len(self.Test_Data_And_Label)
        accuracy = accuracy_score(True_Label, Predict_Label)
        precision = precision_score(True_Label, Predict_Label)
        recall = recall_score(True_Label, Predict_Label)
        # torchmetrics' auroc expects tensors and a task string, not class names;
        # note AUC over hard 0/1 predictions is degenerate -- pass the
        # positive-class probabilities instead if they are available
        AUC = auroc(torch.tensor(Predict_Label).float(), torch.tensor(True_Label), task = "binary")
        f1 = f1_score(True_Label, Predict_Label)
        return loss, accuracy, precision, recall, AUC, f1, True_Label, Predict_Label
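
A hypothetical end-to-end sketch of driving All_Step, assuming the three splits are sized lists of (input, label) tensor batch pairs (all names illustrative):

step = All_Step(Training_Zip, Testing_Zip, Validation_Zip, cnn_model, 10000, 2)
train_losses, val_losses, train_accuracies, val_accuracies = step.Training_Step("Xception", 0)
loss, accuracy, precision, recall, AUC, f1, True_Label, Predict_Label = step.Evaluate_Model(cnn_model)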

View File

@@ -1,38 +1,49 @@
from all_models_tools.all_model_tools import call_back
from Read_and_process_image.ReadAndProcess import Read_image_and_Process_image
from draw_tools.draw import plot_history, Confusion_Matrix_of_Two_Classification
from keras import regularizers
from Load_process.Load_Indepentend import Load_Indepentend_Data
from _validation.ValidationTheEnterData import validation_the_enter_data
from keras.layers import GlobalAveragePooling2D, Dense, Dropout
from keras.applications import Xception
from Load_process.file_processing import Process_File
from merge_class.merge import merge
from draw_tools.Grad_cam import Grad_CAM
from sklearn.metrics import confusion_matrix
from keras.models import Model
from keras.optimizers import SGD
from experiments.pytorch_Model import ModifiedXception
from Image_Process.Image_Generator import Image_generator
from Model_All_Step import All_Step
import pandas as pd
import keras
import numpy as np
import torch
import torch.nn as nn
import time
class experiments():
    def __init__(self, tools, status):
        '''
        parameter:
        * validation_obj : validation object
        * cut_image : image-cropping object
        * image_processing : object that reads files and preprocesses the loaded data
        * merge : merge object
        * model_name: a name telling me which model is used (a pre-trained model or a self-designed one)
        * generator_batch_size: how many files to read per batch
        * epoch: number of training iterations
        * train_batch_size: how many images form one training batch
        * generator_batch_size: reduces the number of images to limit GPU memory usage
        * experiment_name : name of this experiment
        * convolution_name : name of the last convolution layer of the pre-trained model
    def __init__(self, tools, Number_Of_Classes, status):
        '''
        # Experiment object
        ## Description:
        * Object that runs PyTorch training; its methods handle the various parts of the experiment process
        ## parameter:
        * Topic_Tool: reads the training, validation, and test datasets, labels, and so on
        * cut_image: calls the image-cropping object
        * merge: merge object
        * model_name: model name, telling me which model is used (a pre-trained model or a self-designed one)
        * experiment_name: experiment name
        * epoch: number of training iterations
        * train_batch_size: batch size of the training data
        * convolution_name: name of the last layer used by Grad-CAM
        * Number_Of_Classes: number of label classes
        * Status: selects the state of the current dataset
        * device: decides whether to use the GPU or CPU
        ## Method:
        * processing_main: entry point of the experiment object
        * construct_model: decides which model the experiment uses
        * Training_Step: training step; runs the training and validation parts
        * Evaluate_Model: evaluates the model's accuracy
        * record_matrix_image: draws the confusion matrix (heat map)
        * record_everyTime_test_result: records the result of a single training run and writes it to a file
        '''
        self.Topic_Tool = tools
@@ -49,99 +60,72 @@ class experiments():
        self.epoch = 10000
        self.train_batch_size = 128
        self.layers = 1
        self.convolution_name = self.get_layer_name(self.model_name)
        self.convolution_name = "block14_sepconv2"
        self.Number_Of_Classes = Number_Of_Classes
        self.Grad = ""
        self.Status = status
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        pass
    def processing_main(self, train, train_label, counter):
        Train, Test, Validation = self.Topic_Tool.Get_Save_Roots(self.Status) # change this to switch datasets
        start = time.time()
        self.cut_image.process_main(Test, Validation) # process the test and validation data
        end = time.time()
        print("Reading the testing and validation data (154) took: %f\n" % (end - start))
        Generator = Image_generator("", "")
        # Generator = Image_generator("", "")
        # Hand the processed test and validation data to this object's variables
        self.test, self.test_label = self.cut_image.test, self.cut_image.test_label
        self.validation, self.validation_label = self.cut_image.validation, self.cut_image.validation_label
        self.Topic_Tool.Set_Zips(train, train_label, "Training")
        self.Topic_Tool.Set_Zips(self.test, self.test_label, "Testing")
        self.Topic_Tool.Set_Zips(self.validation, self.validation_label, "Validation")
        self.Grad = Grad_CAM(self.Topic_Tool.Get_Data_Label(), self.test_label, self.experiment_name, self.convolution_name)
        self.Training_Zip, self.Testing_Zip, self.Validation_Zip = self.Topic_Tool.Get_Zip()
        # self.Grad = Grad_CAM(self.Topic_Tool.Get_Data_Label(), self.test_label, self.experiment_name, self.convolution_name)
        cnn_model = self.construct_model() # build the model
        step = All_Step(self.Training_Zip, self.Testing_Zip, self.Validation_Zip, cnn_model, self.epoch, self.Number_Of_Classes)
        # model_dir = '../save_the_best_model/Topic/Remove background with Normal image/best_model( 2023-10-17 )-2.h5' # path to the weight file that every model keeps for itself
        # if os.path.exists(model_dir): # if this file exists
        #     cnn_model.load_weights(model_dir) # load the model weights
        #     print("Weights loaded\n")
        Optimizer = SGD(learning_rate = 0.045, momentum = 0.9) # choose the optimizer and learning rate
        cnn_model.compile(
            loss = "binary_crossentropy",
            optimizer = Optimizer,
            metrics =
            [
                'accuracy',
                keras.metrics.Precision(name='precision'),
                keras.metrics.Recall(name='recall'),
                keras.metrics.AUC(name = 'AUC'),
            ]
        )
        train_data = Generator.Generator_Content(5) # use the ImageGenerator object so training reads data in batches and GPU memory does not run out
        cnn_model.summary() # show the model architecture
        # print("Weights loaded\n")
        print("\n\n\nReading the training data (70000) took: %f\n\n" % (end - start))
        history = cnn_model.fit(
            train_data.flow(train, train_label, batch_size = self.generator_batch_size),
            # x = train,
            # y = train_label,
            epochs = self.epoch,
            batch_size = self.train_batch_size,
            validation_data = (self.validation, self.validation_label),
            callbacks = call_back(self.experiment_name, counter) # build the callback list
            # callbacks = call_back("best_model", self.counter) # build the callback list
        )
        Matrix = self.record_matrix_image(cnn_model, self.experiment_name, counter) # record the confusion matrix
        loss, accuracy, precision, recall, AUC = cnn_model.evaluate(self.test, self.test_label) # predict the results
        train_losses, val_losses, train_accuracies, val_accuracies = step.Training_Step(self.model_name, counter)
        loss, accuracy, precision, recall, AUC, f1, True_Label, Predict_Label = step.Evaluate_Model(cnn_model)
        # Guard against a zero denominator
        if recall == 0 or precision == 0:
            f = 0
        else:
            f = (1 + 0.5 * 0.5) * ((recall * precision) / (0.5 * 0.5 * recall + precision))
        print(self.record_everyTime_test_result(loss, accuracy, precision, recall, AUC, f, counter, self.experiment_name, Matrix)) # record the current run's predictions and write them to a csv file
        self.record_matrix_image(True_Label, Predict_Label, self.model_name, counter)
        print(self.record_everyTime_test_result(loss, accuracy, precision, recall, AUC, f1, counter, self.experiment_name)) # record the current run's predictions and write them to a csv file
        plot_history(history, "train" + str(counter), self.experiment_name) # plot the training curves and save the figure
        self.Grad.process_main(cnn_model, counter, self.test)
        Losses = [train_losses, val_losses]
        Accuracies = [train_accuracies, val_accuracies]
        plot_history(self.epoch, Losses, Accuracies, "train" + str(counter), self.experiment_name) # plot the training curves and save the figure
        # self.Grad.process_main(cnn_model, counter, self.test)
        return loss, accuracy, precision, recall, AUC, f
        return loss, accuracy, precision, recall, AUC, f1
    def construct_model(self):
        '''Decide which model to use for this training run'''
        xception = Xception(include_top = False, weights = "imagenet", input_shape = (512, 512, 3))
        Flatten = GlobalAveragePooling2D()(xception.output)
        output = Dense(units = 1370, activation = "relu", kernel_regularizer = regularizers.L2())(Flatten)
        output = Dropout(0.6)(output)
        output = Dense(units = 2, activation = "softmax")(output)
        cnn_model = ModifiedXception()
        cnn_model = Model(inputs = xception.input, outputs = output)
        if torch.cuda.device_count() > 1:
            cnn_model = nn.DataParallel(cnn_model)
        cnn_model = cnn_model.to(self.device)
        return cnn_model

    def record_matrix_image(self, cnn_model : Model, model_name, index):
    def record_matrix_image(self, True_Labels, Predict_Labels, model_name, index):
        '''Draw the confusion matrix (heat map)'''
        result = cnn_model.predict(self.test) # predict the results with the predict function
        result = np.argmax(result, axis = 1) # convert the predictions from one-hot encoding to label encoding
        y_test = np.argmax(self.test_label, axis = 1)
        matrix = confusion_matrix(result, y_test, labels = [0, 1]) # feed into the confusion_matrix function to build the confusion matrix
        # Compute the confusion matrix
        matrix = confusion_matrix(True_Labels, Predict_Labels)
        Confusion_Matrix_of_Two_Classification(model_name, matrix, index) # call the function that draws the confusion matrix
        return matrix.real
@@ -163,37 +147,4 @@ class experiments():
        File.Save_CSV_File("train_result", Dataframe)
        # File.Save_TXT_File("Matrix_Result : " + str(Matrix), model_name + "_train" + str(indexs))
        return Dataframe
    def get_layer_name(self, model_name):
        if(self.validation_obj.validation_string(model_name, "VGG19")):
            return "block5_conv4"
        if(self.validation_obj.validation_string(model_name, "ResNet50")):
            return "conv5_block3_3_conv"
        if(self.validation_obj.validation_string(model_name, "Xception")):
            return "block14_sepconv2"
        if(self.validation_obj.validation_string(model_name, "DenseNet121")):
            return "conv5_block16_concat"
        if(self.validation_obj.validation_string(model_name, "InceptionResNetV2")):
            return "conv_7b"
        if(self.validation_obj.validation_string(model_name, "InceptionV3")):
            return "conv2d_93"
        if(self.validation_obj.validation_string(model_name, "MobileNet")):
            return "conv_pw_13"
        if(self.validation_obj.validation_string(model_name, "MobileNetV2")):
            return "Conv_1"
        if(self.validation_obj.validation_string(model_name, "NASNetLarge")):
            return "separable_conv_2_normal_left5_18"
        if(self.validation_obj.validation_string(model_name, "ResNet101")):
            return "conv5_block3_3_conv"
        if(self.validation_obj.validation_string(model_name, "ResNet101V2")):
            return "conv5_block3_3_conv"
        if(self.validation_obj.validation_string(model_name, "ResNet152")):
            return "conv5_block3_3_conv"
        if(self.validation_obj.validation_string(model_name, "ResNet152V2")):
            return "conv5_block3_out"
        if(self.validation_obj.validation_string(model_name, "ResNet50v2")):
            return "conv5_block3_out"
        if(self.validation_obj.validation_string(model_name, "VGG16")):
            return "block5_conv3"
        return Dataframe
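
The diff references ModifiedXception from experiments/pytorch_Model.py but does not show that file. A hypothetical sketch of what such a module might look like, mirroring the removed Keras head (GAP -> Dense(1370, relu) -> Dropout(0.6) -> Dense(2, softmax)); the timm backbone is an assumption, not the commit's actual code:

import torch.nn as nn
import timm  # assumption: older timm registers the backbone as 'xception', newer as 'legacy_xception'

class ModifiedXception(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        # num_classes=0 strips timm's classifier; global_pool='avg' keeps the GAP
        self.backbone = timm.create_model('xception', pretrained=True, num_classes=0, global_pool='avg')
        self.head = nn.Sequential(
            nn.Linear(self.backbone.num_features, 1370),
            nn.ReLU(),
            nn.Dropout(0.6),
            nn.Linear(1370, num_classes),
            nn.Softmax(dim=1),  # Entropy_Loss's binary_cross_entropy needs probabilities
        )

    def forward(self, x):
        return self.head(self.backbone(x))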

main.py
View File

@@ -6,18 +6,17 @@ from model_data_processing.processing import shuffle_data
from Load_process.LoadData import Load_Data_Prepare
from Calculate_Process.Calculate import Calculate
from merge_class.merge import merge
import tensorflow as tf
import time
import torch
import os
if __name__ == "__main__":
    # Check whether the GPU is available
    print('TensorFlow version:', tf.__version__)
    physical_devices = tf.config.experimental.list_physical_devices('GPU')
    print(physical_devices)
    assert len(physical_devices) > 0, "Not enough GPU hardware devices available"
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    os.environ["CUDA_VISIBLE_DEVICES"]='0'
    flag = torch.cuda.is_available()
    if not flag:
        print("CUDA is not available\n")
    else:
        print(f"Number of available CUDA devices: {torch.cuda.device_count()}\n")
    Status = 2 # decide which dataset to use
    # change this to switch datasets
@@ -86,6 +85,7 @@ if __name__ == "__main__":
    print(len(training_data))
    training_data, train_label = image_processing.image_data_processing(training_data, train_label) # normalize the loaded files and convert the labels to numpy array format
    training_data = image_processing.normalization(training_data)
    end = time.time()
    print("\n\n\nReading the training data (70000) took: %f\n\n" % (end - start))