shanks-university

Поток данных (Dataflow)

Обзор

Документ описывает полный жизненный цикл данных в фреймворке - от загрузки конфигурации до экспорта результатов и визуализации.

Пошаговый поток данных

Шаг 1. Инициализация (CLI)

Файл: cli.py

python3 run --config backend/runner/config/options.json

Шаг 2. Загрузка конфигурации

Файл: config/model.py

cfg = TrialConfig.load(Path("config/options.json"))

Загружаемые поля:

Поле Описание По умолчанию
verbose Уровень логирования 0
series_json Путь к JSON с рядами config/example.json
accel_json Путь к JSON с методами ускорения config/example.json
noise_json Путь к JSON с шумами config/example.json
filters_json Путь к JSON с фильтрами config/example.json
output_dir Директория вывода output
results_filename Имя файла результатов results
trial_process_count Число параллельных процессов 1
trial_task_timeout Таймаут задачи (сек) 10
trial_memory_efficient Режим экономии памяти true
precisions Типы точности [F64]
output_formats Форматы вывода [JSON, CSV]

Дополнительно загружается:


Шаг 3. Dependency Injection

Файл: dependency.py

executor = get_trial_executor_from_config(cfg)

Создаётся:

  1. TrialRunnerParallelTrialRunner или SequentialTrialRunner в зависимости от trial_process_count
  2. SeriesParamSource’ы — из PARAM_SERIES_SOURCE_BUILDERS registry:
    • JSONSeriesParamSource
    • CSVSeriesParamSource
  3. AccelParamSource’ы — из PARAM_ACCELS_SOURCE_BUILDERS registry:
    • JSONAccelParamSource — если accel_json существует
  4. SerializerTrialResultSerializer
  5. Exporters — из EXPORT_BUILDERS registry для каждого формата в output_formats

Результат: Fully configured TrialExecutor


Шаг 4. Загрузка параметров

Файл: services/trial_executor.py:load_parameters()

series_params, accel_params = executor.load_parameters(PrecisionType.F64)

Процесс:

  1. Для каждого SeriesParamSource вызывается load(precision)
  2. Для каждого AccelParamSource вызывается load(precision)
  3. Источники также загружают noise_configs

Из JSON файла загружаются:

Series параметры

{
  "series": [
    {
      "name": "CosSeries",
      "args": {"x": [1, 2]}
    }
  ]
}

Accel параметры

{
  "methods": [
    {
      "name": "LevinAlgorithm",
      "n": {"start": 10, "stop": 100, "step": 10},
      "m": [8],
      "args": {
        "remainder": ["v_type"],
        "useRecurrentFormula": true
      },
      "events": [...]
    }
  ]
}

Функция autowrap:

Результат:


Шаг 5. Генерация комбинаций

Файл: domain/complex_trial.py

trial = ComplexTrial(series_params, accel_params)
combinations = trial.combinations()

Декартово произведение:

series_params * accel_params = [
    (series_1, accel_1),
    (series_1, accel_2),
    ...
    (series_N, accel_M)
]

Каждая комбинация будет выполнена со всеми:

Результат: list[tuple[BaseSeriesParam, BaseAccelParam]]


Шаг 6. Выполнение trial

Файл: domain/use_cases/run_trial.py:execute_trial()

results = execute_trial(
    (series, accel),
    noise_config,
    filter_configs
)

Подпроцессы:

6.1 Получение ряда

series_result, series_lim = series.obtain_by_argument(series_argument, size_floor)

6.2 Применение шума (опционально)

if noise_config:
    func_name = f"applyNoise{precision.value}"
    series_result = getattr(ps, func_name)(
        series_result,
        noise_method_enum,
        noise_type_enum,
        seed, param1, param2
    )

6.3 Создание инстанса алгоритма

accel_instance = accel.create_instance(additional_args)

6.4 Итерация по n и m

for m_value in m_values:
    ctx = accel.create_event_context()
    for n_value in n_values:
        partial_sum = series_result.Sn[n_value - 1]
        accel_value = accel_instance(n_value, m_value, series_result)

        computed.append(ComputedTrialResult(...))
        events = accel.process_events(computed, ctx)

Вычисляемые метрики:

6.5 Детекция событий

Файл: domain/event.py

Типы событий:

Обработка событий:

6.6 Логика фильтрации

Если событие остановило выполнение:

  1. Определяется сегмент расходимости
  2. Применяются фильтры (savitzky_golay, kolmogorov_zurbenko)
  3. Вычисляется среднее отфильтрованных значений
  4. Результат сохраняется в filtered_results

Результат: list[TrialResult]


Шаг 7. Параллельное/последовательное выполнение

Файлы: infra/trials/parallel_runner.py, infra/trials/sequential_runner.py

for result_chunk in runner.run(combinations):
    results.extend(result_chunk)

SequentialTrialRunner:

ParallelTrialRunner:

Результат: Iterator[list[TrialResult]]


Шаг 8. Сериализация результатов

Файл: infra/export/serializer.py

dicts = serializer.to_dict(results)

Преобразование TrialResult -> dict:


Шаг 9. Экспорт результатов

Файлы: infra/export/*.py

for exporter in exporters:
    exporter.export(dicts, config, series)

JSON Exporter

Файл: json_exporter.py

# Сохраняет в: output/results.json
json.dump(dicts, file, indent=2)

CSV Exporter

Файл: csv_exporter.py

# Сохраняет в: output/results.csv
df = pd.DataFrame(dicts)
df.to_csv(path, index=False)

Шаг 10. Визуализация (Frontend)

Web UI

  1. React UI -> GET /api/results
  2. FastAPI -> MongoDB с фильтрацией
  3. MongoDB -> возвращает документы
  4. FastAPI -> ResultDocument Pydantic модели
  5. React UI -> отображает таблицы и графики

Vizr

PYTHONPATH="." python3 -m vizr.main ../runner/output/results.json
  1. Загружает отфильтрованные JSON файл через Polars
  2. Строит графики сходимости через PyQtGraph

Данные и их трансформация

Входные данные

Источник Формат Содержимое
options.json JSON Конфигурация выполнения
example.json JSON Ряды, методы, шумы, фильтры
example_series.csv CSV Натуральные ряды

Промежуточные данные

Стадия Тип Описание
Параметры BaseSeriesParam, BaseAccelParam Загруженные параметры
Комбинации tuple[Series, Accel] Декартово произведение
Trial result TrialResult Результат одного trial
Сериализованные dict JSON-совместимые словари

Выходные данные

Формат Расположение Использование
JSON output/results.json Анализ, интеграция
CSV output/results.csv Excel, pandas
Parquet output/results.parquet Vizr, big data
MongoDB База данных Web UI

Схема данных TrialResult

TrialResult
├── SeriesTrialResult
│   ├── id: str
│   ├── name: str
│   ├── lim: float | None
│   └── arguments: dict
├── AccelTrialResult
│   ├── name: str
│   ├── m_value: int
│   └── additional_args: dict
├── computed: list[ComputedTrialResult]
│   ├── n: int
│   ├── series_value: float
│   ├── partial_sum: float
│   ├── partial_sum_deviation: float
│   ├── accel_value: float
│   ├── accel_value_deviation: float
│   └── events: list[Event]
├── noise: NoiseConfig | None
├── error: ErrorTrialResult | NoErrorTrialResult
└── filtered: FilteredResults | None
    ├── start_n: int
    ├── segment_length: int
    └── methods: dict[str, FilterMethodResult]