"""Evolution strategy implementation for laboratory work #5."""

from __future__ import annotations

import math
import os
import random
import shutil
import time
from collections import deque
from dataclasses import dataclass
from typing import Callable, Iterable, Literal, Sequence

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.axes import Axes
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401 - required for 3D plotting
from numpy.typing import NDArray

Array = NDArray[np.float64]
FitnessFn = Callable[[Array], float]


@dataclass
class Individual:
    """Single individual of the evolution strategy population."""

    x: Array
    sigma: Array
    fitness: float

    def copy(self) -> "Individual":
        return Individual(self.x.copy(), self.sigma.copy(), float(self.fitness))


@dataclass(frozen=True)
class Generation:
    number: int
    population: tuple[Individual, ...]
    best: Individual
    mean_fitness: float
    sigma_scale: float


@dataclass
class EvolutionStrategyResult:
    generations_count: int
    best_generation: Generation
    history: list[Generation]
    time_ms: float


@dataclass
class EvolutionStrategyConfig:
    fitness_func: FitnessFn
    dimension: int
    x_min: Array
    x_max: Array
    mu: int
    lambda_: int
    mutation_probability: float
    initial_sigma: Array | float
    max_generations: int
    selection: Literal["plus", "comma"] = "comma"
    recombination: Literal["intermediate", "discrete", "none"] = "intermediate"
    parents_per_offspring: int = 2
    success_rule_window: int = 10
    success_rule_target: float = 0.2
    sigma_increase: float = 1.22
    sigma_decrease: float = 0.82
    sigma_scale_min: float = 1e-3
    sigma_scale_max: float = 100.0
    tau: float | None = None
    tau_prime: float | None = None
    sigma_min: float = 1e-6
    sigma_max: float = 10.0
    best_value_threshold: float | None = None
    max_stagnation_generations: int | None = None
    save_generations: list[int] | None = None
    results_dir: str = "results"
    log_every_generation: bool = False
    seed: int | None = None

    def __post_init__(self) -> None:
        assert self.dimension == self.x_min.shape[0] == self.x_max.shape[0], (
            "Bounds dimensionality must match the dimension of the problem"
        )
        assert 0 < self.mu <= self.lambda_, "mu and lambda must be positive with mu <= lambda"
        assert 0.0 < self.mutation_probability <= 1.0, (
            "Mutation probability must be within (0, 1]"
        )
        if isinstance(self.initial_sigma, (int, float)):
            if self.initial_sigma <= 0:
                raise ValueError("Initial sigma must be positive")
        else:
            if self.initial_sigma.shape != (self.dimension,):
                raise ValueError("initial_sigma must be a scalar or an array of shape (dimension,)")
            if np.any(self.initial_sigma <= 0):
                raise ValueError("All sigma values must be positive")

        if self.tau is None:
            object.__setattr__(self, "tau", 1.0 / math.sqrt(2.0 * math.sqrt(self.dimension)))
        if self.tau_prime is None:
            object.__setattr__(self, "tau_prime", 1.0 / math.sqrt(2.0 * self.dimension))

    def make_initial_sigma(self) -> Array:
        if isinstance(self.initial_sigma, (int, float)):
            return np.full(self.dimension, float(self.initial_sigma), dtype=np.float64)
        return self.initial_sigma.astype(np.float64, copy=True)
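

# Note on defaults: when tau and tau_prime are left as None, __post_init__
# falls back to the settings commonly recommended for self-adaptive ES,
#     tau  = 1 / sqrt(2 * sqrt(n))   (per-coordinate learning rate)
#     tau' = 1 / sqrt(2 * n)         (global learning rate),
# where n is the problem dimension.  success_rule_target = 0.2 together with
# sigma_increase / sigma_decrease implements a population-level variant of
# Rechenberg's 1/5 success rule for the global sigma_scale factor.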


# ---------------------------------------------------------------------------
# Helper utilities
# ---------------------------------------------------------------------------

def clear_results_directory(results_dir: str) -> None:
    if os.path.exists(results_dir):
        shutil.rmtree(results_dir)
    os.makedirs(results_dir, exist_ok=True)


def evaluate_population(population: Iterable[Individual], fitness_func: FitnessFn) -> None:
    for individual in population:
        individual.fitness = float(fitness_func(individual.x))


def recombine(
    parents: Sequence[Individual],
    config: EvolutionStrategyConfig,
) -> tuple[Array, Array, float]:
    """Recombine parent individuals before mutation.

    Returns the base vector, sigma and the best parent fitness.
    """
    if config.recombination == "none" or config.parents_per_offspring == 1:
        parent = random.choice(parents)
        return parent.x.copy(), parent.sigma.copy(), parent.fitness

    selected = random.choices(parents, k=config.parents_per_offspring)
    if config.recombination == "intermediate":
        x = np.mean([p.x for p in selected], axis=0)
        sigma = np.mean([p.sigma for p in selected], axis=0)
    elif config.recombination == "discrete":
        mask = np.random.randint(0, len(selected), size=config.dimension)
        indices = np.arange(config.dimension)
        x = np.array([selected[mask[i]].x[i] for i in indices], dtype=np.float64)
        sigma = np.array([selected[mask[i]].sigma[i] for i in indices], dtype=np.float64)
    else:  # pragma: no cover - defensive
        raise ValueError(f"Unsupported recombination type: {config.recombination}")

    parent_fitness = min(p.fitness for p in selected)
    return x, sigma, parent_fitness
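
# Note: "intermediate" recombination averages both the object variables and the
# step sizes of the sampled parents, while "discrete" recombination copies each
# coordinate (and its sigma) from one randomly chosen parent.  random.choices
# samples with replacement, so the same parent may be picked more than once.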


def mutate(
    x: Array,
    sigma: Array,
    config: EvolutionStrategyConfig,
    sigma_scale: float,
) -> tuple[Array, Array]:
    """Apply log-normal mutation with optional per-coordinate masking."""
    global_noise = np.random.normal()
    coordinate_noise = np.random.normal(size=config.dimension)
    sigma_new = sigma * np.exp(config.tau_prime * global_noise + config.tau * coordinate_noise)
    sigma_new = np.clip(sigma_new, config.sigma_min, config.sigma_max)
    sigma_new = np.clip(sigma_new * sigma_scale, config.sigma_min, config.sigma_max)

    steps = np.random.normal(size=config.dimension) * sigma_new

    if config.mutation_probability < 1.0:
        mask = np.random.random(config.dimension) < config.mutation_probability
        if not np.any(mask):
            mask[np.random.randint(0, config.dimension)] = True
        steps = steps * mask
        sigma_new = np.where(mask, sigma_new, sigma)

    x_new = np.clip(x + steps, config.x_min, config.x_max)
    return x_new, sigma_new
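
# The update above follows the standard uncorrelated self-adaptation scheme:
#     sigma_i' = sigma_i * exp(tau' * N(0, 1) + tau * N_i(0, 1))
#     x_i'     = x_i + sigma_i' * N_i(0, 1)
# with sigma_i' additionally rescaled by the global success-rule factor
# sigma_scale and clipped to [sigma_min, sigma_max]; mutated points are
# clipped back into the [x_min, x_max] box.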


def create_offspring(
    parents: Sequence[Individual],
    config: EvolutionStrategyConfig,
    sigma_scale: float,
) -> tuple[list[Individual], list[bool]]:
    offspring: list[Individual] = []
    successes: list[bool] = []

    for _ in range(config.lambda_):
        base_x, base_sigma, best_parent_fitness = recombine(parents, config)
        mutated_x, mutated_sigma = mutate(base_x, base_sigma, config, sigma_scale)
        fitness = float(config.fitness_func(mutated_x))
        child = Individual(mutated_x, mutated_sigma, fitness)
        offspring.append(child)
        successes.append(fitness < best_parent_fitness)

    return offspring, successes
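
# An offspring counts as a "success" when it is strictly better than the best
# of the parents it was recombined from; the per-generation success ratio
# drives the sigma_scale adaptation in run_evolution_strategy.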


def select_next_generation(
    parents: list[Individual],
    offspring: list[Individual],
    config: EvolutionStrategyConfig,
) -> list[Individual]:
    if config.selection == "plus":
        pool = parents + offspring
    else:
        pool = offspring

    pool.sort(key=lambda ind: ind.fitness)
    next_generation = [ind.copy() for ind in pool[: config.mu]]
    return next_generation
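
# "plus" is elitist (mu + lambda) selection, where parents compete with their
# offspring; "comma" is (mu, lambda) selection, where parents are always
# discarded.  The latter requires lambda >= mu, which __post_init__ enforces.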


def compute_best(population: Sequence[Individual]) -> Individual:
    best = min(population, key=lambda ind: ind.fitness)
    return best.copy()


def build_generation(
    number: int,
    population: list[Individual],
    sigma_scale: float,
) -> Generation:
    copies = tuple(ind.copy() for ind in population)
    best = compute_best(copies)
    mean_fitness = float(np.mean([ind.fitness for ind in copies]))
    return Generation(number, copies, best, mean_fitness, sigma_scale)


def save_generation(generation: Generation, config: EvolutionStrategyConfig) -> None:
    if config.dimension != 2:
        raise ValueError("Visualization is only supported for 2D problems")

    os.makedirs(config.results_dir, exist_ok=True)

    fig = plt.figure(figsize=(21, 7))
    fig.suptitle(
        (
            f"Generation #{generation.number}. "
            f"Best value: {generation.best.fitness:.6f}. "
            f"Mean: {generation.mean_fitness:.6f}. "
            f"σ scale: {generation.sigma_scale:.4f}"
        ),
        fontsize=14,
        y=0.88,
    )

    ax_contour = fig.add_subplot(1, 3, 1)
    plot_fitness_contour(config.fitness_func, config.x_min, config.x_max, ax_contour)
    arr = np.array([ind.x for ind in generation.population])
    ax_contour.scatter(arr[:, 1], arr[:, 0], c="red", s=20, alpha=0.9)
    ax_contour.scatter(
        generation.best.x[1], generation.best.x[0], c="black", s=60, marker="*", label="Best"
    )
    ax_contour.legend(loc="upper right")
    ax_contour.text(0.5, -0.25, "(a)", transform=ax_contour.transAxes, ha="center", fontsize=16)

    views = [(50, -45), (60, 30)]
    fitnesses = np.array([ind.fitness for ind in generation.population])

    for idx, (elev, azim) in enumerate(views, start=1):
        ax = fig.add_subplot(1, 3, idx + 1, projection="3d", computed_zorder=False)
        plot_fitness_surface(config.fitness_func, config.x_min, config.x_max, ax)
        ax.scatter(arr[:, 0], arr[:, 1], fitnesses, c="red", s=12, alpha=0.9)
        ax.scatter(
            generation.best.x[0],
            generation.best.x[1],
            generation.best.fitness,
            c="black",
            s=60,
            marker="*",
        )
        ax.view_init(elev=elev, azim=azim)
        label = chr(ord("a") + idx)
        ax.text2D(0.5, -0.15, f"({label})", transform=ax.transAxes, ha="center", fontsize=16)
        ax.set_xlabel("X₁")
        ax.set_ylabel("X₂")
        ax.set_zlabel("f(x)")

    filename = os.path.join(config.results_dir, f"generation_{generation.number:03d}.png")
    fig.savefig(filename, dpi=150, bbox_inches="tight")
    plt.close(fig)


def plot_fitness_surface(
    fitness_func: FitnessFn,
    x_min: Array,
    x_max: Array,
    ax: Axes3D,
    num_points: int = 100,
) -> None:
    if x_min.shape != (2,) or x_max.shape != (2,):
        raise ValueError("Surface plotting is only available for 2D functions")

    xs = np.linspace(x_min[0], x_max[0], num_points)
    ys = np.linspace(x_min[1], x_max[1], num_points)
    X, Y = np.meshgrid(xs, ys)
    vectorized = np.vectorize(lambda a, b: fitness_func(np.array([a, b])))
    Z = vectorized(X, Y)
    ax.plot_surface(X, Y, Z, cmap="viridis", edgecolor="none", alpha=0.7, shade=False)


def plot_fitness_contour(
    fitness_func: FitnessFn,
    x_min: Array,
    x_max: Array,
    ax: Axes,
    num_points: int = 100,
) -> None:
    xs = np.linspace(x_min[0], x_max[0], num_points)
    ys = np.linspace(x_min[1], x_max[1], num_points)
    X, Y = np.meshgrid(xs, ys)
    vectorized = np.vectorize(lambda a, b: fitness_func(np.array([a, b])))
    Z = vectorized(X, Y)
    contour = ax.contourf(Y, X, Z, levels=25, cmap="viridis", alpha=0.8)
    plt.colorbar(contour, ax=ax, shrink=0.6)
    ax.set_aspect("equal")
    ax.set_xlabel("X₂")
    ax.set_ylabel("X₁")


# ---------------------------------------------------------------------------
# Main algorithm
# ---------------------------------------------------------------------------

def run_evolution_strategy(config: EvolutionStrategyConfig) -> EvolutionStrategyResult:
    if config.seed is not None:
        random.seed(config.seed)
        np.random.seed(config.seed)

    if config.save_generations:
        clear_results_directory(config.results_dir)

    start = time.perf_counter()

    parents = [
        Individual(
            np.random.uniform(config.x_min, config.x_max),
            config.make_initial_sigma(),
            0.0,
        )
        for _ in range(config.mu)
    ]
    evaluate_population(parents, config.fitness_func)

    sigma_scale = 1.0
    success_window: deque[float] = deque()
    history: list[Generation] = []
    best_overall: Generation | None = None
    stagnation_counter = 0

    for generation_number in range(1, config.max_generations + 1):
        current_generation = build_generation(generation_number, parents, sigma_scale)
        history.append(current_generation)

        if config.log_every_generation:
            print(
                f"Generation #{generation_number}: best={current_generation.best.fitness:.6f} "
                f"mean={current_generation.mean_fitness:.6f}"
            )

        if (
            best_overall is None
            or current_generation.best.fitness < best_overall.best.fitness
        ):
            best_overall = current_generation
            stagnation_counter = 0
        else:
            stagnation_counter += 1

        if (
            config.best_value_threshold is not None
            and current_generation.best.fitness <= config.best_value_threshold
        ):
            break

        if (
            config.max_stagnation_generations is not None
            and stagnation_counter >= config.max_stagnation_generations
        ):
            break

        offspring, successes = create_offspring(parents, config, sigma_scale)
        success_ratio = sum(successes) / len(successes) if successes else 0.0
        success_window.append(success_ratio)

        if len(success_window) == config.success_rule_window:
            average_success = sum(success_window) / len(success_window)
            if average_success > config.success_rule_target:
                sigma_scale = min(
                    sigma_scale * config.sigma_increase, config.sigma_scale_max
                )
            elif average_success < config.success_rule_target:
                sigma_scale = max(
                    sigma_scale * config.sigma_decrease, config.sigma_scale_min
                )
            success_window.clear()

        parents = select_next_generation(parents, offspring, config)

        if config.save_generations and (
            generation_number in config.save_generations
            or generation_number == config.max_generations
        ):
            save_generation(current_generation, config)

    end = time.perf_counter()

    assert best_overall is not None

    # Save the last generation, if needed
    if config.save_generations and history:
        last_number = history[-1].number
        if last_number not in config.save_generations:
            save_generation(history[-1], config)

    return EvolutionStrategyResult(
        generations_count=len(history),
        best_generation=best_overall,
        history=history,
        time_ms=(end - start) * 1000.0,
    )
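

# ---------------------------------------------------------------------------
# Example usage
# ---------------------------------------------------------------------------
# A minimal, self-contained sketch of how the strategy might be driven.  The
# sphere objective and every parameter value below are illustrative
# assumptions, not part of the laboratory assignment itself.

if __name__ == "__main__":
    def sphere(x: Array) -> float:
        """Illustrative objective: global minimum 0 at the origin."""
        return float(np.sum(x ** 2))

    demo_config = EvolutionStrategyConfig(
        fitness_func=sphere,
        dimension=2,
        x_min=np.array([-5.0, -5.0]),
        x_max=np.array([5.0, 5.0]),
        mu=10,
        lambda_=40,
        mutation_probability=1.0,
        initial_sigma=1.0,
        max_generations=100,
        seed=42,
    )
    result = run_evolution_strategy(demo_config)
    print(f"Best fitness: {result.best_generation.best.fitness:.6f}")
    print(f"Generations: {result.generations_count}, time: {result.time_ms:.1f} ms")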