Пошаговое руководство по созданию и запуску первого DAG в Apache Airflow с примерами кода, проверками и распространёнными ошибками. Время выполнения — около 60–120 минут в зависимости от окружения.
0
Статья была полезной?
Комментарии (0)
Войдите или зарегистрируйтесь, чтобы оставить комментарий
Загрузка комментариев…
Что вы изучите
Установка и запуск Apache Airflow 2.7 (релиз 2025) в Docker-контейнере.
Создание первого DAG: планирование, зависимости и логирование.
Использование основных операторов и sensors: BashOperator, PythonOperator, HttpSensor.
Как делать backfill, тестировать и деплоить DAG в продакшен.
Сравнение Airflow и Prefect по задачам оркестрации рабочих процессов.
Типовые ошибки и способы их устранения при разработке DAG.
Требования
ОС: Ubuntu 22.04 LTS или CentOS 8/Stream; другие дистрибутивы подойдёт, если настроен Docker.
Docker 24.0 (релиз 2025) и docker-compose 2.18 (релиз 2025); минимум 4 CPU, 8 GB RAM рекомендовано для локальной разработки.
Python 3.12 (релиз 2025) для локального тестирования DAG; systemd не обязателен.
PostgreSQL 15 (релиз 2025) для метаданных Airflow; образ postgres:15 имеет размер ~320 MB.
Свободное место на диске: минимум 5 GB для образов и логов; образ apache/airflow:2.7.0-python3.12 ≈ 1.1 GB.
Зачем Airflow?
Airflow — это планировщик рабочих процессов (workflow orchestrator), который позволяет описывать ETL-пайплайны и периодические задачи как DAG (Directed Acyclic Graph). На практике вы получите прозрачное управление зависимостями задач, повторные попытки, логирование и визуализацию выполнения. Примерное время на подготовку окружения и написание первого DAG — 60–120 минут.
Скриншот веб-интерфейса Apache Airflow с DAG-списком
Шаг 1: первый DAG
Цель шага: создать минимальный рабочий DAG, загрузить его в Airflow и убедиться, что планировщик запускает задачи. Мы используем Docker-Compose с Airflow 2.7 (релиз 2025) и Postgres 15 (релиз 2025).
Команды (создайте директорию проекта и docker-compose.yml):
Объяснение: файл поднимает Postgres и webserver Airflow в LocalExecutor. Порт веб-интерфейса — 8080. Образ airflow ≈ 1.1 GB; время загрузки на типичном канале 100 Mbps — около 2–4 минут.
Запуск и инициализация БД:
docker compose up -d
# инициализация метаданных
docker compose exec airflow-webserver airflow db upgrade
Ожидаемый вывод:
Applying alembic migrations
... (snip) ...
INFO sqlalchemy.engine.Engine SELECT version_num FROM alembic_version
INFO alembic.runtime.migration upgrade -> head
Типовая ошибка и фикс:
Ошибка: "psycopg2.OperationalError: could not connect to server: Connection refused" — означает, что контейнер Postgres ещё не готов. Фикс: подождите 10–20 секунд и повторите, либо используйте healthcheck и depends_on с condition (docker compose v2 поддерживает healthcheck). Пример команды ожидания:
until docker compose exec postgres pg_isready -U airflow; do sleep 2; done
Создайте DAG-файл: dags/example_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'you',
'depends_on_past': False,
'email_on_failure': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'example_dag',
default_args=default_args,
description='Первый DAG для туториала',
schedule_interval='@daily',
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:
t1 = BashOperator(task_id='print_date', bash_command='date')
t2 = BashOperator(task_id='sleep', bash_command='sleep 5')
t1 >> t2
Ожидаемый вывод в веб-интерфейсе: DAG "example_dag" появится в списке, статус — recently changed, а задачи можно запустить вручную. Вывод логов задачи print_date будет содержать строку с текущей датой как в Unix.
Потенциальная ошибка и фикс при загрузке DAG: синтаксическая ошибка Python приводит к статусу DAG как ImportError в UI. Фикс: проверить синтаксис локально python -m py_compile dags/example_dag.py и посмотреть логи webserver:
docker compose logs airflow-webserver
# ищите Traceback и исправляйте по строкам
Скриншот кода DAG и логов Airflow в Web UI
Шаг 2: операторы и sensors
Цель шага: применить основные операторы и Sensor, показать их поведение, параметры retries и зависимостей. Примеры для Airflow 2.7 (релиз 2025).
Создадим DAG со следующими компонентами: BashOperator, PythonOperator, HttpSensor.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.http_sensor import HttpSensor
from datetime import datetime, timedelta
def task_func(**kwargs):
print('Hello from PythonOperator')
with DAG('operators_sensors_dag', start_date=datetime(2025,1,1), schedule_interval='@hourly', catchup=False) as dag:
wait_for_api = HttpSensor(
task_id='wait_for_api',
http_conn_id=None,
endpoint='https://httpbin.org/status/200',
poke_interval=10,
timeout=60,
)
py = PythonOperator(task_id='run_py', python_callable=task_func)
bash = BashOperator(task_id='run_bash', bash_command='echo "done"')
wait_for_api >> py >> bash
Объяснение: HttpSensor пытается опросить endpoint каждые 10 секунд, timeout 60 секунд. Если endpoint вернёт 200 — Sensor переходит в состояние success и запускает следующий оператор. Используйте poke_interval и timeout для контроля потребления ресурсов.
Ожидаемый вывод (логи Sensor):
[2025-06-01 12:00:00] INFO - Poking: https://httpbin.org/status/200
[2025-06-01 12:00:00] INFO - Got 200
[2025-06-01 12:00:00] INFO - Success criteria met. Exiting.
Типичная ошибка и её исправление:
Ошибка: airflow.exceptions.AirflowException: Connection id 'http_default' not found. Причина: отсутствует connection в Airflow. Фикс: создать connection в UI или через CLI:
Пример ошибки оператора Python: если функция пробрасывает исключение, задача перейдёт в state failed. Логи покажут traceback. Фикс: добавить обработку исключений и параметр retries в default_args.
Шаг 3: backfill
Цель шага: показать, как выполнять backfill для пропущенных интервалов и как безопасно запускать повторно задачи. Backfill полезен при ретроспективной загрузке данных.
Команда для backfill в Airflow 2.7:
# Пример: запустить backfill для example_dag с 2025-05-20 по 2025-05-22
docker compose exec airflow-webserver airflow dags backfill example_dag -s 2025-05-20 -e 2025-05-22
Объяснение: команда создаст DagRun для каждого дня в указанном интервале и выполнит задачи согласно зависимостям. По умолчанию backfill уважает catchup и зависимость от прошлых запусков. Для ускорения и параллельного выполнения можно задать параметр -n (параллелизм) и --pool.
Ожидаемый вывод:
[2025-06-01 12:10:00] INFO - Creating 3 dag runs
[2025-06-01 12:10:00] INFO - TaskInstance: example_dag.print_date for 2025-05-20 queued
[2025-06-01 12:10:01] INFO - TaskInstance: example_dag.print_date for 2025-05-20 succeeded
... (повтор для остальных дат)
Типичная ошибка и её фикс:
Ошибка: ValueError: "start_date" must be a datetime в DAG: означает, что в DAG указан некорректный формат даты. Фикс: использовать datetime(YYYY, M, D) или строки, приведённые к datetime. Ещё частая проблема — overlapping DagRuns при использовании SequentialExecutor при backfill; используйте LocalExecutor или CeleryExecutor для параллельных запусков.
Шаг 4: тестирование DAG
Цель шага: показать локальные методы тестирования задач и DAG без поднятия всего окружения. Используем команды CLI Airflow и unit-тесты на Python.
Тест запуска task-а локально (команда CLI):
# Запустить task instance для конкретной даты без webserver
docker compose exec airflow-webserver airflow tasks test example_dag print_date 2025-06-01
Ожидаемый вывод:
[2025-06-01 12:20:00] INFO - Running: ['bash', '-lc', 'date']
Thu Jun 1 12:20:00 UTC 2025
[2025-06-01 12:20:00] INFO - Marking task as SUCCESS
Потенциальная ошибка и фикс:
Ошибка: ModuleNotFoundError при импорте helper-модулей в dags. Фикс: убедитесь, что PYTHONPATH настроен, или используйте plugins директорию Airflow (/opt/airflow/plugins) для общих утилит. Для unit-тестов используйте pytest и мокируйте Airflow-модули.
Совет по времени: команда airflow tasks test выполняется мгновенно для простых задач; для интеграционных тестов выделяйте 5–30 минут на прогон.
Шаг 5: деплой в прод
Цель шага: подготовить инструкции для безопасного деплоя DAG в production с использованием Git, CI/CD (GitHub Actions) и минимальными ресурсами: 4 CPU и 16 GB RAM рекомендуются для экземпляра с CeleryExecutor.
name: Deploy DAG
on:
push:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.12'
- name: Install deps
run: pip install -r requirements.txt
- name: Run tests
run: pytest -q
deploy:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Push DAG to server
run: |
rsync -avz --delete dags/ deploy@prod:/opt/airflow/dags/
Ожидаемый результат: при пуше в ветку main тесты проходят, и DAG-файлы синхронизируются на prod-сервер. На сервере systemd может управлять docker compose: unit-файл запускает docker compose up -d. Для zero-downtime деплоя используйте rolling-restart Celery workers и проведение миграций базы данных заранее.
Типичная ошибка и её фикc:
Ошибка: конфликт версий пакетов (например, библиотека для HTTP-клиента). Фикс: фиксировать версии в requirements.txt, использовать изолированные образы docker с тестовой сборкой и прогонять интеграционные тесты на staging перед production.
Что лучше Prefect?
Prefect — альтернативный orchestrator с упором на простоту и современный API. Prefect 3 (релиз 2025) делает акцент на SaaS-интеграции и динамические потоки. Основные отличия:
Airflow ориентирован на расписания и историчные DAG; сильный UI для отслеживания DAG-run и интеграций со стандартными SQL-бэкендами.
Prefect предоставляет более гибкую модель Flow, удобен для event-driven и ad-hoc запусков; у Prefect встроен modern state handling и удобный локальный runtime.
Когда выбирать что: если нужна зрелая экосистема с множеством интеграций, RBAC и контролем на уровне оператора — выбирайте Airflow. Если важна быстрая разработка, простая интеграция с cloud SaaS и меньше операций по настройке — рассматривайте Prefect. Многие команды используют оба инструмента: Airflow для регулярных ETL и Prefect для ad-hoc pipelines и data-science задач.
Какие частые ошибки?
Ниже перечислены распространённые ошибки при работе с Airflow и способы их устранения.
Почему DAG не виден в web UI?
Причины: синтаксическая ошибка в файле, некорректные права на файлы, или webserver не видит директорию dags. Проверяйте логи webserver и выполняйте python -m py_compile dags/your_dag.py. Убедитесь, что владелец файлов и права позволяют контейнеру читать файлы (обычно 644). Если используете NFS — проверьте latency и consistency, так как Airflow активно импортирует файлы.
Что делать при "Database connection error"?
Проверьте строку подключения AIRFLOW__CORE__SQL_ALCHEMY_CONN, доступность Postgres и правильность учётных данных. Для проверки внутри контейнера используйте psql или pg_isready. В случаях network policy или firewall исправьте правила. Если вы видите ошибки миграции, выполните airflow db upgrade и посмотрите логи alembic.
Как уменьшить время запуска задач?
Оптимизируйте время старта путем уменьшения overhead: используйте CeleryExecutor или KubernetesExecutor, уменьшите DAG-parse-time, отключите тяжелые импорты на верхнем уровне файлы DAG (отложите import внутрь PythonOperator), настроите оптимальный min_worker_concurrency. Применяйте pools и лимит на параллелизм в конфиге (по умолчанию 32). Также следите за размерами логов и настройкой storage для логов (S3/MinIO ускоряет запись логов при большом потоке).
Когда задачи повторно отмечаются как running?
Часто это связано с проблемами брокера или scheduler: потеря heartbeats у worker'ов, или проблемы с базой метаданных. Проверьте время heartbeats в worker и scheduler, увеличьте SQLAlchemy pool size при большой нагрузке и убедитесь, что системное время на всех узлах синхронизировано через NTP. Для Celery проверяйте Redis/Postgres health.
Частые вопросы
Как начать миграцию с LocalExecutor на CeleryExecutor?
Для миграции подготовьте брокер сообщений (Redis или RabbitMQ) и backend (Postgres/Redis). В конфиге Airflow поменяйте параметр executor на CeleryExecutor и настройте section [celery] с URL брокера. Тестируйте на staging: поднять несколько worker-контейнеров, прогнать нагрузочный backfill и наблюдать за состояниями задач. Обратите внимание на connection pool к базе данных; при высокой нагрузке увеличьте pool_size. Планируйте миграцию на окно с минимальной активностью, выполняйте миграции alembic заранее и мониторьте логи.
Сколько памяти требуется для Webserver и Scheduler?
Для локальной разработки достаточно 2 CPU и 4 GB RAM для webserver и scheduler. В продакшене рекомендуется 2–4 CPU и 8–16 GB RAM для scheduler при большом количестве DAG-run. Worker'ы Celery должны иметь минимум 1–2 CPU и 4–8 GB RAM в зависимости от конкретных задач (например, heavy ETL потребует больше памяти). Настройка числа worker'ов и concurrency влияет на общий потребление ресурсов.
Где хранить конфигурацию connections и variables безопасно?
Рекомендуется хранить secrets в секретном менеджере: HashiCorp Vault, AWS Secrets Manager или Kubernetes Secrets. Airflow поддерживает интеграцию через connections backend и secrets backend. Для CI/CD храните только ссылки на секреты, а не сами значения в репозитории. Для локальной разработки используйте .env и .airflowignore аккуратно, чтобы не коммитить реальные пароли.
Почему оператор зависает в state queued?
Обычно причина — недоступность worker'ов или исчерпан лимит parallelism/pool. Проверьте количество активных worker'ов, их логи и состояние брокера. В настройках Airflow — core.parallelism, dag_concurrency и max_active_tasks — могут ограничивать запуск. Для Celery также проверьте Celery worker concurrency и availability. Увеличьте лимиты или уменьшите нагрузку на систему для разрешения очереди.
Чем логировать большие объёмы данных в задачах?
Для больших объёмов логов не храните их в базе данных. Настройте remote logging: S3, GCS или MinIO. В Airflow укажите backend в конфиге (например, remote_base_log_folder) и задайте IAM-учётные данные. Это уменьшит нагрузку на файловую систему и обеспечит масштабирование при большом потоке задач и объёмах логов.
Полезные материалы по теме доступны в разделах DevOps и Python на сайте. Для примеров CI/CD посмотрите публикации в DevOps и обзоры интеграций с Redis и Postgres в статьях категории Python.
Комментарии (0)
Войдите или зарегистрируйтесь, чтобы оставить комментарий
Загрузка комментариев…