// supercomputers/src/gpu_plugin.cu
#include <cuda_runtime.h>
#include <cub/cub.cuh>
#include <cstdint>
#include <cfloat>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <string>
#include <sstream>
#include <iomanip>
// ============================================================================
// Data structures
// ============================================================================
// Aggregation result for a single period
struct GpuPeriodStats {
    int64_t period;
    double avg;
    double open_min;
    double open_max;
    double close_min;
    double close_max;
    int64_t count;
};
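// GpuPeriodStats crosses the extern "C" boundary, so the host-side binding
// must agree on its layout. A compile-time size check is a cheap safeguard;
// the expected value (7 * 8 = 56 bytes, no padding) assumes 8-byte double
// and int64_t, which holds on all platforms CUDA currently targets.
static_assert(sizeof(GpuPeriodStats) == 7 * sizeof(double),
              "GpuPeriodStats layout changed; update the host-side binding");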
// ============================================================================
// Helper functions
// ============================================================================
static double get_time_ms() {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0;
}
#define CUDA_CHECK(call) do { \
    cudaError_t err = call; \
    if (err != cudaSuccess) { \
        printf("CUDA error at %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
        return -1; \
    } \
} while(0)
// ============================================================================
// Kernel: compute the period_id for each tick.
// The pipeline assumes timestamps arrive sorted in ascending order, so that
// equal period_ids form contiguous runs for the RLE step below.
// ============================================================================
__global__ void compute_period_ids_kernel(
    const double* __restrict__ timestamps,
    int64_t* __restrict__ period_ids,
    int n,
    int64_t interval)
{
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) {
        period_ids[idx] = static_cast<int64_t>(timestamps[idx]) / interval;
    }
}
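// ============================================================================
// Atomic min/max for double.
// CUDA has no native atomicMin/atomicMax overloads for double, and comparing
// the raw bit pattern as an unsigned integer orders negative values
// incorrectly (the sign bit inverts the ordering). These CAS-based helpers
// are a standard sketch that is correct for any finite double; they work on
// both shared and global memory.
// ============================================================================
__device__ double atomic_min_double(double* address, double val) {
    unsigned long long* addr = reinterpret_cast<unsigned long long*>(address);
    unsigned long long old = *addr;
    // Keep trying to install val while it is smaller than the current value
    while (val < __longlong_as_double(static_cast<long long>(old))) {
        unsigned long long assumed = old;
        old = atomicCAS(addr, assumed,
                        static_cast<unsigned long long>(__double_as_longlong(val)));
        if (old == assumed) break;  // CAS succeeded
    }
    return __longlong_as_double(static_cast<long long>(old));
}
__device__ double atomic_max_double(double* address, double val) {
    unsigned long long* addr = reinterpret_cast<unsigned long long*>(address);
    unsigned long long old = *addr;
    while (val > __longlong_as_double(static_cast<long long>(old))) {
        unsigned long long assumed = old;
        old = atomicCAS(addr, assumed,
                        static_cast<unsigned long long>(__double_as_longlong(val)));
        if (old == assumed) break;  // CAS succeeded
    }
    return __longlong_as_double(static_cast<long long>(old));
}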
// ============================================================================
// Kernel: aggregate one period (one block per period)
// ============================================================================
__global__ void aggregate_periods_kernel(
    const double* __restrict__ open,
    const double* __restrict__ high,
    const double* __restrict__ low,
    const double* __restrict__ close,
    const int64_t* __restrict__ unique_periods,
    const int* __restrict__ offsets,
    const int* __restrict__ counts,
    int num_periods,
    GpuPeriodStats* __restrict__ out_stats)
{
    int period_idx = blockIdx.x;
    if (period_idx >= num_periods) return;
    int offset = offsets[period_idx];
    int count = counts[period_idx];
    // Shared memory accumulators for the intra-block reduction
    __shared__ double s_avg_sum;
    __shared__ double s_open_min;
    __shared__ double s_open_max;
    __shared__ double s_close_min;
    __shared__ double s_close_max;
    // First thread initializes shared memory
    if (threadIdx.x == 0) {
        s_avg_sum = 0.0;
        s_open_min = DBL_MAX;
        s_open_max = -DBL_MAX;
        s_close_min = DBL_MAX;
        s_close_max = -DBL_MAX;
    }
    __syncthreads();
    // Per-thread local accumulators
    double local_avg_sum = 0.0;
    double local_open_min = DBL_MAX;
    double local_open_max = -DBL_MAX;
    double local_close_min = DBL_MAX;
    double local_close_max = -DBL_MAX;
    // Each thread processes its slice of the period's ticks, strided by blockDim.x
    for (int i = threadIdx.x; i < count; i += blockDim.x) {
        int tick_idx = offset + i;
        double avg = (low[tick_idx] + high[tick_idx]) / 2.0;
        local_avg_sum += avg;
        local_open_min = fmin(local_open_min, open[tick_idx]);
        local_open_max = fmax(local_open_max, open[tick_idx]);
        local_close_min = fmin(local_close_min, close[tick_idx]);
        local_close_max = fmax(local_close_max, close[tick_idx]);
    }
    // Reduce into shared memory with atomics. atomicAdd on double requires
    // compute capability 6.0+; the CAS-based min/max helpers above stay
    // correct for negative values, unlike a raw bit-pattern atomicMin/Max
    // on the reinterpreted double.
    atomicAdd(&s_avg_sum, local_avg_sum);
    atomic_min_double(&s_open_min, local_open_min);
    atomic_max_double(&s_open_max, local_open_max);
    atomic_min_double(&s_close_min, local_close_min);
    atomic_max_double(&s_close_max, local_close_max);
    __syncthreads();
    // First thread writes the result
    if (threadIdx.x == 0) {
        GpuPeriodStats stats;
        stats.period = unique_periods[period_idx];
        stats.avg = s_avg_sum / static_cast<double>(count);
        stats.open_min = s_open_min;
        stats.open_max = s_open_max;
        stats.close_min = s_close_min;
        stats.close_max = s_close_max;
        stats.count = count;
        out_stats[period_idx] = stats;
    }
}
// ============================================================================
// Simple aggregation kernel (one thread per period).
// Used when there are many periods with few ticks in each.
// ============================================================================
__global__ void aggregate_periods_simple_kernel(
    const double* __restrict__ open,
    const double* __restrict__ high,
    const double* __restrict__ low,
    const double* __restrict__ close,
    const int64_t* __restrict__ unique_periods,
    const int* __restrict__ offsets,
    const int* __restrict__ counts,
    int num_periods,
    GpuPeriodStats* __restrict__ out_stats)
{
    int period_idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (period_idx >= num_periods) return;
    int offset = offsets[period_idx];
    int count = counts[period_idx];
    double avg_sum = 0.0;
    double open_min = DBL_MAX;
    double open_max = -DBL_MAX;
    double close_min = DBL_MAX;
    double close_max = -DBL_MAX;
    for (int i = 0; i < count; i++) {
        int tick_idx = offset + i;
        double avg = (low[tick_idx] + high[tick_idx]) / 2.0;
        avg_sum += avg;
        open_min = fmin(open_min, open[tick_idx]);
        open_max = fmax(open_max, open[tick_idx]);
        close_min = fmin(close_min, close[tick_idx]);
        close_max = fmax(close_max, close[tick_idx]);
    }
    GpuPeriodStats stats;
    stats.period = unique_periods[period_idx];
    stats.avg = avg_sum / static_cast<double>(count);
    stats.open_min = open_min;
    stats.open_max = open_max;
    stats.close_min = close_min;
    stats.close_max = close_max;
    stats.count = count;
    out_stats[period_idx] = stats;
}
// ============================================================================
// Check whether a GPU is available
// ============================================================================
extern "C" int gpu_is_available() {
    int n = 0;
    cudaError_t err = cudaGetDeviceCount(&n);
    if (err != cudaSuccess) return 0;
    return (n > 0) ? 1 : 0;
}
// ============================================================================
// Main GPU aggregation entry point
// ============================================================================
extern "C" int gpu_aggregate_periods(
    const double* h_timestamps,
    const double* h_open,
    const double* h_high,
    const double* h_low,
    const double* h_close,
    int num_ticks,
    int64_t interval,
    GpuPeriodStats** h_out_stats,
    int* out_num_periods)
{
    if (num_ticks == 0) {
        *h_out_stats = nullptr;
        *out_num_periods = 0;
        return 0;
    }
    // Kernel selection via the USE_BLOCK_KERNEL environment variable.
    // Read it up front, before any device allocation, so that failing here
    // cannot leak GPU memory.
    const char* env_block_kernel = std::getenv("USE_BLOCK_KERNEL");
    if (env_block_kernel == nullptr) {
        printf("Error: Environment variable USE_BLOCK_KERNEL is not set\n");
        return -1;
    }
    bool use_block_kernel = std::atoi(env_block_kernel) != 0;
    std::ostringstream output;
    double total_start = get_time_ms();
    // ========================================================================
    // Step 1: Allocate device memory and copy the input data to the GPU
    // ========================================================================
    double step1_start = get_time_ms();
    double* d_timestamps = nullptr;
    double* d_open = nullptr;
    double* d_high = nullptr;
    double* d_low = nullptr;
    double* d_close = nullptr;
    int64_t* d_period_ids = nullptr;
    size_t ticks_bytes = num_ticks * sizeof(double);
    CUDA_CHECK(cudaMalloc(&d_timestamps, ticks_bytes));
    CUDA_CHECK(cudaMalloc(&d_open, ticks_bytes));
    CUDA_CHECK(cudaMalloc(&d_high, ticks_bytes));
    CUDA_CHECK(cudaMalloc(&d_low, ticks_bytes));
    CUDA_CHECK(cudaMalloc(&d_close, ticks_bytes));
    CUDA_CHECK(cudaMalloc(&d_period_ids, num_ticks * sizeof(int64_t)));
    CUDA_CHECK(cudaMemcpy(d_timestamps, h_timestamps, ticks_bytes, cudaMemcpyHostToDevice));
    CUDA_CHECK(cudaMemcpy(d_open, h_open, ticks_bytes, cudaMemcpyHostToDevice));
    CUDA_CHECK(cudaMemcpy(d_high, h_high, ticks_bytes, cudaMemcpyHostToDevice));
    CUDA_CHECK(cudaMemcpy(d_low, h_low, ticks_bytes, cudaMemcpyHostToDevice));
    CUDA_CHECK(cudaMemcpy(d_close, h_close, ticks_bytes, cudaMemcpyHostToDevice));
    double step1_ms = get_time_ms() - step1_start;
    // ========================================================================
    // Step 2: Compute a period_id for each tick
    // ========================================================================
    double step2_start = get_time_ms();
    const int BLOCK_SIZE = 256;
    int num_blocks = (num_ticks + BLOCK_SIZE - 1) / BLOCK_SIZE;
    compute_period_ids_kernel<<<num_blocks, BLOCK_SIZE>>>(
        d_timestamps, d_period_ids, num_ticks, interval);
    CUDA_CHECK(cudaGetLastError());
    CUDA_CHECK(cudaDeviceSynchronize());
    double step2_ms = get_time_ms() - step2_start;
    // ========================================================================
    // Step 3: Run-length encode (RLE) to find the unique periods
    // ========================================================================
    double step3_start = get_time_ms();
    int64_t* d_unique_periods = nullptr;
    int* d_counts = nullptr;
    int* d_num_runs = nullptr;
    CUDA_CHECK(cudaMalloc(&d_unique_periods, num_ticks * sizeof(int64_t)));
    CUDA_CHECK(cudaMalloc(&d_counts, num_ticks * sizeof(int)));
    CUDA_CHECK(cudaMalloc(&d_num_runs, sizeof(int)));
    // First call with a null temp buffer queries the required CUB scratch size
    void* d_temp_storage = nullptr;
    size_t temp_storage_bytes = 0;
    CUDA_CHECK(cub::DeviceRunLengthEncode::Encode(
        d_temp_storage, temp_storage_bytes,
        d_period_ids, d_unique_periods, d_counts, d_num_runs,
        num_ticks));
    CUDA_CHECK(cudaMalloc(&d_temp_storage, temp_storage_bytes));
    CUDA_CHECK(cub::DeviceRunLengthEncode::Encode(
        d_temp_storage, temp_storage_bytes,
        d_period_ids, d_unique_periods, d_counts, d_num_runs,
        num_ticks));
    CUDA_CHECK(cudaGetLastError());
    // Copy back the number of unique periods
    int num_periods = 0;
    CUDA_CHECK(cudaMemcpy(&num_periods, d_num_runs, sizeof(int), cudaMemcpyDeviceToHost));
    cudaFree(d_temp_storage);
    d_temp_storage = nullptr;
    double step3_ms = get_time_ms() - step3_start;
    // ========================================================================
    // Step 4: Exclusive scan over the counts to compute per-period offsets
    // ========================================================================
    double step4_start = get_time_ms();
    int* d_offsets = nullptr;
    CUDA_CHECK(cudaMalloc(&d_offsets, num_periods * sizeof(int)));
    temp_storage_bytes = 0;
    CUDA_CHECK(cub::DeviceScan::ExclusiveSum(
        d_temp_storage, temp_storage_bytes,
        d_counts, d_offsets, num_periods));
    CUDA_CHECK(cudaMalloc(&d_temp_storage, temp_storage_bytes));
    CUDA_CHECK(cub::DeviceScan::ExclusiveSum(
        d_temp_storage, temp_storage_bytes,
        d_counts, d_offsets, num_periods));
    CUDA_CHECK(cudaGetLastError());
    cudaFree(d_temp_storage);
    double step4_ms = get_time_ms() - step4_start;
    // ========================================================================
    // Step 5: Aggregate the periods
    // ========================================================================
    double step5_start = get_time_ms();
    GpuPeriodStats* d_out_stats = nullptr;
    CUDA_CHECK(cudaMalloc(&d_out_stats, num_periods * sizeof(GpuPeriodStats)));
    // use_block_kernel was read from USE_BLOCK_KERNEL at the top of this function
    if (use_block_kernel) {
        // Block kernel: one block per period; threads process ticks in parallel.
        // Better for large intervals with many ticks per period.
        aggregate_periods_kernel<<<num_periods, BLOCK_SIZE>>>(
            d_open, d_high, d_low, d_close,
            d_unique_periods, d_offsets, d_counts,
            num_periods, d_out_stats);
    } else {
        // Simple kernel: one thread per period.
        // Better for many periods with few ticks in each.
        int agg_blocks = (num_periods + BLOCK_SIZE - 1) / BLOCK_SIZE;
        aggregate_periods_simple_kernel<<<agg_blocks, BLOCK_SIZE>>>(
            d_open, d_high, d_low, d_close,
            d_unique_periods, d_offsets, d_counts,
            num_periods, d_out_stats);
    }
    CUDA_CHECK(cudaGetLastError());
    CUDA_CHECK(cudaDeviceSynchronize());
    double step5_ms = get_time_ms() - step5_start;
    // ========================================================================
    // Step 6: Copy the results back to the CPU
    // ========================================================================
    double step6_start = get_time_ms();
    GpuPeriodStats* h_stats = new GpuPeriodStats[num_periods];
    CUDA_CHECK(cudaMemcpy(h_stats, d_out_stats, num_periods * sizeof(GpuPeriodStats),
                          cudaMemcpyDeviceToHost));
    double step6_ms = get_time_ms() - step6_start;
    // ========================================================================
    // Step 7: Free GPU memory
    // ========================================================================
    double step7_start = get_time_ms();
    cudaFree(d_timestamps);
    cudaFree(d_open);
    cudaFree(d_high);
    cudaFree(d_low);
    cudaFree(d_close);
    cudaFree(d_period_ids);
    cudaFree(d_unique_periods);
    cudaFree(d_counts);
    cudaFree(d_offsets);
    cudaFree(d_num_runs);
    cudaFree(d_out_stats);
    double step7_ms = get_time_ms() - step7_start;
    // ========================================================================
    // Totals
    // ========================================================================
    double total_ms = get_time_ms() - total_start;
    // Build the whole report as a single string
    output << " GPU aggregation (" << num_ticks << " ticks, interval=" << interval << " sec, kernel=" << (use_block_kernel ? "block" : "simple") << "):\n";
    output << "   1. Malloc + H->D copy: " << std::fixed << std::setprecision(3) << std::setw(7) << step1_ms << " ms\n";
    output << "   2. Compute period_ids: " << std::setw(7) << step2_ms << " ms\n";
    output << "   3. RLE (CUB):          " << std::setw(7) << step3_ms << " ms (" << num_periods << " periods)\n";
    output << "   4. Exclusive scan:     " << std::setw(7) << step4_ms << " ms\n";
    output << "   5. Aggregation kernel: " << std::setw(7) << step5_ms << " ms (" << (use_block_kernel ? "block" : "simple") << ")\n";
    output << "   6. D->H copy:          " << std::setw(7) << step6_ms << " ms\n";
    output << "   7. Free GPU memory:    " << std::setw(7) << step7_ms << " ms\n";
    output << "   GPU TOTAL:             " << std::setw(7) << total_ms << " ms\n";
    // Emit everything with a single printf
    printf("%s", output.str().c_str());
    fflush(stdout);
    *h_out_stats = h_stats;
    *out_num_periods = num_periods;
    return 0;
}
// ============================================================================
// Free the result buffer returned by gpu_aggregate_periods
// ============================================================================
extern "C" void gpu_free_results(GpuPeriodStats* stats) {
    delete[] stats;
}
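// ============================================================================
// Optional standalone smoke test: a minimal sketch of how a caller drives
// gpu_aggregate_periods. Not part of the plugin build; compile with
//   nvcc -DGPU_PLUGIN_STANDALONE_TEST gpu_plugin.cu -o gpu_plugin_test
// and run with USE_BLOCK_KERNEL=0 or 1. The synthetic data below is purely
// illustrative; the real caller supplies its own sorted tick arrays.
// ============================================================================
#ifdef GPU_PLUGIN_STANDALONE_TEST
int main() {
    const int n = 1000000;
    const int64_t interval = 60;  // one-minute periods
    double* ts = new double[n];
    double* open = new double[n];
    double* high = new double[n];
    double* low = new double[n];
    double* close = new double[n];
    // Ten ticks per second, ascending timestamps (required by the RLE step)
    for (int i = 0; i < n; i++) {
        ts[i] = i * 0.1;
        double p = 100.0 + 0.0001 * i;
        open[i] = p;
        close[i] = p + 0.05;
        low[i] = p - 0.1;
        high[i] = p + 0.1;
    }
    GpuPeriodStats* stats = nullptr;
    int num_periods = 0;
    int rc = gpu_aggregate_periods(ts, open, high, low, close,
                                   n, interval, &stats, &num_periods);
    if (rc == 0 && num_periods > 0) {
        printf("first period=%lld avg=%.4f count=%lld\n",
               (long long)stats[0].period, stats[0].avg,
               (long long)stats[0].count);
    }
    gpu_free_results(stats);
    delete[] ts; delete[] open; delete[] high; delete[] low; delete[] close;
    return rc;
}
#endif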