#include #include #include #include #include #include #include #include "csv_loader.hpp" #include "utils.hpp" #include "record.hpp" #include "day_stats.hpp" #include "aggregation.hpp" #include "intervals.hpp" #include "gpu_loader.hpp" // Функция: отобрать записи для конкретного ранга std::vector select_records_for_rank( const std::map>& days, const std::vector& day_list) { std::vector out; for (auto d : day_list) { auto it = days.find(d); if (it != days.end()) { const auto& vec = it->second; out.insert(out.end(), vec.begin(), vec.end()); } } return out; } // Разделить записи на N частей (по дням) std::vector> split_records(const std::vector& records, int n_parts) { // Группируем по дням std::map> by_day; for (const auto& r : records) { DayIndex day = static_cast(r.timestamp) / 86400; by_day[day].push_back(r); } // Распределяем дни по частям std::vector> parts(n_parts); int i = 0; for (auto& [day, recs] : by_day) { parts[i % n_parts].insert(parts[i % n_parts].end(), recs.begin(), recs.end()); i++; } return parts; } int main(int argc, char** argv) { MPI_Init(&argc, &argv); int rank, size; MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); // Читаем количество CPU потоков из переменной окружения int num_cpu_threads = 2; const char* env_threads = std::getenv("NUM_CPU_THREADS"); if (env_threads) { num_cpu_threads = std::atoi(env_threads); if (num_cpu_threads < 1) num_cpu_threads = 1; } omp_set_num_threads(num_cpu_threads + 1); // +1 для GPU потока если есть // ====== ЗАГРУЗКА GPU ФУНКЦИЙ ====== auto gpu_is_available = load_gpu_is_available(); auto gpu_aggregate = load_gpu_aggregate_days(); bool have_gpu = false; if (gpu_is_available && gpu_is_available()) { have_gpu = true; std::cout << "Rank " << rank << ": GPU available + " << num_cpu_threads << " CPU threads" << std::endl; } else { std::cout << "Rank " << rank << ": " << num_cpu_threads << " CPU threads only" << std::endl; } std::vector local_records; // ====== ТАЙМЕРЫ ====== double time_load_data = 0.0; double time_distribute = 0.0; if (rank == 0) { std::cout << "Rank 0 loading CSV..." << std::endl; // Таймер загрузки данных double t_load_start = MPI_Wtime(); // Запускаем из build auto records = load_csv("../data/data.csv"); auto days = group_by_day(records); auto parts = split_days(days, size); time_load_data = MPI_Wtime() - t_load_start; std::cout << "Rank 0: Data loading time: " << std::fixed << std::setprecision(3) << time_load_data << "s" << std::endl; // Таймер рассылки данных double t_distribute_start = MPI_Wtime(); // Рассылаем данные for (int r = 0; r < size; r++) { auto vec = select_records_for_rank(days, parts[r]); if (r == 0) { // себе не отправляем — сразу сохраняем local_records = vec; continue; } int count = static_cast(vec.size()); MPI_Send(&count, 1, MPI_INT, r, 0, MPI_COMM_WORLD); MPI_Send(vec.data(), count * sizeof(Record), MPI_BYTE, r, 1, MPI_COMM_WORLD); } time_distribute = MPI_Wtime() - t_distribute_start; } else { // Таймер получения данных double t_receive_start = MPI_Wtime(); // Принимает данные int count = 0; MPI_Recv(&count, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); local_records.resize(count); MPI_Recv(local_records.data(), count * sizeof(Record), MPI_BYTE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); time_distribute = MPI_Wtime() - t_receive_start; } MPI_Barrier(MPI_COMM_WORLD); // Вывод времени рассылки/получения данных std::cout << "Rank " << rank << ": Data distribution time: " << std::fixed << std::setprecision(3) << time_distribute << "s" << std::endl; std::cout << "Rank " << rank << " received " << local_records.size() << " records" << std::endl; // ====== АГРЕГАЦИЯ НА КАЖДОМ УЗЛЕ ====== std::vector local_stats; double time_start = omp_get_wtime(); // Время работы: [0] = GPU (если есть), [1..n] = CPU потоки std::vector worker_times(num_cpu_threads + 1, 0.0); if (have_gpu && gpu_aggregate) { // GPU узел: делим на (1 + num_cpu_threads) частей int n_workers = 1 + num_cpu_threads; auto parts = split_records(local_records, n_workers); std::vector> results(n_workers); std::vector success(n_workers, true); #pragma omp parallel { int tid = omp_get_thread_num(); if (tid < n_workers) { double t0 = omp_get_wtime(); if (tid == 0) { // GPU поток success[0] = aggregate_days_gpu(parts[0], results[0], gpu_aggregate); } else { // CPU потоки results[tid] = aggregate_days(parts[tid]); } worker_times[tid] = omp_get_wtime() - t0; } } // Объединяем результаты for (int i = 0; i < n_workers; i++) { if (i == 0 && !success[0]) { // GPU failed - обработаем на CPU std::cout << "Rank " << rank << ": GPU failed, processing on CPU" << std::endl; double t0 = omp_get_wtime(); results[0] = aggregate_days(parts[0]); worker_times[0] = omp_get_wtime() - t0; } local_stats.insert(local_stats.end(), results[i].begin(), results[i].end()); } } else { // CPU-only узел auto parts = split_records(local_records, num_cpu_threads); std::vector> results(num_cpu_threads); #pragma omp parallel { int tid = omp_get_thread_num(); if (tid < num_cpu_threads) { double t0 = omp_get_wtime(); results[tid] = aggregate_days(parts[tid]); worker_times[tid + 1] = omp_get_wtime() - t0; // +1 т.к. [0] для GPU } } for (int i = 0; i < num_cpu_threads; i++) { local_stats.insert(local_stats.end(), results[i].begin(), results[i].end()); } } double time_total = omp_get_wtime() - time_start; // Вывод времени std::cout << std::fixed << std::setprecision(3); std::cout << "Rank " << rank << " aggregated " << local_stats.size() << " days in " << time_total << "s ("; if (have_gpu) { std::cout << "GPU: " << worker_times[0] << "s, "; } for (int i = 0; i < num_cpu_threads; i++) { int idx = have_gpu ? (i + 1) : (i + 1); std::cout << "CPU" << i << ": " << worker_times[idx] << "s"; if (i < num_cpu_threads - 1) std::cout << ", "; } std::cout << ")" << std::endl; // ====== СБОР АГРЕГИРОВАННЫХ ДАННЫХ НА RANK 0 ====== std::vector all_stats; if (rank == 0) { // Добавляем свои данные all_stats.insert(all_stats.end(), local_stats.begin(), local_stats.end()); // Получаем данные от других узлов for (int r = 1; r < size; r++) { int count = 0; MPI_Recv(&count, 1, MPI_INT, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); std::vector remote_stats(count); MPI_Recv(remote_stats.data(), count * sizeof(DayStats), MPI_BYTE, r, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE); all_stats.insert(all_stats.end(), remote_stats.begin(), remote_stats.end()); } } else { // Отправляем свои агрегированные данные на rank 0 int count = static_cast(local_stats.size()); MPI_Send(&count, 1, MPI_INT, 0, 2, MPI_COMM_WORLD); MPI_Send(local_stats.data(), count * sizeof(DayStats), MPI_BYTE, 0, 3, MPI_COMM_WORLD); } // ====== ВЫЧИСЛЕНИЕ ИНТЕРВАЛОВ НА RANK 0 ====== if (rank == 0) { std::cout << "Rank 0: merging " << all_stats.size() << " day stats..." << std::endl; // Объединяем и сортируем auto merged_stats = merge_day_stats(all_stats); std::cout << "Rank 0: total " << merged_stats.size() << " unique days" << std::endl; // Вычисляем интервалы auto intervals = find_intervals(merged_stats, 0.10); std::cout << "Found " << intervals.size() << " intervals with >=10% change" << std::endl; // Записываем результат write_intervals("../result.csv", intervals); std::cout << "Results written to result.csv" << std::endl; // Выводим первые несколько интервалов std::cout << "\nFirst 5 intervals:\n"; std::cout << "start_date,end_date,min_open,max_close,change\n"; for (size_t i = 0; i < std::min(intervals.size(), size_t(5)); i++) { const auto& iv = intervals[i]; std::cout << day_index_to_date(iv.start_day) << "," << day_index_to_date(iv.end_day) << "," << iv.min_open << "," << iv.max_close << "," << iv.change << "\n"; } } MPI_Finalize(); return 0; }