From 73c9e580e4e7904594ad1a011e274f9e84a34568 Mon Sep 17 00:00:00 2001
From: Arity-T
Date: Tue, 2 Dec 2025 12:39:09 +0000
Subject: [PATCH] Computations on the GPU
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Makefile           |   2 +-
 src/gpu_loader.cpp | 110 ++++++++++++++++++++++++++++++-
 src/gpu_loader.hpp |  45 +++++++++++++
 src/gpu_plugin.cu  | 159 +++++++++++++++++++++++++++++++++++++++++++++
 src/main.cpp       |  45 +++++++++----
 5 files changed, 344 insertions(+), 17 deletions(-)

diff --git a/Makefile b/Makefile
index 37c42a3..6fc2c2c 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@ CXX = mpic++
 CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type
 
 NVCC = nvcc
-NVCCFLAGS = -O2 -Xcompiler -fPIC
+NVCCFLAGS = -O3 -std=c++17 -arch=sm_86 -Xcompiler -fPIC
 
 SRC_DIR = src
 BUILD_DIR = build
diff --git a/src/gpu_loader.cpp b/src/gpu_loader.cpp
index 4efb8eb..49c382d 100644
--- a/src/gpu_loader.cpp
+++ b/src/gpu_loader.cpp
@@ -1,12 +1,116 @@
 #include "gpu_loader.hpp"
 #include <dlfcn.h>
+#include <map>
+#include <vector>
+
+static void* get_gpu_lib_handle() {
+    static void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL);
+    return h;
+}
 
 gpu_is_available_fn load_gpu_is_available() {
-    void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL);
+    void* h = get_gpu_lib_handle();
     if (!h) return nullptr;
 
     auto fn = (gpu_is_available_fn)dlsym(h, "gpu_is_available");
-    if (!fn) return nullptr;
-    return fn;
+    return fn;
 }
+
+gpu_aggregate_days_fn load_gpu_aggregate_days() {
+    void* h = get_gpu_lib_handle();
+    if (!h) return nullptr;
+
+    auto fn = (gpu_aggregate_days_fn)dlsym(h, "gpu_aggregate_days");
+    return fn;
+}
+
+bool aggregate_days_gpu(
+    const std::vector<Record>& records,
+    std::vector<DayStats>& out_stats,
+    gpu_aggregate_days_fn gpu_fn)
+{
+    if (!gpu_fn || records.empty()) {
+        return false;
+    }
+
+    // Group the records by day and prepare the data for the GPU
+    std::map<DayIndex, std::vector<size_t>> day_record_indices;
+
+    for (size_t i = 0; i < records.size(); i++) {
+        DayIndex day = static_cast<DayIndex>(records[i].timestamp) / 86400;
+        day_record_indices[day].push_back(i);
+    }
+
+    int num_days = static_cast<int>(day_record_indices.size());
+
+    // Prepare flat arrays for the GPU
+    std::vector<GpuRecord> gpu_records;
+    std::vector<int> day_offsets;
+    std::vector<int> day_counts;
+    std::vector<long long> day_indices;
+
+    gpu_records.reserve(records.size());
+    day_offsets.reserve(num_days);
+    day_counts.reserve(num_days);
+    day_indices.reserve(num_days);
+
+    int current_offset = 0;
+
+    for (auto& [day, indices] : day_record_indices) {
+        day_indices.push_back(day);
+        day_offsets.push_back(current_offset);
+        day_counts.push_back(static_cast<int>(indices.size()));
+
+        // Append this day's records
+        for (size_t idx : indices) {
+            const auto& r = records[idx];
+            GpuRecord gr;
+            gr.timestamp = r.timestamp;
+            gr.open = r.open;
+            gr.high = r.high;
+            gr.low = r.low;
+            gr.close = r.close;
+            gr.volume = r.volume;
+            gpu_records.push_back(gr);
+        }
+
+        current_offset += static_cast<int>(indices.size());
+    }
+
+    // Allocate memory for the result
+    std::vector<GpuDayStats> gpu_stats(num_days);
+
+    // Call the GPU function
+    int result = gpu_fn(
+        gpu_records.data(),
+        static_cast<int>(gpu_records.size()),
+        day_offsets.data(),
+        day_counts.data(),
+        day_indices.data(),
+        num_days,
+        gpu_stats.data()
+    );
+
+    if (result != 0) {
+        return false;
+    }
+
+    // Convert the result into DayStats
+    out_stats.clear();
+    out_stats.reserve(num_days);
+
+    for (const auto& gs : gpu_stats) {
+        DayStats ds;
+        ds.day = gs.day;
+        ds.low = gs.low;
+        ds.high = gs.high;
+        ds.open = gs.open;
+        ds.close = gs.close;
+        ds.avg = gs.avg;
+        ds.first_ts = gs.first_ts;
+        ds.last_ts = gs.last_ts;
+        out_stats.push_back(ds);
+    }
+
+    return true;
+}
diff --git a/src/gpu_loader.hpp b/src/gpu_loader.hpp
index 6aed63d..3fdd174 100644
--- a/src/gpu_loader.hpp
+++ b/src/gpu_loader.hpp
@@ -1,4 +1,49 @@
 #pragma once
+#include "day_stats.hpp"
+#include "record.hpp"
+#include <vector>
+
+// Function types exported by the GPU plugin
 using gpu_is_available_fn = int (*)();
 
+// GPU-side structures (must match gpu_plugin.cu)
+struct GpuRecord {
+    double timestamp;
+    double open;
+    double high;
+    double low;
+    double close;
+    double volume;
+};
+
+struct GpuDayStats {
+    long long day;
+    double low;
+    double high;
+    double open;
+    double close;
+    double avg;
+    double first_ts;
+    double last_ts;
+};
+
+using gpu_aggregate_days_fn = int (*)(
+    const GpuRecord* h_records,
+    int num_records,
+    const int* h_day_offsets,
+    const int* h_day_counts,
+    const long long* h_day_indices,
+    int num_days,
+    GpuDayStats* h_out_stats
+);
+
+// Load the functions from the plugin
 gpu_is_available_fn load_gpu_is_available();
+gpu_aggregate_days_fn load_gpu_aggregate_days();
+
+// Wrapper for aggregation on the GPU (returns true on success)
+bool aggregate_days_gpu(
+    const std::vector<Record>& records,
+    std::vector<DayStats>& out_stats,
+    gpu_aggregate_days_fn gpu_fn
+);
diff --git a/src/gpu_plugin.cu b/src/gpu_plugin.cu
index 2ad243d..9951a0a 100644
--- a/src/gpu_plugin.cu
+++ b/src/gpu_plugin.cu
@@ -1,4 +1,27 @@
 #include <cuda_runtime.h>
+#include <cfloat>
+#include <cstdio>
+
+// Data structures (must match the C++ side)
+struct GpuRecord {
+    double timestamp;
+    double open;
+    double high;
+    double low;
+    double close;
+    double volume;
+};
+
+struct GpuDayStats {
+    long long day;
+    double low;
+    double high;
+    double open;
+    double close;
+    double avg;
+    double first_ts;
+    double last_ts;
+};
 
 extern "C" int gpu_is_available() {
     int n = 0;
@@ -6,3 +29,139 @@ extern "C" int gpu_is_available() {
     if (err != cudaSuccess) return 0;
     return (n > 0) ? 1 : 0;
 }
+
+// Aggregation kernel (a single thread processes all of the data)
+__global__ void aggregate_kernel(
+    const GpuRecord* records,
+    int num_records,
+    const int* day_offsets,        // start of each day in the records array
+    const int* day_counts,         // number of records in each day
+    const long long* day_indices,  // day indices
+    int num_days,
+    GpuDayStats* out_stats)
+{
+    // A single thread walks over all days sequentially
+    for (int d = 0; d < num_days; d++) {
+        int offset = day_offsets[d];
+        int count = day_counts[d];
+
+        GpuDayStats stats;
+        stats.day = day_indices[d];
+        stats.low = DBL_MAX;
+        stats.high = -DBL_MAX;
+        stats.first_ts = DBL_MAX;
+        stats.last_ts = -DBL_MAX;
+        stats.open = 0;
+        stats.close = 0;
+
+        for (int i = 0; i < count; i++) {
+            const GpuRecord& r = records[offset + i];
+
+            // min/max
+            if (r.low < stats.low) stats.low = r.low;
+            if (r.high > stats.high) stats.high = r.high;
+
+            // first/last by timestamp
+            if (r.timestamp < stats.first_ts) {
+                stats.first_ts = r.timestamp;
+                stats.open = r.open;
+            }
+            if (r.timestamp > stats.last_ts) {
+                stats.last_ts = r.timestamp;
+                stats.close = r.close;
+            }
+        }
+
+        stats.avg = (stats.low + stats.high) / 2.0;
+        out_stats[d] = stats;
+    }
+}
+
+// Aggregation entry point called from C++
+extern "C" int gpu_aggregate_days(
+    const GpuRecord* h_records,
+    int num_records,
+    const int* h_day_offsets,
+    const int* h_day_counts,
+    const long long* h_day_indices,
+    int num_days,
+    GpuDayStats* h_out_stats)
+{
+    // Allocate device memory
+    GpuRecord* d_records = nullptr;
+    int* d_day_offsets = nullptr;
+    int* d_day_counts = nullptr;
+    long long* d_day_indices = nullptr;
+    GpuDayStats* d_out_stats = nullptr;
+
+    cudaError_t err;
+
+    err = cudaMalloc(&d_records, num_records * sizeof(GpuRecord));
+    if (err != cudaSuccess) return -1;
+
+    err = cudaMalloc(&d_day_offsets, num_days * sizeof(int));
+    if (err != cudaSuccess) { cudaFree(d_records); return -2; }
+
+    err = cudaMalloc(&d_day_counts, num_days * sizeof(int));
+    if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); return -3; }
+
+    err = cudaMalloc(&d_day_indices, num_days * sizeof(long long));
+    if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); return -4; }
+
+    err = cudaMalloc(&d_out_stats, num_days * sizeof(GpuDayStats));
+    if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); cudaFree(d_day_indices); return -5; }
+
+    // Copy the input data to the GPU
+    err = cudaMemcpy(d_records, h_records, num_records * sizeof(GpuRecord), cudaMemcpyHostToDevice);
+    if (err != cudaSuccess) return -10;
+
+    err = cudaMemcpy(d_day_offsets, h_day_offsets, num_days * sizeof(int), cudaMemcpyHostToDevice);
+    if (err != cudaSuccess) return -11;
+
+    err = cudaMemcpy(d_day_counts, h_day_counts, num_days * sizeof(int), cudaMemcpyHostToDevice);
+    if (err != cudaSuccess) return -12;
+
+    err = cudaMemcpy(d_day_indices, h_day_indices, num_days * sizeof(long long), cudaMemcpyHostToDevice);
+    if (err != cudaSuccess) return -13;
+
+    // Launch the kernel (1 block, 1 thread)
+    aggregate_kernel<<<1, 1>>>(
+        d_records, num_records,
+        d_day_offsets, d_day_counts, d_day_indices,
+        num_days, d_out_stats
+    );
+
+    // Check for a kernel launch error
+    err = cudaGetLastError();
+    if (err != cudaSuccess) {
+        cudaFree(d_records);
+        cudaFree(d_day_offsets);
+        cudaFree(d_day_counts);
+        cudaFree(d_day_indices);
+        cudaFree(d_out_stats);
+        return -7;
+    }
+
+    // Wait for completion
+    err = cudaDeviceSynchronize();
+    if (err != cudaSuccess) {
+        cudaFree(d_records);
+        cudaFree(d_day_offsets);
+        cudaFree(d_day_counts);
+        cudaFree(d_day_indices);
+        cudaFree(d_out_stats);
+        return -6;
+    }
+
+    // Copy the result back
+    err = cudaMemcpy(h_out_stats, d_out_stats, num_days * sizeof(GpuDayStats), cudaMemcpyDeviceToHost);
+
+    // Free device memory
+    cudaFree(d_records);
+    cudaFree(d_day_offsets);
+    cudaFree(d_day_counts);
+    cudaFree(d_day_indices);
+    cudaFree(d_out_stats);
+
+    return (err == cudaSuccess) ? 0 : -8;
+}
diff --git a/src/main.cpp b/src/main.cpp
index 66b9731..cdd4ca0 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -34,6 +34,18 @@ int main(int argc, char** argv) {
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     MPI_Comm_size(MPI_COMM_WORLD, &size);
 
+    // ====== LOAD THE GPU FUNCTIONS ======
+    auto gpu_is_available = load_gpu_is_available();
+    auto gpu_aggregate = load_gpu_aggregate_days();
+
+    bool have_gpu = false;
+    if (gpu_is_available && gpu_is_available()) {
+        have_gpu = true;
+        std::cout << "Rank " << rank << ": GPU available" << std::endl;
+    } else {
+        std::cout << "Rank " << rank << ": GPU not available, using CPU" << std::endl;
+    }
+
     std::vector<Record> local_records;
 
     if (rank == 0) {
@@ -75,10 +87,26 @@ int main(int argc, char** argv) {
     std::cout << "Rank " << rank << " received "
               << local_records.size() << " records" << std::endl;
 
-    // ====== AGGREGATION ON EACH NODE ======
-    auto local_stats = aggregate_days(local_records);
-    std::cout << "Rank " << rank << " aggregated "
-              << local_stats.size() << " days" << std::endl;
+    // ====== AGGREGATION ON EACH NODE (GPU or CPU) ======
+    std::vector<DayStats> local_stats;
+
+    if (have_gpu && gpu_aggregate) {
+        bool gpu_success = aggregate_days_gpu(local_records, local_stats, gpu_aggregate);
+        if (gpu_success) {
+            std::cout << "Rank " << rank << " aggregated "
+                      << local_stats.size() << " days (GPU)" << std::endl;
+        } else {
+            // Fall back to the CPU if the GPU aggregation fails
+            std::cout << "Rank " << rank << ": GPU aggregation failed, falling back to CPU" << std::endl;
+            local_stats = aggregate_days(local_records);
+            std::cout << "Rank " << rank << " aggregated "
+                      << local_stats.size() << " days (CPU)" << std::endl;
+        }
+    } else {
+        local_stats = aggregate_days(local_records);
+        std::cout << "Rank " << rank << " aggregated "
+                  << local_stats.size() << " days (CPU)" << std::endl;
+    }
 
     // ====== GATHER THE AGGREGATED DATA ON RANK 0 ======
     std::vector<DayStats> all_stats;
@@ -134,15 +162,6 @@ int main(int argc, char** argv) {
         }
     }
 
-    // GPU check (left as is)
-    auto gpu_is_available = load_gpu_is_available();
-    int have_gpu = 0;
-    if (gpu_is_available) {
-        std::cout << "Rank " << rank << " dll loaded" << std::endl;
-        have_gpu = gpu_is_available();
-    }
-    std::cout << "Rank " << rank << ": gpu_available=" << have_gpu << "\n";
-
     MPI_Finalize();
     return 0;
 }
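
Follow-up note (not part of the patch): aggregate_kernel is launched as <<<1, 1>>>, so the whole aggregation still runs on a single GPU thread. A natural next step would be to give each day its own thread. The sketch below is only an illustration of that idea under the assumptions of this patch: it reuses the GpuRecord/GpuDayStats structs and the day_offsets/day_counts/day_indices layout from gpu_plugin.cu, and the kernel name aggregate_kernel_per_day is hypothetical.

// Sketch: one thread per day, same per-day logic as aggregate_kernel.
// Assumes GpuRecord/GpuDayStats from gpu_plugin.cu and <cfloat> for DBL_MAX.
__global__ void aggregate_kernel_per_day(
    const GpuRecord* records,
    const int* day_offsets,
    const int* day_counts,
    const long long* day_indices,
    int num_days,
    GpuDayStats* out_stats)
{
    // Global thread index selects the day; extra threads in the last block exit
    int d = blockIdx.x * blockDim.x + threadIdx.x;
    if (d >= num_days) return;

    int offset = day_offsets[d];
    int count = day_counts[d];

    GpuDayStats stats;
    stats.day = day_indices[d];
    stats.low = DBL_MAX;
    stats.high = -DBL_MAX;
    stats.first_ts = DBL_MAX;
    stats.last_ts = -DBL_MAX;
    stats.open = 0;
    stats.close = 0;

    for (int i = 0; i < count; i++) {
        const GpuRecord& r = records[offset + i];
        if (r.low < stats.low) stats.low = r.low;
        if (r.high > stats.high) stats.high = r.high;
        if (r.timestamp < stats.first_ts) { stats.first_ts = r.timestamp; stats.open = r.open; }
        if (r.timestamp > stats.last_ts)  { stats.last_ts = r.timestamp; stats.close = r.close; }
    }

    stats.avg = (stats.low + stats.high) / 2.0;
    out_stats[d] = stats;
}

// Possible launch from gpu_aggregate_days, replacing the <<<1, 1>>> call:
//     int threads = 256;
//     int blocks = (num_days + threads - 1) / threads;
//     aggregate_kernel_per_day<<<blocks, threads>>>(
//         d_records, d_day_offsets, d_day_counts, d_day_indices,
//         num_days, d_out_stats);

With one thread per day the per-day loops run concurrently, while the host-side grouping code and the plugin interface stay unchanged.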