Apache Airflow se ha convertido en la biblioteca de facto para la orquestación de canalizaciones en el ecosistema de Python. Ha ganado popularidad, a diferencia de soluciones similares, debido a su simplicidad y extensibilidad. En este artículo, intentaré resumir sus conceptos principales y brindarle una comprensión clara de cuándo y cómo usarlo.
¿Por qué y cuándo debo considerar Airflow?
Imagine que desea crear una canalización de aprendizaje automático que consta de varios pasos, como:
-
Leer un conjunto de datos de imágenes desde un almacenamiento basado en la nube
-
Procesar las imágenes
-
Entrena un modelo de aprendizaje profundo con las imágenes descargadas
-
Sube el modelo entrenado en la nube
-
Implementar el modelo
¿Cómo programaría y automatizaría este flujo de trabajo? trabajos cron son una solución simple pero vienen con muchos problemas. Lo que es más importante, no le permitirán escalar de manera efectiva. Por otro lado, Airflow ofrece la capacidad de programar y escalar tuberías complejas fácilmente. También le permite volver a ejecutarlos automáticamente después de una falla, administrar sus dependencias y monitorearlos usando registros y paneles.
Antes de construir la canalización antes mencionada, comprendamos los conceptos básicos de Apache Airflow.
¿Qué es el flujo de aire?
Apache Airflow es una herramienta para crear, programar y monitorear canalizaciones. Como resultado, es una solución ideal para casos de uso de ETL y MLOps. Ejemplos de casos de uso incluyen:
-
Extraer datos de muchas fuentes, agregarlos, transformarlos y almacenarlos en un almacén de datos.
-
Extraiga conocimientos de los datos y muéstrelos en un panel de análisis
-
Entrene, valide e implemente modelos de aprendizaje automático
Componentes clave
Al instalar Airflow en su edición predeterminada, verá cuatro componentes diferentes.
-
Servidor web: Webserver es la interfaz de usuario (UI) de Airflow, que le permite interactuar con él sin necesidad de una CLI o una API. Desde allí, uno puede ejecutar y monitorear tuberías, crear conexiones con sistemas externos, inspeccionar sus conjuntos de datos y mucho más.
-
Ejecutor: Los ejecutores son el mecanismo por el cual se ejecutan las canalizaciones. Hay muchos tipos diferentes que ejecutan canalizaciones localmente, en una sola máquina o de forma distribuida. Algunos ejemplos son
LocalExecutor
,SequentialExecutor
,CeleryExecutor
yKubernetesExecutor
-
programador: El planificador es responsable de ejecutar diferentes tareas en el momento correcto, volver a ejecutar canalizaciones, rellenar datos, garantizar la finalización de las tareas, etc.
-
PostgresSQL: Una base de datos donde se almacenan todos los metadatos de canalización. Esto suele ser Postgres, pero también se admiten otras bases de datos SQL.
La forma más fácil de instalar Airflow es usando docker compose
. Puede descargar el archivo de composición oficial de la ventana acoplable desde aquí:
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
Tenga en cuenta que Airflow también reside en Pipi y se puede descargar usando
pip
Conceptos básicos de Airflow
Para comenzar con Airflow, uno debe estar familiarizado con sus conceptos principales, lo que puede ser un poco complicado. Así que tratemos de desmitificarlos.
DAG
Todas las tuberías se definen como gráficos acíclicos dirigidos (DAG). Cada vez que ejecutamos un DAG, se crea una ejecución individual. Cada ejecución de DAG es independiente de otra y contiene un estado con respecto a la etapa de ejecución del DAG. Esto significa que los mismos DAG se pueden ejecutar muchas veces en paralelo.
Para instanciar un DAG, puede usar el DAG
función o con un administrador de contexto de la siguiente manera:
from airflow import DAG
with DAG(
"mlops",
default_args=,
schedule=timedelta(days=1),
start_date=datetime(2023, 1, 1)
) as dag:
El administrador de contexto acepta algunas variables globales con respecto al DAG y algunos argumentos predeterminados. Los argumentos predeterminados se pasan a todas las tareas y se pueden anular para cada tarea. La lista completa de parámetros se puede encontrar en el documentos oficiales.
En este ejemplo, definimos que el DAG comenzará el 1/1/2023 y se ejecutará todos los días. El retries
El argumento asegura que se volverá a ejecutar una vez después de un posible error.
Tareas
Cada nodo del DAG representa un Tarea, es decir, una pieza individual de código. Cada tarea puede tener algunas dependencias ascendentes y descendentes. Estas dependencias expresan cómo se relacionan las tareas entre sí y en qué orden deben ejecutarse. Cada vez que se inicializa una nueva ejecución de DAG, todas las tareas se inicializan como instancias de tareas. Esto significa que cada instancia de Tarea es una ejecución específica para la tarea dada.
Operadores
Los operadores pueden verse como plantillas para tareas predefinidas porque encapsulan el código repetitivo y abstraen gran parte de su lógica. Algunos operadores comunes son BashOperator
, PythonOperator
, MySqlOperator
, S3FileTransformOperator
. Como puede ver, los operadores lo ayudan a definir tareas que siguen un patrón específico. por ejemplo, el MySqlOperator
crea una tarea para ejecutar una consulta SQL y el BashOperator
ejecuta un script bash.
Los operadores se definen dentro del DAG
administrador de contexto como se muestra a continuación. El siguiente código crea dos tareas, una para ejecutar un comando bash y otra para ejecutar una consulta MySQL.
with DAG(
"tutorial"
) as dag:
task1 = BashOperator(
task_id="print_date",
bash_command="date",
)
task2 = MySqlOperator(
task_id="load_table",
sql="/scripts/load_table.sql"
)
Dependencias de tareas
Para formar la estructura del DAG, necesitamos definir dependencias entre cada tarea. Una forma es usar el >>
símbolo como se muestra a continuación:
task1 >> task2 >> task3
Tenga en cuenta que una tarea puede tener múltiples dependencias:
task1 >> [task2, task3]
La otra forma es a través de la set_downstream
, set_upstream
funciones:
t1.set_downstream([t2, t3])
XCom
Los XCom, o comunicaciones cruzadas, son los responsables de la comunicación entre tareas. Los objetos de XComs pueden enviar o extraer datos entre tareas. Más específicamente, envían datos a la base de datos de metadatos de donde se pueden extraer otras tareas. Es por eso que hay un límite en la cantidad de datos que se pueden pasar a través de ellos. Sin embargo, si se necesita transferir grandes cantidades de datos, se pueden utilizar almacenamientos de datos externos adecuados, como almacenamiento de objetos o bases de datos NoSQL.
Echa un vistazo al siguiente código. Las dos tareas se comunican a través de xcoms
utilizando el ti
argumento (abreviatura de instancia de tarea). El train_model
tarea es empujar el model_path
en la base de datos de metadatos, que es extraído por el deploy_model
tarea.
dag = DAG(
'mlops_dag',
)
def train_model(ti):
model_path = train_and_save_model()
ti.xcom_push(key='model_path', value=model_path)
def deploy_model(ti):
model_path = ti.xcom_pull(key='model_path', task_ids='train_model')
deploy_trained_model(model_path)
train_model_task = PythonOperator(
task_id='train_model',
python_callable=train_model,
dag=dag
)
deploy_model_task = PythonOperator(
task_id='deploy_model',
python_callable=deploy_model,
dag=dag
)
train_model_task >> deploy_model_task
Flujo de tareas
La API de Taskflow es una manera fácil de definir una tarea usando el decorador de Python @task
. Si toda la lógica de la tarea se puede escribir con Python, una simple anotación puede definir una nueva tarea. Taskflow gestiona automáticamente las dependencias y las comunicaciones entre otras tareas.
Usando la API de Taskflow, podemos inicializar un DAG con el @dag
decorador. Aquí hay un ejemplo:
@dag(
start_date=datetime(2023, 1, 1),
schedule_interval='@daily'
)
def mlops():
@task
def load_data():
. . .
return df
@task
def preprocessing(data):
. . .
return data
@task
def fit(data):
return None
df = load_data()
data = preprocessing(df)
model = fit(data)
dag = mlops()
Tenga en cuenta que las dependencias entre tareas están implícitas en los argumentos de cada función. Aquí tenemos un orden de encadenamiento simple, pero las cosas pueden volverse mucho más complejas. La API de flujo de tareas también resuelve el problema de comunicación entre tareas, por lo que existe una necesidad limitada de usar XComs.
Planificación
La programación de trabajos es una de las funciones principales de Airflow. Esto se puede hacer usando el schedule_interval
argumento que recibe un expresión crona datetime.timedelta
objeto, o un preajuste predefinido como @hourly
, @daily
etc. Un enfoque más flexible es utilizar el agregado recientemente horarios que te permiten definir horarios personalizados usando Python.
Aquí hay un ejemplo de cómo usar el schedule_interval
argumento. El siguiente DAG se ejecutará diariamente.
@dag(
start_date=datetime(2023,1,1),
schedule_interval = '@daily',
catchup =False
)
def my_dag():
pass
Dos conceptos muy importantes que debe comprender con respecto a la programación son la reposición y la recuperación.
Una vez que definimos un DAG, configuramos una fecha de inicio y un intervalo de programación. Si catchup=True
, Airflow creará ejecuciones de DAG para todos los intervalos de programación desde la fecha de inicio hasta la fecha actual. Si catchup=False
Airflow programará solo ejecuciones a partir de la fecha actual.
relleno amplía esta idea permitiéndonos crear ejecuciones pasadas desde la CLI independientemente del valor del parámetro catchup:
$ airflow backfill -s <START_DATE> -e <END_DATE> <DAG_NAME>
Conexiones y Ganchos
Airflow proporciona una manera fácil de configurar conexiones con sistemas o servicios externos. Las conexiones se pueden crear mediante la interfaz de usuario, como variables de entorno o mediante un archivo de configuración. Por lo general, requieren una URL, información de autenticación y una identificación única. Los ganchos son una API que abstrae la comunicación con estos sistemas externos. Por ejemplo, podemos definir una conexión PostgreSQL a través de la interfaz de usuario de la siguiente manera:
Y luego usa el PostgresHook
para establecer la conexión y ejecutar nuestras consultas:
pg_hook = PostgresHook(postgres_conn_id='postgres_custom')
conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('create table _mytable (ModelID int, ModelName varchar(255)')
cursor.close()
conn.close()
Conceptos avanzados
Para que este tutorial sea lo más completo posible, debo mencionar algunos conceptos más avanzados. No entraré en muchos detalles para cada uno de ellos, pero le recomiendo que los revise, si quiere dominar Airflow.
-
Ramificación: La ramificación le permite dividir una tarea en muchas diferentes tareas ya sea para condicionar su flujo de trabajo o para dividir el procesamiento. La forma más común es
BranchPythonOperator
. -
Grupos de tareas: los grupos de tareas lo ayudan a organizar sus tareas en una sola unidad. Es una gran herramienta para simplificar la vista de su gráfico y para repetir patrones.
-
Dags dinámicos: Dags y tareas también se pueden construir de forma dinámica. Desde Airflow 2.3, se pueden crear dags y tareas en tiempo de ejecución, lo que es ideal para tareas paralelas y dependientes de la entrada. Plantillas Jinga también son compatibles con Airflow y son una adición muy útil a los dags dinámicos.
-
Pruebas unitarias y registro: Airflow tiene una funcionalidad dedicada para ejecutar pruebas unitarias y registrar información
Mejores prácticas de flujo de aire
Antes de ver un ejemplo práctico, analicemos algunas de las mejores prácticas que utilizan la mayoría de los profesionales.
-
Idempotencia: los DAG y las tareas deben ser idempotentes. Volver a ejecutar la misma ejecución de DAG con las mismas entradas siempre debería tener el mismo efecto que ejecutarlo una vez.
-
Atomicidad: las tareas deben ser atómicas. Cada tarea debe ser responsable de una sola operación e independiente de las otras tareas
-
Filtrado incremental: cada ejecución de DAG debe procesar solo un lote de datos que admitan la extracción y la carga incrementales. De esa forma, los posibles errores no afectarán a todo el conjunto de datos.
-
Código de nivel superior: se debe evitar el código de nivel superior si no es para crear operadores o dags, ya que afectará el rendimiento y el tiempo de carga. Todo el código debe estar dentro de las tareas, incluidas las importaciones, el acceso a la base de datos y los cálculos pesados.
-
Complejidad: los DAG deben mantenerse lo más simples posible porque la alta complejidad puede afectar el rendimiento o la programación
Ejemplo de una tubería Airflow
Para demostrar todos los conceptos antes mencionados, volvamos al flujo de trabajo de ejemplo mencionado al principio de este artículo. Desarrollaremos una canalización que entrene un modelo y lo implementaremos en Kubernetes. Más concretamente, el DAG constará de 5 tareas:
-
Leer imágenes de un depósito de AWS s3
-
Preprocesar las imágenes usando Pytorch
-
Ajuste un modelo ResNet con las imágenes descargadas
-
Sube el modelo en S3
-
Implemente el modelo en un clúster de Kubernetes
Tenga en cuenta que no incluiré todos los detalles específicos y el código necesario, solo las partes relacionadas con Airflow.
Primero, comencemos definiendo el DAG. Como puede ver, la canalización se ejecutará una vez al día. En caso de falla, habrá un solo reintento después de una hora. Además, no habrá recuperación a pesar de que se supone que la tubería comenzará hace dos días.
from airflow import DAG
import datetime
default_args =
dag = DAG(
'resnet_model',
default_args=default_args,
description='A simple DAG to demonstrate Airflow with PyTorch and Kubernetes',
schedule_interval='@daily',
catchup=False
)
La primera tarea se encarga de leer las imágenes de AWS S3. Para lograrlo, podemos usar el S3Hook
. Primero definimos la funcionalidad de lectura en una función y luego la correspondiente PythonOperator
. Tenga en cuenta que aquí utilizo la conexión de AWS predeterminada, pero en la mayoría de los casos, deberá definir la suya propia.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
def read_images_from_s3(**kwargs):
s3_conn = S3Hook(aws_conn_id='aws_default')
images = []
for obj in s3_conn.get_bucket('mybucket').objects.all():
images.append(obj.key)
kwargs['ti'].xcom_push(key='images', value=images)
read_images = PythonOperator(
task_id='read_images',
python_callable=read_images_from_s3,
provide_context=True,
dag=dag
)
Las siguientes en la línea son las funciones de transformación y ajuste. No los incluiré aquí en su totalidad porque en su mayoría son código Pytorch estándar.
def preprocess_images(images, **kwargs):
images = kwargs['ti'].xcom_pull(task_ids='read_images_from_s3', key='images')
kwargs['ti'].xcom_push(key='images', value=train_images)
def fit_model(preprocessed_images, **kwargs):
train_ images = kwargs['ti'].xcom_pull(task_ids=preprocess_images, key='train_images')
torch.save(model, 'trained_model.pt')
preprocess = PythonOperator(
task_id='preprocess',
python_callable=preprocess,
provide_context=True,
dag=dag
)
fit_model = PythonOperator(
task_id='fit_model',
python_callable=fit_model,
provide_context=True,
dag=dag
)
Una vez que se entrena el modelo, debemos cargarlo en S3 para poder cargarlo y atender las solicitudes. Esto se puede hacer usando el S3FileTransferOperator
que lee del sistema de archivos local y lo carga en S3.
from airflow.operators.s3_file_transform_operator import S3FileTransformOperator
upload_model = S3FileTransferOperator(
task_id='upload_model',
source_base_path='.',
source_key='trained_model.pt',
dest_s3_bucket='my-model-bucket',
dest_s3_key='trained_model.pt',
dag=dag
)
El paso final es crear un pod de Kubernetes y servir el modelo. La mejor manera de lograrlo es usando el KubernetedPodExecutor
. Suponiendo que tenemos un script de implementación que maneja la carga y el servicio del modelo (que no analizaremos aquí), podemos hacer algo de la siguiente manera:
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
deploy_model = KubernetesPodOperator(
namespace='default',
image='myimage:latest',
name='deploy-model',
task_id='deploy_model',
cmds=['python', 'deploy.py'],
arguments=[model],
is_delete_operator_pod=True,
hostnetwork=False,
dag=dag
)
El KubernetesPodOperator
utiliza la API de Kubernetes para lanzar un pod en un clúster de Kubernetes y ejecutar el script de implementación.
Una vez que hemos definido todas las tareas, simplemente necesitamos crear sus dependencias y formar el DAG. Esto es tan simple como:
read_images >> preprocess >> fit_model >> upload_model >> deploy_model
Y eso es todo. Airflow inicializará este DAG y se puede monitorear a través de la interfaz de usuario. El planificador será responsable de ejecutar las tareas en el orden correcto y en el momento adecuado.
Conclusión
Apache Airflow es una gran herramienta de ingeniería de datos en mi opinión honesta. Claro, tiene algunas deficiencias, pero también puede ser muy flexible y escalable. Si desea profundizar más, tengo dos recursos para sugerir:
-
Un curso de IBM en Coursera: ETL y canalizaciones de datos con Shell, Airflow y Kafka. Por cierto, todo el certificación en ingeniería de datos por IBM es bastante grande.
-
Ingeniería de datos con AWS Nanodegree de AWS en Udacity. El cuarto módulo en particular se centra en gran medida en Airflow.
Háganos saber si desea ver más tutoriales sobre bibliotecas de ingeniería de datos populares. Si eres nuevo en AI Summer, no olvides seguirnos en Gorjeo o LinkedInpara mantenerse actualizado con nuestros últimos artículos.
* Divulgación: tenga en cuenta que algunos de los enlaces anteriores pueden ser enlaces de afiliados y, sin costo adicional para usted, ganaremos una comisión si decide realizar una compra después de hacer clic.