lab2 предварительная версия
This commit is contained in:
422
lab2/gen.py
Normal file
422
lab2/gen.py
Normal file
@@ -0,0 +1,422 @@
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import time
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable
|
||||
|
||||
import numpy as np
|
||||
import plotly.graph_objects as go
|
||||
from matplotlib import pyplot as plt
|
||||
from matplotlib.axes import Axes
|
||||
from mpl_toolkits.mplot3d import Axes3D
|
||||
from numpy.typing import NDArray
|
||||
|
||||
type Chromosome = NDArray[np.float64]
|
||||
type Population = list[Chromosome]
|
||||
type Fitnesses = NDArray[np.float64]
|
||||
type FitnessFn = Callable[[Chromosome], Fitnesses]
|
||||
type CrossoverFn = Callable[[Chromosome, Chromosome], tuple[Chromosome, Chromosome]]
|
||||
type MutationFn = Callable[[Chromosome], Chromosome]
|
||||
|
||||
|
||||
@dataclass
|
||||
class GARunConfig:
|
||||
x_min: Chromosome
|
||||
x_max: Chromosome
|
||||
fitness_func: FitnessFn
|
||||
pop_size: int # размер популяции
|
||||
pc: float # вероятность кроссинговера
|
||||
pm: float # вероятность мутации
|
||||
max_generations: int # максимальное количество поколений
|
||||
seed: int | None = None # seed для генератора случайных чисел
|
||||
minimize: bool = False # если True, ищем минимум вместо максимума
|
||||
save_generations: list[int] | None = (
|
||||
None # индексы поколений для сохранения графиков
|
||||
)
|
||||
results_dir: str = "results" # папка для сохранения графиков
|
||||
fitness_avg_threshold: float | None = (
|
||||
None # порог среднего значения фитнес функции для остановки
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Generation:
|
||||
number: int
|
||||
best: Chromosome
|
||||
best_fitness: float
|
||||
population: Population
|
||||
fitnesses: Fitnesses
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GARunResult:
|
||||
generations_count: int
|
||||
best_generation: Generation
|
||||
history: list[Generation]
|
||||
time_ms: float
|
||||
|
||||
|
||||
def initialize_population(
|
||||
pop_size: int, x_min: Chromosome, x_max: Chromosome
|
||||
) -> Population:
|
||||
"""Инициализирует популяцию случайными векторами из заданного диапазона."""
|
||||
return [np.random.uniform(x_min, x_max, x_min.shape) for _ in range(pop_size)]
|
||||
|
||||
|
||||
def reproduction(population: Population, fitnesses: Fitnesses) -> Population:
|
||||
"""Репродукция (селекция) методом рулетки.
|
||||
|
||||
Чем больше значение фитнеса, тем больше вероятность выбора особи. Для минимизации
|
||||
значения фитнеса нужно предварительно инвертировать.
|
||||
"""
|
||||
# Чтобы работать с отрицательными f, сдвигаем значения фитнес функции на минимальное
|
||||
# значение в популяции. Вычитаем min_fit, т. к. min_fit может быть отрицательным.
|
||||
min_fit = np.min(fitnesses)
|
||||
shifted_fitnesses = fitnesses - min_fit + 1e-12
|
||||
|
||||
# Получаем вероятности для каждой особи
|
||||
probs = shifted_fitnesses / np.sum(shifted_fitnesses)
|
||||
cum = np.cumsum(probs)
|
||||
|
||||
# Выбираем особей методом рулетки
|
||||
selected = []
|
||||
for _ in population:
|
||||
r = np.random.random()
|
||||
idx = int(np.searchsorted(cum, r, side="left"))
|
||||
selected.append(population[idx])
|
||||
|
||||
return selected
|
||||
|
||||
|
||||
def arithmetical_crossover_fn(
|
||||
p1: Chromosome, p2: Chromosome, w: float = 0.5
|
||||
) -> tuple[Chromosome, Chromosome]:
|
||||
"""Арифметический кроссинговер."""
|
||||
h1 = w * p1 + (1 - w) * p2
|
||||
h2 = (1 - w) * p1 + w * p2
|
||||
return h1, h2
|
||||
|
||||
|
||||
def geometrical_crossover_fn(
|
||||
p1: Chromosome, p2: Chromosome, w: float = 0.5
|
||||
) -> tuple[Chromosome, Chromosome]:
|
||||
"""Геометрический кроссинговер."""
|
||||
h1 = np.power(p1, w) * np.power(p2, 1 - w)
|
||||
h2 = np.power(p2, w) * np.power(p1, 1 - w)
|
||||
return h1, h2
|
||||
|
||||
|
||||
def crossover(
|
||||
population: Population,
|
||||
pc: float,
|
||||
crossover_fn: CrossoverFn,
|
||||
) -> Population:
|
||||
"""Оператор кроссинговера (скрещивания) выполняется с заданной вероятностью pc.
|
||||
|
||||
Две хромосомы (родители) выбираются случайно из промежуточной популяции.
|
||||
|
||||
Если популяция нечетного размера, то последняя хромосома скрещивается со случайной
|
||||
другой хромосомой из популяции. В таком случае одна из хромосом может поучаствовать
|
||||
в кроссовере дважды.
|
||||
"""
|
||||
# Создаем копию популяции и перемешиваем её для случайного выбора пар
|
||||
shuffled_population = population.copy()
|
||||
np.random.shuffle(shuffled_population)
|
||||
|
||||
next_population = []
|
||||
pop_size = len(shuffled_population)
|
||||
|
||||
for i in range(0, pop_size, 2):
|
||||
p1 = shuffled_population[i]
|
||||
p2 = shuffled_population[(i + 1) % pop_size]
|
||||
if np.random.random() <= pc:
|
||||
p1, p2 = crossover_fn(p1, p2)
|
||||
next_population.append(p1)
|
||||
next_population.append(p2)
|
||||
|
||||
return next_population[:pop_size]
|
||||
|
||||
|
||||
def build_random_mutation_fn(x_min: Chromosome, x_max: Chromosome) -> MutationFn:
|
||||
"""Создаёт функцию случайной мутации."""
|
||||
|
||||
def mutation_fn(chrom: Chromosome) -> Chromosome:
|
||||
chrom_new = chrom.copy()
|
||||
k = np.random.randint(0, chrom_new.shape[0])
|
||||
chrom_new[k] = np.random.uniform(x_min[k], x_max[k])
|
||||
return chrom_new
|
||||
|
||||
return mutation_fn
|
||||
|
||||
|
||||
def mutation(population: Population, pm: float, mutation_fn: MutationFn) -> Population:
|
||||
"""Мутация происходит с вероятностью pm."""
|
||||
next_population = []
|
||||
for chrom in population:
|
||||
next_population.append(
|
||||
mutation_fn(chrom) if np.random.random() <= pm else chrom
|
||||
)
|
||||
return next_population
|
||||
|
||||
|
||||
def clear_results_directory(results_dir: str) -> None:
|
||||
"""Очищает папку с результатами перед началом эксперимента."""
|
||||
if os.path.exists(results_dir):
|
||||
shutil.rmtree(results_dir)
|
||||
os.makedirs(results_dir, exist_ok=True)
|
||||
|
||||
|
||||
def eval_population(population: Population, fitness_func: FitnessFn) -> Fitnesses:
|
||||
return np.array([fitness_func(chrom) for chrom in population])
|
||||
|
||||
|
||||
def plot_fitness_surface(
|
||||
fitness_func: FitnessFn,
|
||||
x_min: Chromosome,
|
||||
x_max: Chromosome,
|
||||
ax: Axes3D,
|
||||
num_points: int = 100,
|
||||
) -> None:
|
||||
"""Рисует поверхность функции фитнеса в 3D."""
|
||||
assert (
|
||||
x_min.shape == x_max.shape == (2,)
|
||||
), "Рисовать графики можно только для функции от двух переменных"
|
||||
X = np.linspace(x_min[0], x_max[0], num_points)
|
||||
Y = np.linspace(x_min[1], x_max[1], num_points)
|
||||
X, Y = np.meshgrid(X, Y)
|
||||
|
||||
vectorized_fitness = np.vectorize(lambda x, y: fitness_func(np.array([x, y])))
|
||||
Z = vectorized_fitness(X, Y)
|
||||
|
||||
return ax.plot_surface(
|
||||
X, Y, Z, cmap="viridis", edgecolor="none", alpha=0.7, shade=False
|
||||
)
|
||||
|
||||
|
||||
def plot_fitness_contour(
|
||||
fitness_func: FitnessFn,
|
||||
x_min: Chromosome,
|
||||
x_max: Chromosome,
|
||||
ax: Axes,
|
||||
num_points: int = 100,
|
||||
) -> None:
|
||||
"""Рисует контурный график функции фитнеса в 2D."""
|
||||
X = np.linspace(x_min[0], x_max[0], num_points)
|
||||
Y = np.linspace(x_min[1], x_max[1], num_points)
|
||||
X, Y = np.meshgrid(X, Y)
|
||||
|
||||
vectorized_fitness = np.vectorize(lambda x, y: fitness_func(np.array([x, y])))
|
||||
Z = vectorized_fitness(X, Y)
|
||||
|
||||
# Рисуем контуры
|
||||
# X и Y поменяны местами для единообразия с 3D графиками, там ось Y изображена
|
||||
# горизонтально из-за особенностей функции в моём варианте
|
||||
contourf = ax.contourf(Y, X, Z, levels=20, cmap="viridis", alpha=0.7)
|
||||
ax.set_xlabel("Y")
|
||||
ax.set_ylabel("X")
|
||||
|
||||
# Добавляем цветовую шкалу
|
||||
plt.colorbar(contourf, ax=ax, shrink=0.5)
|
||||
|
||||
# По умолчанию matplotlib пытается растянуть график по оси Y, тут мы это отключаем
|
||||
ax.set_aspect("equal")
|
||||
|
||||
|
||||
def save_generation(
|
||||
generation: Generation, history: list[Generation], config: GARunConfig
|
||||
) -> None:
|
||||
"""Сохраняем графики поколения.
|
||||
|
||||
Функция не самая универсальная, тут есть хардкод, однако для большинства вариантов
|
||||
должна работать и так.
|
||||
"""
|
||||
assert (
|
||||
config.x_min.shape == config.x_max.shape == (2,)
|
||||
), "Рисовать графики можно только для функции от двух переменных"
|
||||
os.makedirs(config.results_dir, exist_ok=True)
|
||||
|
||||
fig = plt.figure(figsize=(21, 7))
|
||||
fig.suptitle(
|
||||
f"Поколение #{generation.number}. "
|
||||
f"Лучшая особь: {generation.best_fitness:.4f}. "
|
||||
f"Среднее значение: {np.mean(generation.fitnesses):.4f}",
|
||||
fontsize=14,
|
||||
y=0.85,
|
||||
)
|
||||
|
||||
# Контурный график (как вид сверху)
|
||||
ax1 = fig.add_subplot(1, 3, 1)
|
||||
plot_fitness_contour(config.fitness_func, config.x_min, config.x_max, ax1)
|
||||
|
||||
# Популяция на контурном графике
|
||||
arr = np.array(generation.population)
|
||||
# Координаты специально поменяны местами (см. plot_fitness_contour)
|
||||
ax1.scatter(
|
||||
arr[:, 1],
|
||||
arr[:, 0],
|
||||
c="red",
|
||||
marker="o",
|
||||
alpha=0.9,
|
||||
s=20,
|
||||
)
|
||||
|
||||
# Подпись под первым графиком
|
||||
ax1.text(
|
||||
0.5,
|
||||
-0.3,
|
||||
"(a)",
|
||||
transform=ax1.transAxes,
|
||||
ha="center",
|
||||
fontsize=16,
|
||||
)
|
||||
|
||||
# 3D графики с разных ракурсов
|
||||
views_3d = [
|
||||
# (elev, azim)
|
||||
(50, 0),
|
||||
(50, 15),
|
||||
]
|
||||
|
||||
for i, (elev, azim) in enumerate(views_3d):
|
||||
ax = fig.add_subplot(1, 3, i + 2, projection="3d", computed_zorder=False)
|
||||
|
||||
plot_fitness_surface(config.fitness_func, config.x_min, config.x_max, ax)
|
||||
|
||||
ax.set_xlabel("X")
|
||||
ax.set_ylabel("Y")
|
||||
ax.set_zlabel("f(X, Y)")
|
||||
|
||||
ax.scatter(
|
||||
arr[:, 0],
|
||||
arr[:, 1],
|
||||
generation.fitnesses + 1, # type: ignore
|
||||
c="red",
|
||||
s=10,
|
||||
marker="o",
|
||||
alpha=0.9,
|
||||
)
|
||||
|
||||
# Устанавливаем угол обзора
|
||||
ax.view_init(elev=elev, azim=azim)
|
||||
|
||||
# Подпись под 3D графиками
|
||||
label = chr(ord("b") + i) # 'b' для i=0, 'c' для i=1
|
||||
ax.text2D(
|
||||
0.5,
|
||||
-0.15,
|
||||
f"({label})",
|
||||
transform=ax.transAxes,
|
||||
ha="center",
|
||||
fontsize=16,
|
||||
)
|
||||
|
||||
filename = f"generation_{generation.number:03d}.png"
|
||||
path_png = os.path.join(config.results_dir, filename)
|
||||
fig.savefig(path_png, dpi=150, bbox_inches="tight")
|
||||
# Можно раскомментировать, чтобы подобрать более удачные ракурсы
|
||||
# в интерактивном режиме
|
||||
# fig.show()
|
||||
# plt.pause(1000)
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
def genetic_algorithm(config: GARunConfig) -> GARunResult:
|
||||
if config.seed is not None:
|
||||
random.seed(config.seed)
|
||||
np.random.seed(config.seed)
|
||||
|
||||
if config.save_generations:
|
||||
clear_results_directory(config.results_dir)
|
||||
|
||||
population = initialize_population(config.pop_size, config.x_min, config.x_max)
|
||||
|
||||
start = time.perf_counter()
|
||||
history: list[Generation] = []
|
||||
best: Generation | None = None
|
||||
|
||||
generation_number = 1
|
||||
|
||||
while True:
|
||||
# Вычисляем фитнес для всех особей в популяции
|
||||
fitnesses = eval_population(population, config.fitness_func)
|
||||
|
||||
# Находим лучшую особь в поколении
|
||||
best_index = (
|
||||
int(np.argmin(fitnesses)) if config.minimize else int(np.argmax(fitnesses))
|
||||
)
|
||||
|
||||
# Добавляем эпоху в историю
|
||||
current = Generation(
|
||||
number=generation_number,
|
||||
best=population[best_index],
|
||||
best_fitness=fitnesses[best_index],
|
||||
population=deepcopy(population),
|
||||
fitnesses=deepcopy(fitnesses),
|
||||
)
|
||||
history.append(current)
|
||||
|
||||
# Обновляем лучшую эпоху
|
||||
if (
|
||||
best is None
|
||||
or (config.minimize and current.best_fitness < best.best_fitness)
|
||||
or (not config.minimize and current.best_fitness > best.best_fitness)
|
||||
):
|
||||
best = current
|
||||
|
||||
# Проверка критериев остановки
|
||||
stop_algorithm = False
|
||||
|
||||
if generation_number >= config.max_generations:
|
||||
stop_algorithm = True
|
||||
|
||||
# if config.variance_threshold is not None:
|
||||
# fitness_variance = np.var(fitnesses)
|
||||
# if fitness_variance < config.variance_threshold:
|
||||
# stop_algorithm = True
|
||||
|
||||
if config.fitness_avg_threshold is not None:
|
||||
mean_fitness = np.mean(fitnesses)
|
||||
if (config.minimize and mean_fitness < config.fitness_avg_threshold) or (
|
||||
not config.minimize and mean_fitness > config.fitness_avg_threshold
|
||||
):
|
||||
stop_algorithm = True
|
||||
|
||||
# Сохраняем указанные поколения и последнее поколение
|
||||
if config.save_generations and (
|
||||
stop_algorithm or generation_number in config.save_generations
|
||||
):
|
||||
# save_generation(current, history, config)
|
||||
save_generation(current, history, config)
|
||||
|
||||
if stop_algorithm:
|
||||
break
|
||||
|
||||
# селекция (для минимума инвертируем знак)
|
||||
parents = reproduction(
|
||||
population, fitnesses if not config.minimize else -fitnesses
|
||||
)
|
||||
|
||||
# кроссинговер попарно
|
||||
next_population = crossover(parents, config.pc, arithmetical_crossover_fn)
|
||||
|
||||
# мутация
|
||||
next_population = mutation(
|
||||
next_population,
|
||||
config.pm,
|
||||
build_random_mutation_fn(config.x_min, config.x_max),
|
||||
)
|
||||
|
||||
population = next_population[: config.pop_size]
|
||||
generation_number += 1
|
||||
|
||||
end = time.perf_counter()
|
||||
|
||||
assert best is not None, "Best was never set"
|
||||
return GARunResult(
|
||||
len(history),
|
||||
best,
|
||||
history,
|
||||
(end - start) * 1000.0,
|
||||
)
|
||||
Reference in New Issue
Block a user