diff --git a/Makefile b/Makefile
index 6fc2c2c..d6a9339 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 CXX = mpic++
-CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type
+CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type -fopenmp
 
 NVCC = nvcc
 NVCCFLAGS = -O3 -std=c++17 -arch=sm_86 -Xcompiler -fPIC
diff --git a/run.slurm b/run.slurm
index 6340950..9087ae7 100644
--- a/run.slurm
+++ b/run.slurm
@@ -2,9 +2,11 @@
 #SBATCH --job-name=btc
 #SBATCH --nodes=4
 #SBATCH --ntasks=4
+#SBATCH --cpus-per-task=2
 #SBATCH --output=out.txt
 
-# mpirun -np $SLURM_NTASKS ./build/bitcoin_app
+# Number of CPU threads per node (must match cpus-per-task)
+export NUM_CPU_THREADS=2
 
 cd /mnt/shared/supercomputers/build
 mpirun -np $SLURM_NTASKS ./bitcoin_app
diff --git a/src/main.cpp b/src/main.cpp
index cdd4ca0..d05799d 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,7 +1,10 @@
 #include <mpi.h>
+#include <omp.h>
 #include <iostream>
 #include <vector>
 #include <cstdlib>
+#include <map>
+#include <iomanip>
 #include "csv_loader.hpp"
 #include "utils.hpp"
 
@@ -27,6 +30,26 @@ std::vector<Record> select_records_for_rank(
     return out;
 }
 
