BioPulse AI Challenge: rPPG Signal Fusion

Submit solution

Points: 1000 (partial)
Time limit: 1200.0s
Memory limit: 2G

Author:
Problem type
Allowed languages
Python, Text

Описание задачи

BioPulse AI - технология бесконтактного мониторинга физиологических параметров на основе дистанционной фотоплетизмографии (rPPG). Алгоритм анализирует видеозапись лица с камеры устройства и извлекает пульсовую волну без физического контакта.

Проблема

Алгоритм обрабатывает видео скользящим окном в 320 кадров (~10.67 с при 30 fps). При обработке записей длительностью от 1 минуты и более на стыках окон возникают артефакты, которые снижают качество итогового сигнала и влияют на точность расчёта физиологических показателей (смотри рисунок).

image

Что дано

Для каждой записи участникам предоставляются два rPPG-ряда одинаковой длины с частотой дискретизации 30 Hz:

Ряд Описание
signal_a rPPG-сигнал, полученный стандартным проходом окна по 320 кадров. Покрывает [0 .. L+320) - последние 320 отсчётов являются паддингом
signal_b rPPG-сигнал, полученный со смещением окна на 160 кадров. Покрывает [-160 .. L+160) - первые 160 и последние 160 отсчётов являются паддингом

Оба сигнала получены одним и тем же алгоритмом BioPulse AI, но с разным расположением стыков:

  • У signal_a стыки на кадрах 320, 640, 960, ... (каждые ~10.67 с)
  • У signal_b стыки на кадрах 480, 800, 1120, ... (смещены на 160 кадров)

Типичная длительность записей: от \(53 с\) до \(3.2 мин\) (5-18 окон).

Что нужно сделать

Написать алгоритм, который принимает на вход два массива окон (signal_a, signal_b) shape [N, 320] и возвращает один восстановленный PPG-массив (signal_out) shape [N, 320], максимально приближенный к эталонному сигналу.

Ключевая идея: стыки у двух сигналов расположены в разных местах. Там, где signal_a содержит артефакт стыка, signal_b имеет чистый сигнал (и наоборот). Задача - грамотно объединить эту информацию.


Формат данных

Тренировочный датасет

