import torch
import matplotlib
matplotlib.use('Agg')  # Use a non-GUI backend to avoid Tkinter errors
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import os
from skimage.segmentation import slic
from skimage.color import label2rgb

# Global font configuration so Chinese (CJK) text renders correctly in plots
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

from utils.Stomach_Config import Loading_Config, Save_Result_File_Config
from Load_process.LoadData import Loding_Data_Root, Load_Data_Prepare
from Load_process.file_processing import Process_File
from merge_class.merge import merge
from model_data_processing.processing import make_label_list


def save_superpixel_regions(image_array, segments, save_dir, image_name, max_regions=50):
    """
    Save individual superpixel regions as separate images.

    Args:
    - image_array: Original image array (H, W, 3)
    - segments: Superpixel segmentation labels (H, W)
    - save_dir: Directory to save superpixel region images
    - image_name: Base name for the image files
    - max_regions: Maximum number of regions to save (to avoid creating too many files)

    Returns:
    - saved_regions: List of dicts describing each saved region
    """
    os.makedirs(save_dir, exist_ok=True)
    saved_regions = []
    unique_segments = np.unique(segments)

    # Limit the number of regions to save
    if len(unique_segments) > max_regions:
        print(f"⚠️ Superpixel count ({len(unique_segments)}) exceeds the limit ({max_regions}); saving only the first {max_regions}")
        unique_segments = unique_segments[:max_regions]

    print(f"💾 Saving {len(unique_segments)} superpixel regions to: {save_dir}")

    for i, segment_id in enumerate(unique_segments):
        # Get the mask for the current superpixel
        mask = segments == segment_id

        # Get the bounding box of the region
        y_coords, x_coords = np.where(mask)
        if len(y_coords) == 0:
            continue

        min_y, max_y = y_coords.min(), y_coords.max()
        min_x, max_x = x_coords.min(), x_coords.max()

        # Extract the region with some padding
        padding = 5
        min_y = max(0, min_y - padding)
        max_y = min(image_array.shape[0], max_y + padding + 1)
        min_x = max(0, min_x - padding)
        max_x = min(image_array.shape[1], max_x + padding + 1)

        # Extract the region from the original image
        region_image = image_array[min_y:max_y, min_x:max_x].copy()
        region_mask = mask[min_y:max_y, min_x:max_x]

        # Zero out pixels outside the superpixel so the background is black
        region_image[~region_mask] = [0, 0, 0]

        # Convert to a PIL image and save
        region_pil = Image.fromarray(region_image.astype(np.uint8))

        # Create the filename
        region_filename = f"{image_name}_superpixel_{segment_id:03d}_region_{i+1:03d}.png"
        region_path = os.path.join(save_dir, region_filename)

        # Save the region
        region_pil.save(region_path)

        # Calculate region statistics
        region_pixels = image_array[mask]
        mean_color = np.mean(region_pixels, axis=0)
        region_area = np.sum(mask)
        centroid_y = np.mean(y_coords)
        centroid_x = np.mean(x_coords)

        region_info = {
            'segment_id': segment_id,
            'filename': region_filename,
            'path': region_path,
            'area': region_area,
            'centroid': (centroid_x, centroid_y),
            'mean_color': mean_color,
            'bbox': (min_x, min_y, max_x, max_y)
        }
        saved_regions.append(region_info)

    print(f"✅ Successfully saved {len(saved_regions)} superpixel region images")
    return saved_regions
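
# A minimal usage sketch for save_superpixel_regions, kept as comments so the
# module's behavior is unchanged (the file names below are illustrative
# placeholders, not part of the pipeline):
#
#   img = np.array(Image.open("sample.png").convert('RGB'))
#   segs = slic(img / 255.0, n_segments=100, compactness=10, start_label=1)
#   regions = save_superpixel_regions(img, segs, "out/regions", "sample", max_regions=20)
#   print(regions[0]['bbox'], regions[0]['area'])

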
def calculate_optimal_superpixel_params(image_size):
    """
    Calculate optimal superpixel parameters based on image size.

    Args:
    - image_size: Total number of pixels (width * height)

    Returns:
    - n_segments: Number of superpixel segments
    - compactness: Compactness parameter for SLIC
    """
    # Adjust the parameters dynamically according to the image size
    if image_size < 50000:  # Small images (< ~224x224)
        n_segments = min(100, max(50, image_size // 500))
        compactness = 15
    elif image_size < 200000:  # Medium images (< ~447x447)
        n_segments = min(300, max(100, image_size // 800))
        compactness = 12
    elif image_size < 500000:  # Large images (< ~707x707)
        n_segments = min(500, max(200, image_size // 1000))
        compactness = 10
    else:  # Very large images
        n_segments = min(800, max(300, image_size // 1500))
        compactness = 8

    return int(n_segments), compactness
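
# Worked example: a 224x224 image has 50,176 pixels, which falls in the
# "medium" branch above, so
#   n_segments  = min(300, max(100, 50176 // 800)) = min(300, max(100, 62)) = 100
#   compactness = 12
#
#   n_seg, comp = calculate_optimal_superpixel_params(224 * 224)
#   assert (n_seg, comp) == (100, 12)

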
def extract_superpixel_features(image_array, segments):
    """
    Extract features for each superpixel region.

    Args:
    - image_array: Original image array (H, W, 3)
    - segments: Superpixel segmentation labels (H, W)

    Returns:
    - features: Array of features for each superpixel (N_superpixels, 5)
      [mean_R, mean_G, mean_B, norm_centroid_x, norm_centroid_y]
    - centroids: Array of centroid positions for each superpixel (N_superpixels, 2)
    """
    features = []
    centroids = []

    for segment_id in np.unique(segments):
        # Get the mask for the current superpixel
        mask = segments == segment_id

        # Color features: mean RGB over the region
        region_pixels = image_array[mask]
        mean_color = np.mean(region_pixels, axis=0)

        # Position features: region centroid
        y_coords, x_coords = np.where(mask)
        centroid_y = np.mean(y_coords)
        centroid_x = np.mean(x_coords)

        # Combine features: [mean_R, mean_G, mean_B, centroid_x, centroid_y],
        # with centroid coordinates normalized to the [0, 1] range
        norm_centroid_x = centroid_x / image_array.shape[1]
        norm_centroid_y = centroid_y / image_array.shape[0]
        feature_vector = np.concatenate([mean_color, [norm_centroid_x, norm_centroid_y]])

        features.append(feature_vector)
        centroids.append([centroid_x, centroid_y])

    return np.array(features), np.array(centroids)
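
# Feature-vector layout, for reference: with a float image in [0, 1], each
# superpixel yields [mean_R, mean_G, mean_B, x/W, y/H], so all five entries lie
# in [0, 1] and color and position contribute on comparable scales. A sketch
# (`img_f` and `segs` are illustrative names):
#
#   feats, cents = extract_superpixel_features(img_f, segs)
#   assert feats.shape == (len(np.unique(segs)), 5)

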
def fuzzy_c_means(data, n_clusters, m=2.0, max_iter=100, tol=1e-4, random_state=None):
    """
    Fuzzy C-means clustering algorithm implementation.

    Args:
    - data: Input data array (N_samples, N_features)
    - n_clusters: Number of clusters
    - m: Fuzziness parameter (m > 1, typically 2.0)
    - max_iter: Maximum number of iterations
    - tol: Tolerance for convergence
    - random_state: Random seed for reproducibility

    Returns:
    - centers: Cluster centers (n_clusters, N_features)
    - membership: Membership matrix (N_samples, n_clusters)
    - labels: Hard cluster assignments (N_samples,)
    - objective: Final objective function value
    - n_iter: Number of iterations performed
    """
    if random_state is not None:
        np.random.seed(random_state)
        torch.manual_seed(random_state)

    # Convert to a torch tensor
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    X = torch.from_numpy(data).float().to(device)
    n_samples, n_features = X.shape

    # Initialize the membership matrix randomly
    U = torch.rand(n_samples, n_clusters, device=device)
    U = U / U.sum(dim=1, keepdim=True)  # Normalize so each row sums to 1

    print(f"🔄 Starting Fuzzy C-means clustering: {n_clusters} cluster centers, fuzziness m={m}")

    for iteration in range(max_iter):
        U_old = U.clone()

        # Update cluster centers
        Um = U ** m  # Membership matrix raised to the power m
        centers = (Um.T @ X) / Um.sum(dim=0, keepdim=True).T

        # Update the membership matrix.
        # Distances from each point to each center:
        distances = torch.cdist(X, centers)  # (n_samples, n_clusters)

        # Avoid division by zero
        distances = torch.clamp(distances, min=1e-10)

        # Standard FCM membership update, vectorized:
        #   u_ij = 1 / sum_k (d_ij / d_ik)^(2/(m-1)) = d_ij^(-p) / sum_k d_ik^(-p)
        # with p = 2 / (m - 1)
        power = 2.0 / (m - 1.0)
        inv_dist = distances ** (-power)
        U = inv_dist / inv_dist.sum(dim=1, keepdim=True)

        # Check for convergence
        diff = torch.norm(U - U_old)
        if diff < tol:
            print(f"✅ Fuzzy C-means converged at iteration {iteration + 1} (change: {diff:.6f})")
            break

        if (iteration + 1) % 20 == 0:
            print(f"   Iteration {iteration + 1}/{max_iter}, change: {diff:.6f}")

    # Calculate the final objective function value
    Um = U ** m
    distances_squared = torch.cdist(X, centers) ** 2
    objective = torch.sum(Um * distances_squared).item()

    # Hard cluster assignments (highest membership)
    labels = torch.argmax(U, dim=1)

    # Convert back to numpy
    centers_np = centers.cpu().numpy()
    membership_np = U.cpu().numpy()
    labels_np = labels.cpu().numpy()

    print(f"🎯 Fuzzy C-means finished: objective value = {objective:.4f}")

    return centers_np, membership_np, labels_np, objective, iteration + 1
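
# A minimal sketch of the fuzzy_c_means API on synthetic data (the toy arrays
# are illustrative, not part of the pipeline):
#
#   rng = np.random.default_rng(0)
#   toy = np.vstack([rng.normal(0.2, 0.05, (30, 5)),
#                    rng.normal(0.8, 0.05, (30, 5))])
#   centers, U, labels, J, iters = fuzzy_c_means(toy, n_clusters=2, random_state=42)
#   assert np.allclose(U.sum(axis=1), 1.0, atol=1e-5)  # rows of U sum to 1

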
def determine_optimal_clusters(data, gamma_values, max_clusters=10, min_clusters=2):
    """
    Determine the optimal number of clusters using gamma values from density peak
    analysis and fuzzy clustering validation indices.

    Args:
    - data: Input data array (N_samples, N_features)
    - gamma_values: Gamma values from density peak analysis
    - max_clusters: Maximum number of clusters to test
    - min_clusters: Minimum number of clusters to test

    Returns:
    - optimal_k: Optimal number of clusters
    - scores: Dictionary containing validation scores for each k
    """
    n_samples = len(data)
    max_clusters = min(max_clusters, n_samples - 1)

    # Method 1: Use gamma values to estimate the number of cluster centers.
    # Sort gamma values in descending order and look for significant drops.
    sorted_gamma = np.sort(gamma_values)[::-1]  # Descending order
    gamma_diffs = np.diff(sorted_gamma)

    # Find the largest drop in gamma values (elbow method)
    if len(gamma_diffs) > 0:
        elbow_idx = np.argmax(np.abs(gamma_diffs)) + 1
        gamma_suggested_k = min(max_clusters, max(min_clusters, elbow_idx))
    else:
        gamma_suggested_k = min_clusters

    print(f"📊 Cluster count suggested by gamma-value analysis: {gamma_suggested_k}")

    # Method 2: Test different k values with fuzzy clustering validation indices
    scores = {}
    best_k = gamma_suggested_k
    best_score = -np.inf

    print(f"🔍 Testing cluster counts from {min_clusters} to {max_clusters}...")

    for k in range(min_clusters, max_clusters + 1):
        try:
            # Perform fuzzy c-means clustering
            centers, membership, labels, objective, n_iter = fuzzy_c_means(
                data, k, m=2.0, max_iter=50, random_state=42
            )

            # Partition Coefficient (PC) - higher is better
            pc = np.mean(np.sum(membership ** 2, axis=1))

            # Partition Entropy (PE) - lower is better
            pe = -np.mean(np.sum(membership * np.log(membership + 1e-10), axis=1))

            # Modified Partition Coefficient (MPC) - higher is better
            mpc = 1 - k / (k - 1) * (1 - pc) if k > 1 else pc

            # Combined score (higher is better)
            combined_score = pc - 0.1 * pe + 0.5 * mpc

            scores[k] = {
                'pc': pc,
                'pe': pe,
                'mpc': mpc,
                'combined_score': combined_score,
                'objective': objective,
                'n_iter': n_iter
            }

            print(f"   K={k}: PC={pc:.3f}, PE={pe:.3f}, MPC={mpc:.3f}, combined score={combined_score:.3f}")

            if combined_score > best_score:
                best_score = combined_score
                best_k = k

        except Exception as e:
            print(f"   K={k}: clustering failed - {e}")
            scores[k] = {'error': str(e)}

    print(f"🎯 Best cluster count: {best_k} (combined score: {best_score:.3f})")

    return best_k, scores
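
# Index definitions used above, for a membership matrix U of shape (N, k):
#   PC  = (1/N) * sum_i sum_j u_ij^2           in [1/k, 1]; 1 means crisp clusters
#   PE  = -(1/N) * sum_i sum_j u_ij ln(u_ij)   in [0, ln k]; 0 means crisp clusters
#   MPC = 1 - k/(k-1) * (1 - PC)               rescales PC to [0, 1]
# The combined score PC - 0.1*PE + 0.5*MPC is a heuristic weighting specific to
# this script, not a standard validity index.

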
def visualize_clustering_results(image_array, segments, labels, centers, membership, save_path):
    """
    Visualize fuzzy clustering results on the original image.

    Args:
    - image_array: Original image array (H, W, 3)
    - segments: Superpixel segmentation labels (H, W)
    - labels: Hard cluster assignments for each superpixel
    - centers: Cluster centers
    - membership: Fuzzy membership matrix
    - save_path: Path to save the visualization
    """
    # Font configuration (kept so CJK characters render if they appear)
    plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'DejaVu Sans']
    plt.rcParams['axes.unicode_minus'] = False

    fig, axes = plt.subplots(2, 3, figsize=(18, 12))

    # Original image
    axes[0, 0].imshow(image_array)
    axes[0, 0].set_title('Original Image', fontsize=14)
    axes[0, 0].axis('off')

    # Superpixel segmentation
    superpixel_img = label2rgb(segments, image_array, kind='avg', bg_label=0)
    axes[0, 1].imshow(superpixel_img)
    axes[0, 1].set_title(f'Superpixel Segmentation ({len(np.unique(segments))} regions)', fontsize=14)
    axes[0, 1].axis('off')

    # Hard clustering result
    n_clusters = len(centers)
    colors = plt.cm.Set3(np.linspace(0, 1, n_clusters))

    # Map each superpixel to its cluster color. Segment ids start at 1 (SLIC
    # was run with start_label=1) while labels/membership rows are indexed by
    # position in np.unique(segments), so enumerate rather than index by id.
    # Use a float image so the [0, 1] color values are not truncated to zero.
    cluster_img = np.zeros(image_array.shape[:2] + (3,), dtype=float)
    for idx, segment_id in enumerate(np.unique(segments)):
        if idx < len(labels):
            cluster_id = labels[idx]
            mask = segments == segment_id
            cluster_img[mask] = colors[cluster_id][:3]

    axes[0, 2].imshow(cluster_img)
    axes[0, 2].set_title(f'Hard Clustering Result ({n_clusters} clusters)', fontsize=14)
    axes[0, 2].axis('off')

    # Fuzzy membership visualization for up to 3 clusters
    for i in range(min(3, n_clusters)):
        fuzzy_img = np.zeros(image_array.shape[:2])
        for idx, segment_id in enumerate(np.unique(segments)):
            if idx < len(membership):
                mask = segments == segment_id
                fuzzy_img[mask] = membership[idx, i]

        im = axes[1, i].imshow(fuzzy_img, cmap='hot', vmin=0, vmax=1)
        axes[1, i].set_title(f'Fuzzy Membership for Cluster {i+1}', fontsize=14)
        axes[1, i].axis('off')
        plt.colorbar(im, ax=axes[1, i], fraction=0.046, pad=0.04)

    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()
    print(f"📊 Clustering visualization saved to: {save_path}")
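
# Sketch of wiring the pieces above together by hand (names are illustrative):
#
#   feats, _ = extract_superpixel_features(img, segs)
#   centers, U, labels, J, it = fuzzy_c_means(feats, n_clusters=3, random_state=42)
#   visualize_clustering_results(img, segs, labels, centers, U, "clusters.png")

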
def compute_decision_graph(image_path, Save_Root, dc=None, use_gaussian=False, threshold_factor=2.0, use_superpixels=True, n_segments=None, compactness=None, save_regions=True, max_regions=50):
    """
    Process a single image to compute the decision graph using Density Peak Clustering principles.
    Identifies potential cluster centers without performing full clustering.
    Also computes gamma (rho * delta) and n (rank in descending order of gamma).

    Args:
    - image_path: Path to the image file.
    - Save_Root: Directory where plots and region images are saved.
    - dc: Cut-off distance. If None, estimated as the 2nd percentile of pairwise distances.
    - use_gaussian: If True, uses a Gaussian kernel for density; otherwise a cut-off kernel.
    - threshold_factor: Factor on the std deviation used to threshold candidate centers.
    - use_superpixels: Kept for API compatibility; this implementation always uses SLIC superpixels.
    - n_segments: Number of superpixel segments. If None, derived from the image size.
    - compactness: Compactness parameter for the SLIC algorithm. If None, derived from the image size.
    - save_regions: If True, saves each superpixel region as a separate image.
    - max_regions: Maximum number of region images to save.

    Returns:
    - dict: Contains center_indices, center_points, rho, delta, gamma, and n arrays,
      plus 'segments', 'superpixel_features', and the Fuzzy C-means clustering results.
      gamma = rho * delta (product of local density and minimum distance)
      n = rank in descending order of gamma (1-indexed, per Density Peak convention)
    """
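    # Density Peak Clustering background (Rodriguez & Laio, 2014), as implemented below:
    #   cut-off kernel:   rho_i = |{ j != i : d_ij < dc }|
    #   Gaussian kernel:  rho_i = sum_{j != i} exp(-d_ij^2 / (2 * dc^2))
    #   delta_i = min over j with rho_j > rho_i of d_ij
    #             (for the densest point, delta_i = max_j d_ij)
    #   gamma_i = rho_i * delta_i; points with high gamma are candidate centers.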
    # Load image
    img = Image.open(image_path).convert('RGB')
    img_array = np.array(img) / 255.0  # Normalize to [0, 1]
    image_size = img_array.shape[0] * img_array.shape[1]

    # Superpixel segmentation is always applied; derive its parameters from the
    # image size when they are not supplied
    if n_segments is None or compactness is None:
        n_segments, compactness = calculate_optimal_superpixel_params(image_size)

    print(f"🖼️ Image size: {img.size} ({image_size:,} pixels)")
    print(f"🔧 Using dynamic superpixel parameters: n_segments={n_segments}, compactness={compactness}")

    # Apply SLIC superpixel segmentation (always enabled)
    print(f"🎯 Applying SLIC superpixel segmentation, targeting {n_segments} regions...")
    segments = slic(img_array, n_segments=n_segments, compactness=compactness,
                    start_label=1, enforce_connectivity=True)

    # Extract features for each superpixel
    superpixel_features, superpixel_centroids = extract_superpixel_features(img_array, segments)
    points = torch.from_numpy(superpixel_features).float()

    print(f"✅ Compressed {img_array.shape[0] * img_array.shape[1]:,} pixels down to {len(superpixel_features)} superpixels")

    # Save superpixel regions if requested
    if save_regions:
        # Get the image name without its extension
        image_name = os.path.splitext(os.path.basename(image_path))[0]
        superpixel_regions_dir = os.path.join(Save_Root, f"{image_name}_superpixel_regions")

        # Convert back to the 0-255 range for saving
        img_array_255 = (img_array * 255).astype(np.uint8)
        saved_regions = save_superpixel_regions(img_array_255, segments, superpixel_regions_dir, image_name, max_regions)
        print(f"💾 Saved {len(saved_regions)} superpixel region images to: {superpixel_regions_dir}")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    points = points.to(device)
    N = points.shape[0]

    print(f"🔄 Processing image: {os.path.basename(image_path)} on {device} (with superpixel segmentation)")

    # Compute pairwise distances
    dist = torch.cdist(points, points)

    # Approximate dc if not provided (using a sample for efficiency)
    if dc is None:
        # Sample up to 1000 points and take the 2nd percentile of nonzero distances
        sample_size = min(1000, N)
        sample_idx = torch.randperm(N)[:sample_size]
        sample_dist = torch.cdist(points[sample_idx], points)
        sample_dist_flat = sample_dist.flatten()
        sample_dist_flat = sample_dist_flat[sample_dist_flat > 0]  # Exclude zeros
        sorted_dist = torch.sort(sample_dist_flat)[0]
        pos = int(len(sorted_dist) * 0.02)
        dc = sorted_dist[pos].item()
        print(f"Approximated dc: {dc}")

    # Compute local density rho (the -1 term excludes the self-distance)
    if use_gaussian:
        rho = torch.exp(-(dist ** 2) / (2 * dc ** 2)).sum(dim=1) - 1
    else:
        rho = (dist < dc).float().sum(dim=1) - 1

    # Compute delta: distance to the nearest point of higher density
    _, sorted_idx = torch.sort(rho, descending=True)
    delta = torch.full((N,), 0.0, device=device)
    nn = torch.full((N,), -1, dtype=torch.long, device=device)

    # For the highest-density point, delta is its distance to the farthest point
    delta[sorted_idx[0]] = dist[sorted_idx[0]].max()

    # For the remaining points
    for i in range(1, N):
        higher_idx = sorted_idx[:i]
        cur_idx = sorted_idx[i]
        dists_to_higher = dist[cur_idx, higher_idx]
        min_dist_idx = torch.argmin(dists_to_higher)
        delta[cur_idx] = dists_to_higher[min_dist_idx]
        nn[cur_idx] = higher_idx[min_dist_idx]

    # Calculate gamma (rho * delta) and n (rank in descending order of gamma)
    gamma = rho * delta
    sorted_gamma_indices = torch.argsort(gamma, descending=True)
    n = torch.empty_like(sorted_gamma_indices)
    n[sorted_gamma_indices] = torch.arange(1, N + 1, device=device)

    # Plot the decision graph with two subplots
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # Decision graph (rho vs delta)
    rho_np = rho.cpu().numpy()
    delta_np = delta.cpu().numpy()

    # Color the points by gamma to highlight density peaks
    scatter1 = ax1.scatter(rho_np, delta_np, c=gamma.cpu().numpy(), s=8, alpha=0.7, cmap='viridis')
    ax1.set_xlabel('Rho (Local Density)', fontsize=12)
    ax1.set_ylabel('Delta (Min Distance to Higher Density)', fontsize=12)
    ax1.set_title(f'Decision Graph - {os.path.basename(image_path)}\n(Superpixels: {len(rho_np)})', fontsize=14)
    ax1.grid(True, alpha=0.3)

    # Add a colorbar
    cbar1 = plt.colorbar(scatter1, ax=ax1)
    cbar1.set_label('Gamma (Rho × Delta)', fontsize=10)

    # Gamma vs n plot - with data validation for the log scale
    gamma_np = gamma.cpu().numpy()
    n_np = n.cpu().numpy()

    # Filter out non-positive values for the log scale
    positive_mask = gamma_np > 0
    if positive_mask.sum() > 0:
        scatter2 = ax2.scatter(n_np[positive_mask], gamma_np[positive_mask],
                               c=rho_np[positive_mask], s=8, alpha=0.7, cmap='plasma')
        ax2.set_yscale('log')

        # Add a colorbar
        cbar2 = plt.colorbar(scatter2, ax=ax2)
        cbar2.set_label('Rho (Local Density)', fontsize=10)
    else:
        # Fall back to a linear scale if there are no positive values
        ax2.scatter(n_np, gamma_np, s=8, alpha=0.7, color='blue')
        print("⚠️ Warning: No positive gamma values found, using linear scale instead of log scale")

    ax2.set_xlabel('N (Index in descending order of Gamma)', fontsize=12)
    ax2.set_ylabel('Gamma (Rho × Delta)', fontsize=12)
    ax2.set_title('Gamma vs N Plot\n(Cluster Center Selection)', fontsize=14)
    ax2.grid(True, alpha=0.3)

    # Apply tight_layout with error handling
    try:
        plt.tight_layout()
    except Exception as e:
        print(f"Warning: tight_layout failed ({e}), using default layout")
        # Use manual spacing as a fallback
        plt.subplots_adjust(left=0.1, right=0.95, top=0.9, bottom=0.1, wspace=0.3)

    file = Process_File()
    file.JudgeRoot_MakeDir(Save_Root)
    # Use the original filename rather than an index number
    original_filename = os.path.basename(image_path)
    path = file.Make_Save_Root(original_filename, Save_Root)

    plt.savefig(path, dpi=300, bbox_inches='tight')
    plt.close()
    print(f"Decision graph and gamma vs n plot saved to: {path}")

    # Identify potential cluster centers (high rho and high delta)
    mean_rho = rho.mean()
    std_rho = rho.std()
    threshold_rho = mean_rho + threshold_factor * std_rho

    mean_delta = delta.mean()
    std_delta = delta.std()
    threshold_delta = mean_delta + threshold_factor * std_delta

    is_center = (rho > threshold_rho) & (delta > threshold_delta)

    # Handle the torch.nonzero result carefully to avoid issues with empty tensors
    center_nonzero = torch.nonzero(is_center)
    if center_nonzero.numel() > 0:
        center_indices = center_nonzero.squeeze().cpu().numpy()
        # Ensure center_indices is always a 1D array, even for a single element
        if center_indices.ndim == 0:
            center_indices = np.array([center_indices.item()])
    else:
        center_indices = np.array([], dtype=int)

    # Candidate cluster centers (superpixels)
    center_points = superpixel_features[center_indices] if len(center_indices) > 0 else np.array([])
    print(f"🎯 Candidate cluster centers found: {len(center_indices)} superpixels")
    for idx in center_indices:
        print(f"   Center superpixel {idx}: RGB({superpixel_features[idx][0]:.3f}, {superpixel_features[idx][1]:.3f}, {superpixel_features[idx][2]:.3f})")

    # ========== Fuzzy C-means clustering analysis ==========
    print(f"\n🔄 Starting Fuzzy C-means clustering analysis...")

    # Determine the optimal number of clusters
    gamma_np = gamma.cpu().numpy()
    optimal_k, cluster_scores = determine_optimal_clusters(
        superpixel_features, gamma_np, max_clusters=8, min_clusters=2
    )

    # Run Fuzzy C-means clustering
    print(f"\n🎯 Running Fuzzy C-means with the optimal cluster count {optimal_k}...")
    cluster_centers, membership_matrix, cluster_labels, objective_value, n_iterations = fuzzy_c_means(
        superpixel_features, optimal_k, m=2.0, max_iter=100, random_state=42
    )

    # Compute per-cluster statistics
    cluster_stats = {}
    for cluster_id in range(optimal_k):
        cluster_mask = cluster_labels == cluster_id
        cluster_size = np.sum(cluster_mask)
        # Guard against empty clusters to avoid a mean over zero elements
        avg_membership = float(np.mean(membership_matrix[cluster_mask, cluster_id])) if cluster_size > 0 else 0.0
        cluster_stats[cluster_id] = {
            'size': cluster_size,
            'avg_membership': avg_membership,
            'center': cluster_centers[cluster_id]
        }
        print(f"   Cluster {cluster_id}: {cluster_size} superpixels, mean membership: {avg_membership:.3f}")

    # Generate the clustering visualization
    print(f"\n📊 Generating clustering visualization...")
    # Convert the image array back to the 0-255 range for visualization
    img_array_255 = (img_array * 255).astype(np.uint8)

    # Build the save path for the visualization
    original_filename = os.path.basename(image_path)
    clustering_viz_path = os.path.join(Save_Root, f"clustering_results_{original_filename}")

    # Generate the visualization
    visualize_clustering_results(
        img_array_255, segments, cluster_labels, cluster_centers,
        membership_matrix, clustering_viz_path
    )

    # Prepare the return dictionary (density peak analysis + fuzzy clustering results)
    result = {
        # Density peak analysis results
        'center_indices': center_indices,
        'center_points': center_points,
        'rho': rho.cpu().numpy(),
        'delta': delta.cpu().numpy(),
        'gamma': gamma.cpu().numpy(),
        'n': n.cpu().numpy(),
        'segments': segments,
        'superpixel_features': superpixel_features,
        'superpixel_centroids': superpixel_centroids,
        'n_superpixels': len(superpixel_features),
        'compression_ratio': len(superpixel_features) / image_size,

        # Fuzzy C-means clustering results
        'optimal_clusters': optimal_k,
        'cluster_centers': cluster_centers,
        'membership_matrix': membership_matrix,
        'cluster_labels': cluster_labels,
        'cluster_stats': cluster_stats,
        'clustering_objective': objective_value,
        'clustering_iterations': n_iterations,
        'cluster_scores': cluster_scores,
        'clustering_viz_path': clustering_viz_path
    }

    return result


# Example usage:
if __name__ == "__main__":
    Label_Length = len(Loading_Config["Training_Labels"])
    Merge = merge()
    Prepare = Load_Data_Prepare()

    load = Loding_Data_Root(Loading_Config["Training_Labels"], Loading_Config["Train_Data_Root"], Loading_Config["ImageGenerator_Data_Root"])
    Data_Dict_Data = load.process_main(False)

    Total_Size_List = []
    for label in Loading_Config["Training_Labels"]:
        Total_Size_List.append(len(Data_Dict_Data[label]))

    # Build a label list matching the size of each data class
    Classes = []
    i = 0
    for encording in Loading_Config["Training_Labels"]:
        Classes.append(make_label_list(Total_Size_List[i], encording))
        i += 1

    # Assemble the data into a dict
    Prepare.Set_Final_Dict_Data(Loading_Config["Training_Labels"], Data_Dict_Data, Classes, Label_Length)
    Final_Dict_Data = Prepare.Get_Final_Data_Dict()
    keys = list(Final_Dict_Data.keys())

    Training_Data = Merge.merge_all_image_data(Final_Dict_Data[keys[0]], Final_Dict_Data[keys[1]])  # Merge the training data into one list
    for i in range(2, Label_Length):
        Training_Data = Merge.merge_all_image_data(Training_Data, Final_Dict_Data[keys[i]])  # Merge the training data into one list

    Training_Label = Merge.merge_all_image_data(Final_Dict_Data[keys[Label_Length]], Final_Dict_Data[keys[Label_Length + 1]])  # Merge the training labels into one list
    for i in range(Label_Length + 2, 2 * Label_Length):
        Training_Label = Merge.merge_all_image_data(Training_Label, Final_Dict_Data[keys[i]])  # Merge the training labels into one list

    for i in range(len(Training_Data)):
        print(f"\n{'='*60}")
        print(f"🖼️ Processing image {i+1}/{len(Training_Data)}: {Training_Data[i]}")

        # Every image goes through superpixel-based Density Peak analysis
        print("🎯 Using SLIC superpixel segmentation for Density Peak analysis")
        try:
            result_superpixels = compute_decision_graph(
                Training_Data[i],
                f'{Save_Result_File_Config["Density_Peak_Save_Root"]}/{Training_Label[i]}_superpixels',
                use_superpixels=True,
                save_regions=True,
                max_regions=50
            )

            # Gather image info for the statistics below
            test_image = Image.open(Training_Data[i])
            image_size = test_image.size[0] * test_image.size[1]

            print(f"\n✅ Superpixel processing succeeded:")
            print(f"📊 Superpixel count: {len(result_superpixels['rho'])} data points")
            print(f"📈 Compression ratio: {len(result_superpixels['rho']) / image_size:.6f}")
            print(f"🔄 Compression factor: {image_size / len(result_superpixels['rho']):.1f}x")
            print(f"🎯 Decision graph generated and saved")

            # Report the Fuzzy C-means clustering results
            print(f"\n🎯 Fuzzy C-means clustering results:")
            print(f"📊 Optimal cluster count: {result_superpixels['optimal_clusters']}")
            print(f"🔄 Clustering iterations: {result_superpixels['clustering_iterations']}")
            print(f"📈 Objective value: {result_superpixels['clustering_objective']:.4f}")

            # Report per-cluster details
            print(f"📋 Per-cluster details:")
            for cluster_id, stats in result_superpixels['cluster_stats'].items():
                print(f"   Cluster {cluster_id}: {stats['size']} superpixels (mean membership: {stats['avg_membership']:.3f})")
                center = stats['center']
                print(f"      Center features: RGB({center[0]:.3f}, {center[1]:.3f}, {center[2]:.3f}), position ({center[3]:.3f}, {center[4]:.3f})")

            print(f"📊 Clustering visualization saved to: {result_superpixels['clustering_viz_path']}")

            # Report clustering quality metrics
            if 'cluster_scores' in result_superpixels and result_superpixels['optimal_clusters'] in result_superpixels['cluster_scores']:
                scores = result_superpixels['cluster_scores'][result_superpixels['optimal_clusters']]
                print(f"📈 Clustering quality metrics:")
                print(f"   Partition Coefficient (PC): {scores['pc']:.3f}")
                print(f"   Partition Entropy (PE): {scores['pe']:.3f}")
                print(f"   Modified Partition Coefficient (MPC): {scores['mpc']:.3f}")
                print(f"   Combined score: {scores['combined_score']:.3f}")

        except Exception as e:
            print(f"❌ Superpixel processing failed: {e}")
            continue

    print(f"\n💾 Results saved to: {Save_Result_File_Config['Density_Peak_Save_Root']}")