Ques/Help/Req Из цикла ETL: настройка первого DAG

XakeR

Member
Регистрация
13.05.2006
Сообщения
1 912
Реакции
0
Баллы
16
Местоположение
Ukraine
На обложке: Воздушный поток (air flow) вокруг самолета, источник: fineartamerica.com

В предыдущих статьях я разобрала, как наладить поток логов из конструктора Dialogflow в BigQuery, и теперь почва для построения отчётов построена. Сегодня покажу, как настроить один DAG – исполняемую по расписанию группу команд в Airflow. Я исхожу из предпосылки, что вы уже развернули программу на своем сервере / устройстве и имеете доступ к UI. Эта статья подходит тем, кто решил автоматизировать в своей работе то, что достаточно отлажено, чтобы поддаться автоматизации. В моем случае это скрипты, рассчитывающие ежемесячную эффективность одного из ботов по конечным состояниям диалогов.

Это статья из цикла «5 ETL для зоопарка ботов». В нём я пошагово разбираю, как наладить потоки данных из разных библиотек и конструкторов ботов на разных языках и стеках. В основе лежат Python и его библиотеки. Вот предыдущие статьи цикла, подводящие к текущей:

  1. Анонс цикла с перечнем технологий
  2. Настройка потока логов «Из Dialogflow в BigQuery»
  3. Python для аналитики ad hoc из BigQuery
  4. Развертывание Airflow

Что такое DAG​


DAG (Directed Acyclic Graph – направленный ациклический граф) – просто коллекция различных задач. Вероятно, вы уже сталкивались с термином «граф», если проходили курсы по машинному обучению. В контексте Airflow же слово имеет примерно такое же значение – это поток задач, который может разветвляться:

Граф Airflow (DAG)0


Потому модуль DAG импортируется всегда, чтобы связать различные задачи воедино.

Вернемся к настройке​


Для начала я импортирую необходимые библиотеки:

from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.telegram.operators.telegram import TelegramOperator from airflow.utils.trigger_rule import TriggerRule

BashOperator нужен, чтобы запускать силами bash-скрипты, лежащие по соседству. Классы datetime, timedelta модуля datetime нужны, чтобы описать поведение графа при ошибке. TriggerRule понадобится, чтобы уведомлять меня в Telegram только при ошибке.

Подключение бота Telegram​


Почти все рабочие чаты у меня живут в Telegram, потому удобно получать уведомления об исполнении в специальный чат, где живут Airflow-бот и его братья по другим проектам. Поэтому я импортировала TelegramOperator.

Если вы не знаете, как создать бота в Telegram, то используйте раздел «Создание бота» из вот этого гайда. Кратко: передав команду /start боту @BotFather, мы следуем инструкции, называем бота, задаем ему идентификатор для поиска и в конце получаем токен:

Из цикла ETL: настройка первого DAG1


Скопировав токен, создаём функцию, уведомляющую об исполнении задач через мессенджер:

send_telegram_message = TelegramOperator( task_id=»send_telegram_message», token=»<Токен бота в Telegram>», chat_id=»Идентификатор чата, куда добавлен бот», text=»Расчет сводки выполнен.» )

Чтобы получить chat_id, используйте бота @RawDataBot. Токен бота оставляем в одноименном поле.

Граф​


Теперь создаём DAG и описываем некоторые обязательные характеристики:

with DAG( «daily_effectiveness», # Уникальное имя графа, отобразится в консоли default_args={ «depends_on_past»: False, # Зависимость задач от предыдущих «retries»: 1, # Число перепопыток в случае неудаче «retry_delay»: timedelta(seconds=30) # Интервал между попытками }, description=»Ежемесячная сводка маркетплейса», Описание, появится в консоли при наведении на название DAG’а schedule_interval=’@monthly’, # Ежемесячное исполнение start_date=datetime(2023, 7, 1), # Когда начать исполнение по расписанию catchup=False, tags=[«Маркетплейс», «Dialogflow», «BigQuery»], ) as dag: Ключевая особенность Airflow заключается в том, что запуск DAG по расписанию – нечто вроде копирования группы задач для каждого нужного времени. Планировщик (scheduler – его мы запускали в прошлой статье) проверяет, не работает ли до сих пор предыдущая копия графа. Эта концепция называется catchup (можно перевести как «подхват»).

Задачи​


Теперь создаём задачи (tasks) – отдельные исполняемые команды в рамках графа. В моём случае для расчёта сводки эффективности необходимо предварительно войти в виртуальную среду airflow_env, и только потом исполнять подпрограмму dialogflow-to-bigquery.py силами bash:

t1 = BashOperator( task_id=»entering_virtual_environment», # Идентификатор таски для отслеживания в консоли bash_command=»source /home/fitwist/airflow/airflow_env/bin/activate», retries=2 # ) t2 = BashOperator( task_id=»calculating_marketplace_effectiveness», depends_on_past=False, bash_command=»python3 /home/fitwist/airflow/df-to-looker/dialogflow-to-bigquery.py», retries=2 )

Задаем последовательность задач:

t1 >> t2 >> t3 >> t4 >> TelegramOperator( task_id=»send_telegram_message», token=»6191376785:AAHf9thqeAuaIjou3DjzEFiI06bYY9FDKlI», chat_id=»-1001200247335″, trigger_rule=TriggerRule.ONE_FAILED, text=»Группа чат-бота: одна из ежедневных выгрузок не выполнена. Проверь логи (Recent Tasks / Failed / Task Id / Log).» )

И вуаля! Ваш первый DAG готов! Когда у меня впервые получилось запустить Airflow, чувство было, словно в космос полетела, ведь такая технология в резюме очень котируется у дата-аналитиков.

Data Scientist Открытие, Москва, можно удалённо, По итогам собеседования tproger.ru Вакансии на tproger.ru

Полностью DAG будет выглядеть так:

from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator from airflow.providers.telegram.operators.telegram import TelegramOperator send_telegram_message = TelegramOperator( task_id=»send_telegram_message», token=»<Токен бота в Telegram>», chat_id=»Идентификатор чата, куда добавлен бот», text=»Расчет сводки выполнен.» ) with DAG( «daily_effectiveness», # Идентификатор, отобразится в консоли default_args={ «depends_on_past»: False, # Зависимость задач от предыдущих «retries»: 1, # Число перепопыток в случае неудаче «retry_delay»: timedelta(seconds=30) # Интервал между попытками }, description=»Ежемесячная сводка маркетплейса», Описание, появится в консоли при наведении на название DAG schedule_interval=’@monthly’, # Ежемесячное исполнение start_date=datetime(2023, 7, 1), # Когда начать исполнение по расписанию catchup=False, tags=[«Маркетплейс», «Dialogflow», «BigQuery»], ) as dag: t1 = BashOperator( task_id=»entering_virtual_environment», # Идентификатор таски для отслеживания в консоли bash_command=»source /home/fitwist/airflow/airflow_env/bin/activate», retries=2 # ) t2 = BashOperator( task_id=»calculating_marketplace_effectiveness», depends_on_past=False, bash_command=»python3 /home/fitwist/airflow/df-to-looker/dialogflow-to-bigquery.py», retries=2 ) t1 >> t2 >> t3 >> t4 >> TelegramOperator( task_id=»send_telegram_message», token=»6191376785:AAHf9thqeAuaIjou3DjzEFiI06bYY9FDKlI», chat_id=»-1001200247335″, trigger_rule=TriggerRule.ONE_FAILED, text=»Группа чат-бота: одна из ежедневных выгрузок не выполнена. Проверь логи (Recent Tasks / Failed / Task Id / Log).» )

Можно даже получать уведомления об ошибке любым удобным способом. Мне удобно в Telegram, потому использую триггер ONE_FAILED, то есть “когда хотя бы одна задача не выполнена”.

Этот и другие сниппеты из цикла статей можно найти в моем репозитории на GitHub.

Заключение​


Airflow – это настоящий швейцарский нож. В нём, как в Python, написана утилита почти под любую потребность. И на покрытие тредами Stack Overflow грех жаловаться.

Швейцарский нож мем2


Когда я только делала первые шаги в направлении автоматизации, то существенную моральную помощь также оказывала прекрасная документация: там и примеры, и грамотно дозированные страницы. Даже база графов на любой вкус на GitHub. Так и хочется создателям за неё выдать награду.

Это уникальный в своем роде опенсорсный продукт, который позволяет даже новичкам предъявить полезнейший функционал – автоматизацию чего угодно. Он не требует знания теории вероятности и основ машинного обучения, однако при наличии Airflow в вашем резюме позволяет требовать 100К+.

В следующей статье мы посмотрим, как выглядит сводка эффективности, которую мы теперь можем визуализировать силами Google Looker (экс-Data Studio).
 
198 157Темы
635 128Сообщения
3 618 411Пользователи
Semifistokl22Новый пользователь
Верх