diff --git a/run.slurm b/run.slurm
index 725339d..32e9ba4 100644
--- a/run.slurm
+++ b/run.slurm
@@ -20,5 +20,8 @@
 export AGGREGATION_INTERVAL=60
 # Whether to use CUDA for aggregation (0 = no, 1 = yes)
 export USE_CUDA=1
+# Whether to use the block kernel (faster for large intervals, 0 = no, 1 = yes)
+export USE_BLOCK_KERNEL=0
+
 cd /mnt/shared/supercomputers/build
 mpirun -np $SLURM_NTASKS ./bitcoin_app
diff --git a/src/gpu_plugin.cu b/src/gpu_plugin.cu
index ab7dfb8..efeb3c9 100644
--- a/src/gpu_plugin.cu
+++ b/src/gpu_plugin.cu
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include <cstdlib>
 #include
 #include
 #include
@@ -200,95 +201,6 @@
 }


-static __device__ __forceinline__ double warp_reduce_sum(double v) {
-    for (int offset = 16; offset > 0; offset >>= 1)
-        v += __shfl_down_sync(0xffffffff, v, offset);
-    return v;
-}
-static __device__ __forceinline__ double warp_reduce_min(double v) {
-    for (int offset = 16; offset > 0; offset >>= 1)
-        v = fmin(v, __shfl_down_sync(0xffffffff, v, offset));
-    return v;
-}
-static __device__ __forceinline__ double warp_reduce_max(double v) {
-    for (int offset = 16; offset > 0; offset >>= 1)
-        v = fmax(v, __shfl_down_sync(0xffffffff, v, offset));
-    return v;
-}
-static __device__ __forceinline__ int warp_reduce_sum_int(int v) {
-    for (int offset = 16; offset > 0; offset >>= 1)
-        v += __shfl_down_sync(0xffffffff, v, offset);
-    return v;
-}
-
-__global__ void aggregate_periods_warp_kernel(
-    const double* __restrict__ open,
-    const double* __restrict__ high,
-    const double* __restrict__ low,
-    const double* __restrict__ close,
-    const int64_t* __restrict__ unique_periods,
-    const int* __restrict__ offsets,
-    const int* __restrict__ counts,
-    int num_periods,
-    GpuPeriodStats* __restrict__ out_stats)
-{
-    int global_thread = blockIdx.x * blockDim.x + threadIdx.x;
-    int warp_id = global_thread >> 5;   // /32
-    int lane = threadIdx.x & 31;        // %32
-    int period_idx = warp_id;
-
-    if (period_idx >= num_periods) return;
-
-    int offset = offsets[period_idx];
-    int count = counts[period_idx];
-
-    // Per-lane local accumulators
-    double sum_avg = 0.0;
-    double omin = DBL_MAX, omax = -DBL_MAX;
-    double cmin = DBL_MAX, cmax = -DBL_MAX;
-    int local_n = 0;
-
-    // Each lane takes i = lane, lane+32, lane+64...
-    for (int i = lane; i < count; i += 32) {
-        int tick = offset + i;
-        double avg = 0.5 * (low[tick] + high[tick]);
-        sum_avg += avg;
-
-        double o = open[tick];
-        double c = close[tick];
-
-        omin = fmin(omin, o);
-        omax = fmax(omax, o);
-        cmin = fmin(cmin, c);
-        cmax = fmax(cmax, c);
-
-        local_n += 1;
-    }
-
-    // Warp reduction
-    sum_avg = warp_reduce_sum(sum_avg);
-    omin = warp_reduce_min(omin);
-    omax = warp_reduce_max(omax);
-    cmin = warp_reduce_min(cmin);
-    cmax = warp_reduce_max(cmax);
-    int n = warp_reduce_sum_int(local_n); // should equal count, but this is more robust
-
-    // lane 0 writes the result
-    if (lane == 0) {
-        GpuPeriodStats s;
-        s.period = unique_periods[period_idx];
-        s.avg = (n > 0) ? (sum_avg / (double)n) : 0.0;
-        s.open_min = omin;
-        s.open_max = omax;
-        s.close_min = cmin;
-        s.close_max = cmax;
-        s.count = n;
-        out_stats[period_idx] = s;
-    }
-}
-
-
 // ============================================================================
 // GPU availability check
 // ============================================================================
@@ -442,26 +354,30 @@ extern "C" int gpu_aggregate_periods(
     GpuPeriodStats* d_out_stats = nullptr;
     CUDA_CHECK(cudaMalloc(&d_out_stats, num_periods * sizeof(GpuPeriodStats)));

-    // int agg_blocks = (num_periods + BLOCK_SIZE - 1) / BLOCK_SIZE;
-    // aggregate_periods_simple_kernel<<<agg_blocks, BLOCK_SIZE>>>(
-    //     d_open, d_high, d_low, d_close,
-    //     d_unique_periods, d_offsets, d_counts,
-    //     num_periods, d_out_stats);
-
-
-    // aggregate_periods_kernel<<<num_periods, BLOCK_SIZE>>>(
-    //     d_open, d_high, d_low, d_close,
-    //     d_unique_periods, d_offsets, d_counts,
-    //     num_periods, d_out_stats);
-
-    int warps_per_block = BLOCK_SIZE / 32;   // 8
-    int num_blocks1 = (num_periods + warps_per_block - 1) / warps_per_block;
-
-    aggregate_periods_warp_kernel<<<num_blocks1, BLOCK_SIZE>>>(
-        d_open, d_high, d_low, d_close,
-        d_unique_periods, d_offsets, d_counts,
-        num_periods, d_out_stats);
-
+    // Kernel selection via the USE_BLOCK_KERNEL environment variable
+    const char* env_block_kernel = std::getenv("USE_BLOCK_KERNEL");
+    if (env_block_kernel == nullptr) {
+        printf("Error: Environment variable USE_BLOCK_KERNEL is not set\n");
+        return -1;
+    }
+    bool use_block_kernel = std::atoi(env_block_kernel) != 0;
+
+    if (use_block_kernel) {
+        // Block kernel: one block per period, threads process the ticks in parallel
+        // Better for large intervals with many ticks in each period
+        aggregate_periods_kernel<<<num_periods, BLOCK_SIZE>>>(
+            d_open, d_high, d_low, d_close,
+            d_unique_periods, d_offsets, d_counts,
+            num_periods, d_out_stats);
+    } else {
+        // Simple kernel: one thread per period
+        // Better for many periods with few ticks in each
+        int agg_blocks = (num_periods + BLOCK_SIZE - 1) / BLOCK_SIZE;
+        aggregate_periods_simple_kernel<<<agg_blocks, BLOCK_SIZE>>>(
+            d_open, d_high, d_low, d_close,
+            d_unique_periods, d_offsets, d_counts,
+            num_periods, d_out_stats);
+    }

     CUDA_CHECK(cudaGetLastError());

@@ -505,12 +421,12 @@ extern "C" int gpu_aggregate_periods(
     double total_ms = get_time_ms() - total_start;

     // Build the entire output as a single string
-    output << "  GPU aggregation (" << num_ticks << " ticks, interval=" << interval << " sec):\n";
+    output << "  GPU aggregation (" << num_ticks << " ticks, interval=" << interval << " sec, kernel=" << (use_block_kernel ? "block" : "simple") << "):\n";
     output << "    1. Malloc + H->D copy:   " << std::fixed << std::setprecision(3) << std::setw(7) << step1_ms << " ms\n";
     output << "    2. Compute period_ids:   " << std::setw(7) << step2_ms << " ms\n";
     output << "    3. RLE (CUB):            " << std::setw(7) << step3_ms << " ms (" << num_periods << " periods)\n";
     output << "    4. Exclusive scan:       " << std::setw(7) << step4_ms << " ms\n";
-    output << "    5. Aggregation kernel:   " << std::setw(7) << step5_ms << " ms\n";
+    output << "    5. Aggregation kernel:   " << std::setw(7) << step5_ms << " ms (" << (use_block_kernel ? "block" : "simple") << ")\n";
     output << "    6. D->H copy:            " << std::setw(7) << step6_ms << " ms\n";
     output << "    7. Free GPU memory:      " << std::setw(7) << step7_ms << " ms\n";
     output << "  GPU TOTAL:                 " << std::setw(7) << total_ms << " ms\n";
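
Note for reviewers: aggregate_periods_kernel (the block-per-period variant selected by USE_BLOCK_KERNEL=1) is defined elsewhere in gpu_plugin.cu and is not touched by this diff, so its body does not appear above. To make the launch configuration used here readable (one block per period, BLOCK_SIZE threads striding over that period's ticks), the following is a minimal sketch of what such a kernel can look like. It reuses the GpuPeriodStats fields and accumulators from the removed warp kernel, assumes BLOCK_SIZE is a compile-time power-of-two constant, and is illustrative only, not the actual implementation.

// --- sketch only, not part of the diff ---------------------------------------
#include <cfloat>
#include <cstdint>

// Assumed to mirror the project's GpuPeriodStats; only the fields used above
// are reproduced here, the real definition lives in the plugin.
struct GpuPeriodStats {
    int64_t period;
    double  avg;
    double  open_min, open_max;
    double  close_min, close_max;
    int     count;
};

#define BLOCK_SIZE 256   // assumption: compile-time power of two, as in the plugin

// One block per period: blockDim.x threads stride over the period's ticks,
// then a shared-memory tree reduction combines the per-thread partials.
__global__ void aggregate_periods_block_sketch(
    const double* __restrict__ open,
    const double* __restrict__ high,
    const double* __restrict__ low,
    const double* __restrict__ close,
    const int64_t* __restrict__ unique_periods,
    const int* __restrict__ offsets,
    const int* __restrict__ counts,
    int num_periods,
    GpuPeriodStats* __restrict__ out_stats)
{
    int period_idx = blockIdx.x;          // launch: <<<num_periods, BLOCK_SIZE>>>
    if (period_idx >= num_periods) return;

    int offset = offsets[period_idx];
    int count  = counts[period_idx];

    // Per-thread partial accumulators (same quantities as the warp kernel above)
    double sum_avg = 0.0;
    double omin = DBL_MAX, omax = -DBL_MAX;
    double cmin = DBL_MAX, cmax = -DBL_MAX;

    for (int i = threadIdx.x; i < count; i += blockDim.x) {
        int tick = offset + i;
        sum_avg += 0.5 * (low[tick] + high[tick]);
        omin = fmin(omin, open[tick]);
        omax = fmax(omax, open[tick]);
        cmin = fmin(cmin, close[tick]);
        cmax = fmax(cmax, close[tick]);
    }

    // Shared-memory tree reduction across the block
    __shared__ double s_sum[BLOCK_SIZE];
    __shared__ double s_omin[BLOCK_SIZE], s_omax[BLOCK_SIZE];
    __shared__ double s_cmin[BLOCK_SIZE], s_cmax[BLOCK_SIZE];

    s_sum[threadIdx.x]  = sum_avg;
    s_omin[threadIdx.x] = omin;
    s_omax[threadIdx.x] = omax;
    s_cmin[threadIdx.x] = cmin;
    s_cmax[threadIdx.x] = cmax;
    __syncthreads();

    for (int stride = blockDim.x / 2; stride > 0; stride >>= 1) {
        if (threadIdx.x < stride) {
            s_sum[threadIdx.x]  += s_sum[threadIdx.x + stride];
            s_omin[threadIdx.x]  = fmin(s_omin[threadIdx.x], s_omin[threadIdx.x + stride]);
            s_omax[threadIdx.x]  = fmax(s_omax[threadIdx.x], s_omax[threadIdx.x + stride]);
            s_cmin[threadIdx.x]  = fmin(s_cmin[threadIdx.x], s_cmin[threadIdx.x + stride]);
            s_cmax[threadIdx.x]  = fmax(s_cmax[threadIdx.x], s_cmax[threadIdx.x + stride]);
        }
        __syncthreads();
    }

    // Thread 0 writes the aggregated statistics for this period
    if (threadIdx.x == 0) {
        GpuPeriodStats s;
        s.period    = unique_periods[period_idx];
        s.avg       = (count > 0) ? (s_sum[0] / (double)count) : 0.0;
        s.open_min  = s_omin[0];
        s.open_max  = s_omax[0];
        s.close_min = s_cmin[0];
        s.close_max = s_cmax[0];
        s.count     = count;
        out_stats[period_idx] = s;
    }
}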