+// Split the records into N parts (grouped by day)
+std::vector<std::vector<Record>> split_records(const std::vector<Record>& records, int n_parts) {
+    // Group records by day
+    std::map<DayIndex, std::vector<Record>> by_day;
+    for (const auto& r : records) {
+        DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400;
+        by_day[day].push_back(r);
+    }
+
+    // Distribute the days across the parts
+    std::vector<std::vector<Record>> parts(n_parts);
+    int i = 0;
+    for (auto& [day, recs] : by_day) {
+        parts[i % n_parts].insert(parts[i % n_parts].end(), recs.begin(), recs.end());
+        i++;
+    }
+
+    return parts;
+}
+
 int main(int argc, char** argv) {
     MPI_Init(&argc, &argv);
 
@@ -34,6 +57,15 @@
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     MPI_Comm_size(MPI_COMM_WORLD, &size);
 
+    // Read the number of CPU threads from the environment variable (default: 2)
+    int num_cpu_threads = 2;
+    const char* env_threads = std::getenv("NUM_CPU_THREADS");
+    if (env_threads) {
+        num_cpu_threads = std::atoi(env_threads);
+        if (num_cpu_threads < 1) num_cpu_threads = 1;
+    }
+    omp_set_num_threads(num_cpu_threads + 1); // +1 for the GPU thread, if present
+
     // ====== LOAD GPU FUNCTIONS ======
     auto gpu_is_available = load_gpu_is_available();
     auto gpu_aggregate = load_gpu_aggregate_days();
@@ -41,9 +73,9 @@
     bool have_gpu = false;
     if (gpu_is_available && gpu_is_available()) {
         have_gpu = true;
-        std::cout << "Rank " << rank << ": GPU available" << std::endl;
+        std::cout << "Rank " << rank << ": GPU available + " << num_cpu_threads << " CPU threads" << std::endl;
     } else {
-        std::cout << "Rank " << rank << ": GPU not available, using CPU" << std::endl;
+        std::cout << "Rank " << rank << ": " << num_cpu_threads << " CPU threads only" << std::endl;
     }
 
     std::vector<Record> local_records;
@@ -87,26 +119,84 @@
     std::cout << "Rank " << rank << " received "
               << local_records.size() << " records" << std::endl;
 
-    // ====== AGGREGATION ON EACH NODE (GPU or CPU) ======
+    // ====== AGGREGATION ON EACH NODE ======
     std::vector<DayStats> local_stats;
+    double time_start = omp_get_wtime();
+
+    // Per-worker times: [0] = GPU (if present), [1..n] = CPU threads
+    std::vector<double> worker_times(num_cpu_threads + 1, 0.0);
 
     if (have_gpu && gpu_aggregate) {
-        bool gpu_success = aggregate_days_gpu(local_records, local_stats, gpu_aggregate);
-        if (gpu_success) {
-            std::cout << "Rank " << rank << " aggregated "
-                      << local_stats.size() << " days (GPU)" << std::endl;
-        } else {
-            // Fall back to the CPU on a GPU error
-            std::cout << "Rank " << rank << ": GPU aggregation failed, falling back to CPU" << std::endl;
-            local_stats = aggregate_days(local_records);
-            std::cout << "Rank " << rank << " aggregated "
-                      << local_stats.size() << " days (CPU)" << std::endl;
+        // GPU node: split the work into (1 + num_cpu_threads) parts
+        int n_workers = 1 + num_cpu_threads;
+        auto parts = split_records(local_records, n_workers);
+
+        std::vector<std::vector<DayStats>> results(n_workers);
+        std::vector<bool> success(n_workers, true);
+
+        #pragma omp parallel
+        {
+            int tid = omp_get_thread_num();
+            if (tid < n_workers) {
+                double t0 = omp_get_wtime();
+                if (tid == 0) {
+                    // GPU thread
+                    success[0] = aggregate_days_gpu(parts[0], results[0], gpu_aggregate);
+                } else {
+                    // CPU threads
+                    results[tid] = aggregate_days(parts[tid]);
+                }
+                worker_times[tid] = omp_get_wtime() - t0;
+            }
         }
+
+        // Merge the results
+        for (int i = 0; i < n_workers; i++) {
+            if (i == 0 && !success[0]) {
+                // GPU failed - process its part on the CPU
+                std::cout << "Rank " << rank << ": GPU failed, processing on CPU" << std::endl;
+                double t0 = omp_get_wtime();
+                results[0] = aggregate_days(parts[0]);
+                worker_times[0] = omp_get_wtime() - t0;
+            }
+            local_stats.insert(local_stats.end(), results[i].begin(), results[i].end());
+        }
+
     } else {
-        local_stats = aggregate_days(local_records);
-        std::cout << "Rank " << rank << " aggregated "
-                  << local_stats.size() << " days (CPU)" << std::endl;
+        // CPU-only node
+        auto parts = split_records(local_records, num_cpu_threads);
+        std::vector<std::vector<DayStats>> results(num_cpu_threads);
+
+        #pragma omp parallel
+        {
+            int tid = omp_get_thread_num();
+            if (tid < num_cpu_threads) {
+                double t0 = omp_get_wtime();
+                results[tid] = aggregate_days(parts[tid]);
+                worker_times[tid + 1] = omp_get_wtime() - t0; // +1 because slot [0] is reserved for the GPU
+            }
+        }
+
+        for (int i = 0; i < num_cpu_threads; i++) {
+            local_stats.insert(local_stats.end(), results[i].begin(), results[i].end());
+        }
     }
+
+    double time_total = omp_get_wtime() - time_start;
+
+    // Print the timing breakdown
+    std::cout << std::fixed << std::setprecision(3);
+    std::cout << "Rank " << rank << " aggregated " << local_stats.size() << " days in "
+              << time_total << "s (";
+    if (have_gpu) {
+        std::cout << "GPU: " << worker_times[0] << "s, ";
+    }
+    for (int i = 0; i < num_cpu_threads; i++) {
+        int idx = i + 1; // CPU times are stored starting at slot [1] in both cases
+        std::cout << "CPU" << i << ": " << worker_times[idx] << "s";
+        if (i < num_cpu_threads - 1) std::cout << ", ";
+    }
+    std::cout << ")" << std::endl;
 
     // ====== GATHER AGGREGATED DATA ON RANK 0 ======
     std::vector<DayStats> all_stats;