import os import random import shutil import time from copy import deepcopy from dataclasses import dataclass from typing import Callable import numpy as np import plotly.graph_objects as go from matplotlib import pyplot as plt from matplotlib.axes import Axes from mpl_toolkits.mplot3d import Axes3D from numpy.typing import NDArray type Chromosome = NDArray[np.float64] type Population = list[Chromosome] type Fitnesses = NDArray[np.float64] type FitnessFn = Callable[[Chromosome], Fitnesses] type CrossoverFn = Callable[[Chromosome, Chromosome], tuple[Chromosome, Chromosome]] type MutationFn = Callable[[Chromosome], Chromosome] @dataclass class GARunConfig: x_min: Chromosome x_max: Chromosome fitness_func: FitnessFn pop_size: int # размер популяции pc: float # вероятность кроссинговера pm: float # вероятность мутации max_generations: int # максимальное количество поколений max_best_repetitions: int | None = ( None # остановка при повторении лучшего результата ) seed: int | None = None # seed для генератора случайных чисел minimize: bool = False # если True, ищем минимум вместо максимума save_generations: list[int] | None = ( None # индексы поколений для сохранения графиков ) results_dir: str = "results" # папка для сохранения графиков fitness_avg_threshold: float | None = ( None # порог среднего значения фитнес функции для остановки ) best_value_threshold: float | None = ( None # остановка при достижении значения фитнеса лучше заданного ) log_every_generation: bool = False # логировать каждое поколение @dataclass(frozen=True) class Generation: number: int best: Chromosome best_fitness: float population: Population fitnesses: Fitnesses @dataclass(frozen=True) class GARunResult: generations_count: int best_generation: Generation history: list[Generation] time_ms: float def initialize_population( pop_size: int, x_min: Chromosome, x_max: Chromosome ) -> Population: """Инициализирует популяцию случайными векторами из заданного диапазона.""" return [np.random.uniform(x_min, x_max, x_min.shape) for _ in range(pop_size)] def reproduction(population: Population, fitnesses: Fitnesses) -> Population: """Репродукция (селекция) методом рулетки. Чем больше значение фитнеса, тем больше вероятность выбора особи. Для минимизации значения фитнеса нужно предварительно инвертировать. """ # Чтобы работать с отрицательными f, сдвигаем значения фитнес функции на минимальное # значение в популяции. Вычитаем min_fit, т. к. min_fit может быть отрицательным. min_fit = np.min(fitnesses) shifted_fitnesses = fitnesses - min_fit + 1e-12 # Получаем вероятности для каждой особи probs = shifted_fitnesses / np.sum(shifted_fitnesses) cum = np.cumsum(probs) # Выбираем особей методом рулетки selected = [] for _ in population: r = np.random.random() idx = int(np.searchsorted(cum, r, side="left")) selected.append(population[idx]) return selected def arithmetical_crossover_fn( p1: Chromosome, p2: Chromosome, w: float = 0.5 ) -> tuple[Chromosome, Chromosome]: """Арифметический кроссинговер.""" h1 = w * p1 + (1 - w) * p2 h2 = (1 - w) * p1 + w * p2 return h1, h2 def geometrical_crossover_fn( p1: Chromosome, p2: Chromosome, w: float = 0.5 ) -> tuple[Chromosome, Chromosome]: """Геометрический кроссинговер.""" h1 = np.power(p1, w) * np.power(p2, 1 - w) h2 = np.power(p2, w) * np.power(p1, 1 - w) return h1, h2 def crossover( population: Population, pc: float, crossover_fn: CrossoverFn, ) -> Population: """Оператор кроссинговера (скрещивания) выполняется с заданной вероятностью pc. Две хромосомы (родители) выбираются случайно из промежуточной популяции. Если популяция нечетного размера, то последняя хромосома скрещивается со случайной другой хромосомой из популяции. В таком случае одна из хромосом может поучаствовать в кроссовере дважды. """ # Создаем копию популяции и перемешиваем её для случайного выбора пар shuffled_population = population.copy() np.random.shuffle(shuffled_population) next_population = [] pop_size = len(shuffled_population) for i in range(0, pop_size, 2): p1 = shuffled_population[i] p2 = shuffled_population[(i + 1) % pop_size] if np.random.random() <= pc: p1, p2 = crossover_fn(p1, p2) next_population.append(p1) next_population.append(p2) return next_population[:pop_size] def build_random_mutation_fn(x_min: Chromosome, x_max: Chromosome) -> MutationFn: """Создаёт функцию случайной мутации.""" def mutation_fn(chrom: Chromosome) -> Chromosome: chrom_new = chrom.copy() k = np.random.randint(0, chrom_new.shape[0]) chrom_new[k] = np.random.uniform(x_min[k], x_max[k]) return chrom_new return mutation_fn def mutation(population: Population, pm: float, mutation_fn: MutationFn) -> Population: """Мутация происходит с вероятностью pm.""" next_population = [] for chrom in population: next_population.append( mutation_fn(chrom) if np.random.random() <= pm else chrom ) return next_population def clear_results_directory(results_dir: str) -> None: """Очищает папку с результатами перед началом эксперимента.""" if os.path.exists(results_dir): shutil.rmtree(results_dir) os.makedirs(results_dir, exist_ok=True) def eval_population(population: Population, fitness_func: FitnessFn) -> Fitnesses: return np.array([fitness_func(chrom) for chrom in population]) def plot_fitness_surface( fitness_func: FitnessFn, x_min: Chromosome, x_max: Chromosome, ax: Axes3D, num_points: int = 100, ) -> None: """Рисует поверхность функции фитнеса в 3D.""" assert ( x_min.shape == x_max.shape == (2,) ), "Рисовать графики можно только для функции от двух переменных" X = np.linspace(x_min[0], x_max[0], num_points) Y = np.linspace(x_min[1], x_max[1], num_points) X, Y = np.meshgrid(X, Y) vectorized_fitness = np.vectorize(lambda x, y: fitness_func(np.array([x, y]))) Z = vectorized_fitness(X, Y) return ax.plot_surface( X, Y, Z, cmap="viridis", edgecolor="none", alpha=0.7, shade=False ) def plot_fitness_contour( fitness_func: FitnessFn, x_min: Chromosome, x_max: Chromosome, ax: Axes, num_points: int = 100, ) -> None: """Рисует контурный график функции фитнеса в 2D.""" X = np.linspace(x_min[0], x_max[0], num_points) Y = np.linspace(x_min[1], x_max[1], num_points) X, Y = np.meshgrid(X, Y) vectorized_fitness = np.vectorize(lambda x, y: fitness_func(np.array([x, y]))) Z = vectorized_fitness(X, Y) # Рисуем контуры # X и Y поменяны местами для единообразия с 3D графиками, там ось Y изображена # горизонтально из-за особенностей функции в моём варианте contourf = ax.contourf(Y, X, Z, levels=20, cmap="viridis", alpha=0.7) ax.set_xlabel("Y") ax.set_ylabel("X") # Добавляем цветовую шкалу plt.colorbar(contourf, ax=ax, shrink=0.5) # По умолчанию matplotlib пытается растянуть график по оси Y, тут мы это отключаем ax.set_aspect("equal") def save_generation( generation: Generation, history: list[Generation], config: GARunConfig ) -> None: """Сохраняем графики поколения. Функция не самая универсальная, тут есть хардкод, однако для большинства вариантов должна работать и так. """ assert ( config.x_min.shape == config.x_max.shape == (2,) ), "Рисовать графики можно только для функции от двух переменных" os.makedirs(config.results_dir, exist_ok=True) fig = plt.figure(figsize=(21, 7)) fig.suptitle( f"Поколение #{generation.number}. " f"Лучшая особь: {generation.best_fitness:.4f}. " f"Среднее значение: {np.mean(generation.fitnesses):.4f}", fontsize=14, y=0.85, ) # Контурный график (как вид сверху) ax1 = fig.add_subplot(1, 3, 1) plot_fitness_contour(config.fitness_func, config.x_min, config.x_max, ax1) # Популяция на контурном графике arr = np.array(generation.population) # Координаты специально поменяны местами (см. plot_fitness_contour) ax1.scatter( arr[:, 1], arr[:, 0], c="red", marker="o", alpha=0.9, s=20, ) # Подпись под первым графиком ax1.text( 0.5, -0.3, "(a)", transform=ax1.transAxes, ha="center", fontsize=16, ) # 3D графики с разных ракурсов views_3d = [ # (elev, azim) (50, 0), (50, 15), ] for i, (elev, azim) in enumerate(views_3d): ax = fig.add_subplot(1, 3, i + 2, projection="3d", computed_zorder=False) plot_fitness_surface(config.fitness_func, config.x_min, config.x_max, ax) ax.set_xlabel("X") ax.set_ylabel("Y") ax.set_zlabel("f(X, Y)") ax.scatter( arr[:, 0], arr[:, 1], generation.fitnesses + 1, # type: ignore c="red", s=10, marker="o", alpha=0.9, ) # Устанавливаем угол обзора ax.view_init(elev=elev, azim=azim) # Подпись под 3D графиками label = chr(ord("b") + i) # 'b' для i=0, 'c' для i=1 ax.text2D( 0.5, -0.15, f"({label})", transform=ax.transAxes, ha="center", fontsize=16, ) filename = f"generation_{generation.number:03d}.png" path_png = os.path.join(config.results_dir, filename) fig.savefig(path_png, dpi=150, bbox_inches="tight") # Можно раскомментировать, чтобы подобрать более удачные ракурсы # в интерактивном режиме # fig.show() # plt.pause(1000) plt.close(fig) def genetic_algorithm(config: GARunConfig) -> GARunResult: if config.seed is not None: random.seed(config.seed) np.random.seed(config.seed) if config.save_generations: clear_results_directory(config.results_dir) population = initialize_population(config.pop_size, config.x_min, config.x_max) start = time.perf_counter() history: list[Generation] = [] best: Generation | None = None generation_number = 1 best_repetitions = 0 while True: # Вычисляем фитнес для всех особей в популяции fitnesses = eval_population(population, config.fitness_func) # Находим лучшую особь в поколении best_index = ( int(np.argmin(fitnesses)) if config.minimize else int(np.argmax(fitnesses)) ) # Добавляем эпоху в историю current = Generation( number=generation_number, best=population[best_index], best_fitness=fitnesses[best_index], population=deepcopy(population), fitnesses=deepcopy(fitnesses), ) history.append(current) if config.log_every_generation: print( f"Generation #{generation_number} best: {current.best_fitness}," f" avg: {np.mean(current.fitnesses)}" ) # Обновляем лучшую эпоху if ( best is None or (config.minimize and current.best_fitness < best.best_fitness) or (not config.minimize and current.best_fitness > best.best_fitness) ): best = current # Проверка критериев остановки stop_algorithm = False if generation_number >= config.max_generations: stop_algorithm = True if config.max_best_repetitions is not None and generation_number > 1: if history[-2].best_fitness == current.best_fitness: best_repetitions += 1 if best_repetitions == config.max_best_repetitions: stop_algorithm = True else: best_repetitions = 0 # if config.variance_threshold is not None: # fitness_variance = np.var(fitnesses) # if fitness_variance < config.variance_threshold: # stop_algorithm = True if config.best_value_threshold is not None: if ( config.minimize and current.best_fitness < config.best_value_threshold ) or ( not config.minimize and current.best_fitness > config.best_value_threshold ): stop_algorithm = True if config.fitness_avg_threshold is not None: mean_fitness = np.mean(fitnesses) if (config.minimize and mean_fitness < config.fitness_avg_threshold) or ( not config.minimize and mean_fitness > config.fitness_avg_threshold ): stop_algorithm = True # Сохраняем указанные поколения и последнее поколение if config.save_generations and ( stop_algorithm or generation_number in config.save_generations ): # save_generation(current, history, config) save_generation(current, history, config) if stop_algorithm: break # селекция (для минимума инвертируем знак) parents = reproduction( population, fitnesses if not config.minimize else -fitnesses ) # кроссинговер попарно next_population = crossover(parents, config.pc, arithmetical_crossover_fn) # мутация next_population = mutation( next_population, config.pm, build_random_mutation_fn(config.x_min, config.x_max), ) population = next_population[: config.pop_size] generation_number += 1 end = time.perf_counter() assert best is not None, "Best was never set" return GARunResult( len(history), best, history, (end - start) * 1000.0, )