Files
genetic-algorithms/lab5/es.py

424 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Evolution strategy implementation for laboratory work #5."""
from __future__ import annotations
import math
import os
import random
import shutil
import time
from collections import deque
from dataclasses import dataclass
from typing import Callable, Iterable, Literal, Sequence
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.axes import Axes
from mpl_toolkits.mplot3d import Axes3D # noqa: F401 - required for 3D plotting
from numpy.typing import NDArray
Array = NDArray[np.float64]
FitnessFn = Callable[[Array], float]
@dataclass
class Individual:
"""Single individual of the evolution strategy population."""
x: Array
sigma: Array
fitness: float
def copy(self) -> "Individual":
return Individual(self.x.copy(), self.sigma.copy(), float(self.fitness))
@dataclass(frozen=True)
class Generation:
number: int
population: tuple[Individual, ...]
best: Individual
mean_fitness: float
sigma_scale: float
@dataclass
class EvolutionStrategyResult:
generations_count: int
best_generation: Generation
history: list[Generation]
time_ms: float
@dataclass
class EvolutionStrategyConfig:
fitness_func: FitnessFn
dimension: int
x_min: Array
x_max: Array
mu: int
lambda_: int
mutation_probability: float
initial_sigma: Array | float
max_generations: int
selection: Literal["plus", "comma"] = "comma"
recombination: Literal["intermediate", "discrete", "none"] = "intermediate"
parents_per_offspring: int = 2
success_rule_window: int = 10
success_rule_target: float = 0.2
sigma_increase: float = 1.22
sigma_decrease: float = 0.82
sigma_scale_min: float = 1e-3
sigma_scale_max: float = 100.0
tau: float | None = None
tau_prime: float | None = None
sigma_min: float = 1e-6
sigma_max: float = 10.0
best_value_threshold: float | None = None
max_stagnation_generations: int | None = None
save_generations: list[int] | None = None
results_dir: str = "results"
log_every_generation: bool = False
seed: int | None = None
def __post_init__(self) -> None:
assert self.dimension == self.x_min.shape[0] == self.x_max.shape[0], (
"Bounds dimensionality must match the dimension of the problem"
)
assert 0 < self.mu <= self.lambda_, "Require mu <= lambda and positive"
assert 0.0 < self.mutation_probability <= 1.0, (
"Mutation probability must be within (0, 1]"
)
if isinstance(self.initial_sigma, (int, float)):
if self.initial_sigma <= 0:
raise ValueError("Initial sigma must be positive")
else:
if self.initial_sigma.shape != (self.dimension,):
raise ValueError("initial_sigma must be scalar or an array of given dimension")
if np.any(self.initial_sigma <= 0):
raise ValueError("All sigma values must be positive")
if self.tau is None:
object.__setattr__(self, "tau", 1.0 / math.sqrt(2.0 * math.sqrt(self.dimension)))
if self.tau_prime is None:
object.__setattr__(self, "tau_prime", 1.0 / math.sqrt(2.0 * self.dimension))
def make_initial_sigma(self) -> Array:
if isinstance(self.initial_sigma, (int, float)):
return np.full(self.dimension, float(self.initial_sigma), dtype=np.float64)
return self.initial_sigma.astype(np.float64, copy=True)
# ---------------------------------------------------------------------------
# Helper utilities
# ---------------------------------------------------------------------------
def clear_results_directory(results_dir: str) -> None:
if os.path.exists(results_dir):
shutil.rmtree(results_dir)
os.makedirs(results_dir, exist_ok=True)
def evaluate_population(population: Iterable[Individual], fitness_func: FitnessFn) -> None:
for individual in population:
individual.fitness = float(fitness_func(individual.x))
def recombine(
parents: Sequence[Individual],
config: EvolutionStrategyConfig,
) -> tuple[Array, Array, float]:
"""Recombine parent individuals before mutation.
Returns the base vector, sigma and the best parent fitness.
"""
if config.recombination == "none" or config.parents_per_offspring == 1:
parent = random.choice(parents)
return parent.x.copy(), parent.sigma.copy(), parent.fitness
selected = random.choices(parents, k=config.parents_per_offspring)
if config.recombination == "intermediate":
x = np.mean([p.x for p in selected], axis=0)
sigma = np.mean([p.sigma for p in selected], axis=0)
elif config.recombination == "discrete":
mask = np.random.randint(0, len(selected), size=config.dimension)
indices = np.arange(config.dimension)
x = np.array([selected[mask[i]].x[i] for i in indices], dtype=np.float64)
sigma = np.array([selected[mask[i]].sigma[i] for i in indices], dtype=np.float64)
else: # pragma: no cover - defensive
raise ValueError(f"Unsupported recombination type: {config.recombination}")
parent_fitness = min(p.fitness for p in selected)
return x, sigma, parent_fitness
def mutate(
x: Array,
sigma: Array,
config: EvolutionStrategyConfig,
sigma_scale: float,
) -> tuple[Array, Array]:
"""Apply log-normal mutation with optional per-coordinate masking."""
global_noise = np.random.normal()
coordinate_noise = np.random.normal(size=config.dimension)
sigma_new = sigma * np.exp(config.tau_prime * global_noise + config.tau * coordinate_noise)
sigma_new = np.clip(sigma_new, config.sigma_min, config.sigma_max)
sigma_new = np.clip(sigma_new * sigma_scale, config.sigma_min, config.sigma_max)
steps = np.random.normal(size=config.dimension) * sigma_new
if config.mutation_probability < 1.0:
mask = np.random.random(config.dimension) < config.mutation_probability
if not np.any(mask):
mask[np.random.randint(0, config.dimension)] = True
steps = steps * mask
sigma_new = np.where(mask, sigma_new, sigma)
x_new = np.clip(x + steps, config.x_min, config.x_max)
return x_new, sigma_new
def create_offspring(
parents: Sequence[Individual],
config: EvolutionStrategyConfig,
sigma_scale: float,
) -> tuple[list[Individual], list[bool]]:
offspring: list[Individual] = []
successes: list[bool] = []
for _ in range(config.lambda_):
base_x, base_sigma, best_parent_fitness = recombine(parents, config)
mutated_x, mutated_sigma = mutate(base_x, base_sigma, config, sigma_scale)
fitness = float(config.fitness_func(mutated_x))
child = Individual(mutated_x, mutated_sigma, fitness)
offspring.append(child)
successes.append(fitness < best_parent_fitness)
return offspring, successes
def select_next_generation(
parents: list[Individual],
offspring: list[Individual],
config: EvolutionStrategyConfig,
) -> list[Individual]:
if config.selection == "plus":
pool = parents + offspring
else:
pool = offspring
pool.sort(key=lambda ind: ind.fitness)
next_generation = [ind.copy() for ind in pool[: config.mu]]
return next_generation
def compute_best(population: Sequence[Individual]) -> Individual:
best = min(population, key=lambda ind: ind.fitness)
return best.copy()
def build_generation(
number: int,
population: list[Individual],
sigma_scale: float,
) -> Generation:
copies = tuple(ind.copy() for ind in population)
best = compute_best(copies)
mean_fitness = float(np.mean([ind.fitness for ind in copies]))
return Generation(number, copies, best, mean_fitness, sigma_scale)
def save_generation(generation: Generation, config: EvolutionStrategyConfig) -> None:
if config.dimension != 2:
raise ValueError("Visualization is only supported for 2D problems")
os.makedirs(config.results_dir, exist_ok=True)
fig = plt.figure(figsize=(21, 7))
fig.suptitle(
(
f"Поколение #{generation.number}. "
f"Лучшее значение: {generation.best.fitness:.6f}. "
f"Среднее: {generation.mean_fitness:.6f}. "
f"Масштаб σ: {generation.sigma_scale:.4f}"
),
fontsize=14,
y=0.88,
)
ax_contour = fig.add_subplot(1, 3, 1)
plot_fitness_contour(config.fitness_func, config.x_min, config.x_max, ax_contour)
arr = np.array([ind.x for ind in generation.population])
ax_contour.scatter(arr[:, 1], arr[:, 0], c="red", s=20, alpha=0.9)
ax_contour.scatter(
generation.best.x[1], generation.best.x[0], c="black", s=60, marker="*", label="Лучший"
)
ax_contour.legend(loc="upper right")
ax_contour.text(0.5, -0.25, "(a)", transform=ax_contour.transAxes, ha="center", fontsize=16)
views = [(50, -45), (60, 30)]
fitnesses = np.array([ind.fitness for ind in generation.population])
for idx, (elev, azim) in enumerate(views, start=1):
ax = fig.add_subplot(1, 3, idx + 1, projection="3d", computed_zorder=False)
plot_fitness_surface(config.fitness_func, config.x_min, config.x_max, ax)
ax.scatter(arr[:, 0], arr[:, 1], fitnesses, c="red", s=12, alpha=0.9)
ax.scatter(
generation.best.x[0],
generation.best.x[1],
generation.best.fitness,
c="black",
s=60,
marker="*",
)
ax.view_init(elev=elev, azim=azim)
label = chr(ord("a") + idx)
ax.text2D(0.5, -0.15, f"({label})", transform=ax.transAxes, ha="center", fontsize=16)
ax.set_xlabel("X₁")
ax.set_ylabel("X₂")
ax.set_zlabel("f(x)")
filename = os.path.join(config.results_dir, f"generation_{generation.number:03d}.png")
fig.savefig(filename, dpi=150, bbox_inches="tight")
plt.close(fig)
def plot_fitness_surface(
fitness_func: FitnessFn,
x_min: Array,
x_max: Array,
ax: Axes3D,
num_points: int = 100,
) -> None:
if x_min.shape != (2,) or x_max.shape != (2,):
raise ValueError("Surface plotting is only available for 2D functions")
xs = np.linspace(x_min[0], x_max[0], num_points)
ys = np.linspace(x_min[1], x_max[1], num_points)
X, Y = np.meshgrid(xs, ys)
vectorized = np.vectorize(lambda a, b: fitness_func(np.array([a, b])))
Z = vectorized(X, Y)
ax.plot_surface(X, Y, Z, cmap="viridis", edgecolor="none", alpha=0.7, shade=False)
def plot_fitness_contour(
fitness_func: FitnessFn,
x_min: Array,
x_max: Array,
ax: Axes,
num_points: int = 100,
) -> None:
xs = np.linspace(x_min[0], x_max[0], num_points)
ys = np.linspace(x_min[1], x_max[1], num_points)
X, Y = np.meshgrid(xs, ys)
vectorized = np.vectorize(lambda a, b: fitness_func(np.array([a, b])))
Z = vectorized(X, Y)
contour = ax.contourf(Y, X, Z, levels=25, cmap="viridis", alpha=0.8)
plt.colorbar(contour, ax=ax, shrink=0.6)
ax.set_aspect("equal")
ax.set_xlabel("X₂")
ax.set_ylabel("X₁")
# ---------------------------------------------------------------------------
# Main algorithm
# ---------------------------------------------------------------------------
def run_evolution_strategy(config: EvolutionStrategyConfig) -> EvolutionStrategyResult:
if config.seed is not None:
random.seed(config.seed)
np.random.seed(config.seed)
if config.save_generations:
clear_results_directory(config.results_dir)
start = time.perf_counter()
parents = [
Individual(
np.random.uniform(config.x_min, config.x_max),
config.make_initial_sigma(),
0.0,
)
for _ in range(config.mu)
]
evaluate_population(parents, config.fitness_func)
sigma_scale = 1.0
success_window: deque[float] = deque()
history: list[Generation] = []
best_overall: Generation | None = None
stagnation_counter = 0
for generation_number in range(1, config.max_generations + 1):
current_generation = build_generation(generation_number, parents, sigma_scale)
history.append(current_generation)
if config.log_every_generation:
print(
f"Generation #{generation_number}: best={current_generation.best.fitness:.6f} "
f"mean={current_generation.mean_fitness:.6f}"
)
if (
best_overall is None
or current_generation.best.fitness < best_overall.best.fitness
):
best_overall = current_generation
stagnation_counter = 0
else:
stagnation_counter += 1
if (
config.best_value_threshold is not None
and current_generation.best.fitness <= config.best_value_threshold
):
break
if (
config.max_stagnation_generations is not None
and stagnation_counter >= config.max_stagnation_generations
):
break
offspring, successes = create_offspring(parents, config, sigma_scale)
success_ratio = sum(successes) / len(successes) if successes else 0.0
success_window.append(success_ratio)
if len(success_window) == config.success_rule_window:
average_success = sum(success_window) / len(success_window)
if average_success > config.success_rule_target:
sigma_scale = min(
sigma_scale * config.sigma_increase, config.sigma_scale_max
)
elif average_success < config.success_rule_target:
sigma_scale = max(
sigma_scale * config.sigma_decrease, config.sigma_scale_min
)
success_window.clear()
parents = select_next_generation(parents, offspring, config)
if config.save_generations and (
generation_number in config.save_generations
or generation_number == config.max_generations
):
save_generation(current_generation, config)
end = time.perf_counter()
assert best_overall is not None
# Сохраняем последнее поколение, если нужно
if config.save_generations and history:
last_number = history[-1].number
if last_number not in config.save_generations:
save_generation(history[-1], config)
return EvolutionStrategyResult(
generations_count=len(history),
best_generation=best_overall,
history=history,
time_ms=(end - start) * 1000.0,
)