Структура папки train/ ссылка (https://disk.yandex.ru/d/CfCa1RNAfQLdbQ ) на данные:

train/
├── data/          # входные сигналы
│   ├── {record_id}_pred.npy
│   └── ...
├── results/       # эталонные сигналы (ground truth)
│   ├── {record_id}_gt.npy
│   └── ...
└── readme.txt

Входные данные (data/{record_id}_pred.npy) - NumPy-массив формата float32 с shape [2, N, 320]:

Индекс Описание
[0] - signal_a N окон по 320 кадров, стандартный проход (окна: 0-319, 320-639, ...)
[1] - signal_b N окон по 320 кадров, со смещением на 160 кадров. Первое окно начинается с кадра -160

Эталон (results/{record_id}_gt.npy) - NumPy-массив формата float32 с shape [N, 320]:

  • N окон по 320 кадров - целевой PPG-сигнал, с которым сравнивается результат

Всего в тренировочном наборе 373 записи, от 5 до 18 окон каждая.

Пример загрузки данных
import numpy as np

pred = np.load('train/data/00d7cb23-..._pred.npy')  # shape: [2, N, 320]
signal_a = pred[0]  # shape: [N, 320]
signal_b = pred[1]  # shape: [N, 320]

gt = np.load('train/results/00d7cb23-..._gt.npy')    # shape: [N, 320]
Формат решения

Участники отправляют архив, содержащий файл solution.py с классом:

import numpy as np

class Solver:
    def solve(self, signal_a: np.ndarray, signal_b: np.ndarray) -> np.ndarray:
        """
        Объединяет два rPPG-сигнала в один восстановленный PPG-сигнал.

        Args:
            signal_a: np.ndarray shape [N, 320] - стандартный проход
            signal_b: np.ndarray shape [N, 320] - проход со смещением на 160 кадров

        Returns:
            np.ndarray shape [N, 320] - восстановленный PPG-сигнал
        """
        ...

Ограничения:

  • Допускаются библиотеки: scipy 1.13.0, tensorflow 2.21.0, torch 2.10.0+cpu, sklearn 1.4.2, numpy .26.4, pandas 2.3.3, torchaudio 2.10.0+cpu, pywt 1.8.0, onnx 1.20.1, heartpy 1.2.6, pickle: 4.0, joblib: 1.4.0
  • Разрешены внешние данные и предобученные модели (включать в архив)
  • Размер архива: не более 200 МБ
  • Потребление памяти: не более 2 ГБ
  • Время обработки одной записи: не более 10 секунд
  • Решение не должно обращаться к сети

Метрика

Итоговая метрика: BioPulse Score

Расчёт физиологических показателей выполняется на сервере с помощью библиотеки HeartPy. По восстановленному сигналу signal_out рассчитываются три показателя и сравниваются с эталоном:

Показатель Описание Единицы Типичный диапазон
HR Частота сердечных сокращений уд/мин 50–120
SDNN Стандартное отклонение RR-интервалов мс 20–100
RMSSD Среднеквадратичное отклонение последовательных RR-интервалов мс 15–80

Важно: при проверке последнее окно (320 отсчётов) отбрасывается и у предсказания, и у эталона, так как оно содержит паддинг. Метрики рассчитываются по первым N-1 окнам.

Шаг 1. Абсолютные ошибки

Для каждой записи вычисляется абсолютная ошибка по каждому показателю:

\[e_{HR} = |HR_{pred} - HR_{gt}|\] \[e_{SDNN} = |SDNN_{pred} - SDNN_{gt}|\] \[e_{RMSSD} = |RMSSD_{pred} - RMSSD_{gt}|\]

Шаг 2. MAE по всем записям

\[MAE_{HR} = \frac{1}{N}\sum_{i=1}^{N} e_{HR}^{(i)}, \quad MAE_{SDNN} = \frac{1}{N}\sum_{i=1}^{N} e_{SDNN}^{(i)}, \quad MAE_{RMSSD} = \frac{1}{N}\sum_{i=1}^{N} e_{RMSSD}^{(i)}\]

Шаг 3. Нормализованная ошибка (CP-MAE)

Приводим показатели с разными масштабами к единой шкале:

\[CP\text{-}MAE = \frac{MAE_{HR}}{HR_{norm}} + \frac{MAE_{SDNN}}{SDNN_{norm}} + \frac{MAE_{RMSSD}}{RMSSD_{norm}}\]

Где нормировочные коэффициенты — средние значения по эталонным данным тестового набора:

Коэффициент Определение
\(HR_{norm}\) Среднее HR по всем тестовым записям
\(SDNN_{norm}\) Среднее SDNN по всем тестовым записям
\(RMSSD_{norm}\) Среднее RMSSD по всем тестовым записям
Шаг 4. Итоговый Score

\[\boxed{BioPulse\ Score = \frac{1000}{1 + CP\text{-}MAE}}\]

Качество решения CP-MAE BioPulse Score
Идеальное 0.0 1000
Отличное 0.1 909
Хорошее 0.3 769
Среднее 1.0 500
Слабое 5.0 167

Чем выше BioPulse Score, тем лучше. Диапазон: (0, 1000]. Метрика одинаково учитывает точность по всем трём показателям вне зависимости от их абсолютных значений.

Пример расчёта показателей с HeartPy
import heartpy as hp
import numpy as np

def compute_metrics(windows: np.ndarray, fs: float = 30.0) -> dict:
    """
    Рассчитывает HR, SDNN, RMSSD по PPG-сигналу с помощью HeartPy.

    Args:
        windows: np.ndarray shape [N, 320] — сигнал поокенно
        fs: частота дискретизации (default: 30 Hz)

    Returns:
        dict с ключами 'hr', 'sdnn', 'rmssd'
    """
    # Склеиваем окна в один непрерывный сигнал
    signal = windows.flatten()

    working_data, measures = hp.process(signal, sample_rate=fs)

    return {
        'hr': measures['bpm'],
        'sdnn': measures['sdnn'],
        'rmssd': measures['rmssd'],
    }


# Пример использования
pred = np.load('train/data/00d7cb23-..._pred.npy')   # [2, N, 320]
gt = np.load('train/results/00d7cb23-..._gt.npy')     # [N, 320]

solver = Solver()
signal_out = solver.solve(pred[0], pred[1])            # [N, 320]

metrics_pred = compute_metrics(signal_out)
metrics_gt = compute_metrics(gt)

# Абсолютные ошибки для одной записи
e_hr = abs(metrics_pred['hr'] - metrics_gt['hr'])
e_sdnn = abs(metrics_pred['sdnn'] - metrics_gt['sdnn'])
e_rmssd = abs(metrics_pred['rmssd'] - metrics_gt['rmssd'])

print(f"HR error:    {e_hr:.2f} уд/мин")
print(f"SDNN error:  {e_sdnn:.2f} мс")
print(f"RMSSD error: {e_rmssd:.2f} мс")

Совет: установите HeartPy локально (pip install heartpy) для тестирования своего решения на train-данных перед отправкой.


Baseline

Простейший бейзлайн — поэлементное среднее двух сигналов с учётом сдвига signal_b на 160 кадров:

import numpy as np

class Solver:
    def solve(self, signal_a: np.ndarray, signal_b: np.ndarray) -> np.ndarray:
        N = signal_a.shape[0]
        WINDOW = 320
        SHIFT = 160

        # Склеиваем окна в плоские сигналы
        flat_a = signal_a.flatten()                        # длина N*320
        flat_b = signal_b.flatten()                        # длина N*320

        # signal_a покрывает кадры [0 .. L+320), последние 320 - паддинг
        # signal_b покрывает кадры [-160 .. L+160), первые 160 и последние 160 - паддинг
        # Зона перекрытия (реальные данные): кадры [0 .. L), длина N*320-320

        overlap_len = N * WINDOW - WINDOW   # N*320 - 320
        aligned_a = flat_a[:-WINDOW]                       # отбрасываем 320 в конце
        aligned_b = flat_b[SHIFT:-SHIFT]                   # отбрасываем 160 с обоих концов

        # Среднее в зоне перекрытия
        result_mid = (aligned_a + aligned_b) / 2.0

        # Собираем: перекрытие + хвост из signal_a
        result = np.concatenate([
            result_mid,
            flat_a[-WINDOW:],
        ])

        return result.reshape(N, WINDOW)

Правила

  1. Допускаются внешние данные и предобученные модели - всё необходимое должно быть включено в архив (до 200 МБ)
  2. Код проверяется на сервере на скрытом тестовом датасете
  3. Участники могут отправлять решения 100 раз (в среднем 8 раз в сутки)
  4. Финальный результат определяется по лучшей отправке
  5. Плагиат и обмен решениями запрещены

Как начать

  1. Скачайте ( https://disk.yandex.ru/d/CfCa1RNAfQLdbQ) тренировочный датасет train/
  2. Изучите структуру данных и стыки сигналов
  3. Реализуйте класс Solver в файле solution.py
  4. Протестируйте локально на train-данных
  5. Отправьте архив с solution.py через форму

Comments

There are no comments at the moment.