Un workflow –flujo de trabajo– es una secuencia orquestada de pasos que conforman un proceso de negocio. Los workflows ayudan a definir, implementar y automatizar estos procesos de negocio, mejorando la eficiencia y la sincronización entre sus componentes. Un workflow ETL consiste en recoger datos de distintas fuentes, procesarlos y extraer valor de ellos, almacenando los resultados en un data warehouse, para que después puedan ser consultados por terceros. Los procesos ETL ofrecen una ventaja competitiva a las compañías que los usan, ya que facilitan la recolección de datos y su posterior almacenamiento, análisis y explotación, con el fin de mejorar la inteligencia de negocio.
Apache Airflow es una herramienta open source para diseñar, planificar y monitorizar workflows. Desarrollada en 2014 por Airbnb, y posteriormente liberada como código abierto, Airflow se ha convertido en una solución muy popular, con más de 16 000 estrellas en su repositorio de GitHub. Es una solución escalable, flexible, extensible y elegante, donde los flujos de trabajo se diseñan en Python, y se monitorizan, planifican y gestionan con una interfaz web. Airflow puede integrarse fácilmente con fuentes de datos como APIs HTTP, bases de datos (MySQL, SQLite, Postgres…) y mucho más. Si quieres aprender más sobre todo lo que puedes hacer con esta herramienta, echa un vistazo a este tutorial (en castellano) o a este otro (en inglés).
A pesar de ser una gran herramienta, hay algunas cosas de Airflow que no nos gustan mucho:
- Por defecto, Airflow utiliza una base de datos SQLite como backend, ofreciendo un bajo rendimiento
- Datos sensibles como credenciales se almacenan en la base de datos como texto plano, sin encriptar
- XCom, la cola de mensajes de Airflow, tiene un límite de tamaño de 65KB, lo cual puede ser un problema cuando se trabaja con grandes cantidades de datos
En este artículo, aprenderemos a crear nuestra propia imagen de Airflow en Docker, y a utilizar Docker Compose para automatizar el despliegue de esta herramienta junto con un backend MySQL, a fin de mejorar el rendimiento. También implementaremos un sistema criptográfico para almacenar las credenciales de forma segura, y aumentaremos el límite de tamaño de XCom a 4GB.
Spoiler: si quieres ir directo al grano y montar todo sin seguir este tutorial, al final tienes un enlace a un repositorio en GitHub con todos los archivos necesarios.
¡Manos a la obra!
En primer lugar, empezaremos por crear una imagen de Airflow en Docker. Podríamos usar la imagen oficial en DockerHub, pero creando la nuestra propia aprenderemos a instalar Airflow en cualquier entorno. A partir de la imagen oficial de Python 3.7 (la 3.8 presenta algunos problemas de compatibilidad con Airflow), instalaremos esta herramienta con el administrador de paquetes pip y haremos la configuración inicial básica. Nuestro Dockerfile se vería así:
FROM python:3.7 RUN pip3 install 'apache-airflow' RUN airflow initdb CMD (airflow scheduler &) && airflow webserver
Si quisiésemos desplegar ahora mismo un contenedor, escribiríamos las siguientes dos líneas en un terminal, creando la imagen desde el Dockerfile y, a continuación, ejecutando un contenedor con dicha imagen, mapeando el puerto 8080 y creando un volumen para persistir los datos de Airflow:
docker build . -t airflow docker run -it -p 8080:8080 -v :/root/airflow airflow
Sin embargo, como vimos antes, en este caso Airflow está utilizando una base de datos SQLite como backend, cuyo rendimiento es bastante menor que si utilizásemos, por ejemplo, un servidor MySQL. De nuevo, mediante Docker podemos desplegar un contenedor MySQL con una base de datos llamada «airflowdb» y un usuario con permisos completos en esa base de datos.
docker run -d -p 3306:3306 -v :/var/lib/mysql --env-file mysql.env mysql:latest
Junto con el archivo mysql.env en el que se definen el nombre de la base de datos, el usuario y la contraseña (puedes cambiarlos por lo que tú quieras):
MYSQL_ROOT_PASSWORD=sOmErAnDoMsTuFF MYSQL_DATABASE=airflowdb MYSQL_USER=airflower MYSQL_PASSWORD=eirfloub!*
Llegados a este punto, tiene sentido usar Docker Compose para orquestar el despliegue de estos dos contenedores. El siguiente archivo docker-compose.yml desplegará ambos contenedores y los interconectará con una red bridge llamada airflow-backend. También mapea los volúmenes necesarios para ambos contenedores, abre el puerto 8080 para Airflow, y en MySQL establece «mysql_native_password» como plugin de autenticación, ya que el «caching_sha2_password» utilizado por defecto no se lleva bien con Apache Airflow:
version: "3.7" services: airflow-backend: image: mysql:latest command: --default-authentication-plugin=mysql_native_password networks: - airflow-backend volumes: - "airflow-backend:/var/lib/mysql" env_file: - mysql.env airflow-engine: build: ./airflow-engine depends_on: - airflow-backend init: true networks: - airflow-backend volumes: - "airflow-engine:/root/airflow/" ports: - "8080:8080" env_file: - mysql.env networks: airflow-backend: volumes: airflow-engine: airflow-backend:
Además de este archivo, tenemos que crear dos carpetas: airflow-engine, con el Dockerfile de Airflow, y airflow-backend, con el Dockerfile para la base de datos y el archivo airflow.sql. Aun así, queda algo de trabajo por hacer… ¿No sería maravilloso que la conexión con el contenedor de MySQL se estableciera automáticamente? Pues no te preocupes, porque es muy sencillo. Airflow tiene un archivo llamado airflow.cfg donde almacena configuraciones clave-valor, incluyendo la URL del backend. En este caso, el nombre del contenedor de MySQL es airflow-backend, y la URL completa de la base de datos es mysql://airflower:eirfloub!*@airflow-backend/airflowdb (si has usado los mismos nombres que en este tutorial). Estos archivos clave-valor pueden ser fácilmente modificados con la librería configparser de Python. El siguiente script de Python, mysqlconnect.py, en la carpeta airflow-engine, hará el trabajo por nosotros:
from os import environ as env import configparser # Open the Airflow config file config = configparser.ConfigParser() config.read('/root/airflow/airflow.cfg') # Store the URL of the MySQL database config['core']['sql_alchemy_conn'] = 'mysql://{user}:{password}@airflow-backend/{db}'.format(user=env.get('MYSQL_USER'), password=env.get('MYSQL_PASSWORD'), db=env.get('MYSQL_DATABASE')) config['core']['executor'] = 'LocalExecutor' with open('/root/airflow/airflow.cfg', 'w') as f: config.write(f)
Con este cambio, el contenedor MySQL quedaría configurado como backend, pero las credenciales seguirían almacenándose como texto plano. Para arreglarlo, modificaremos de nuevo el archivo airflow.cfg con otro script de Python, llamado fernet.py, que generará una clave Fernet aleatoria, con la que Airflow encriptará todos los datos sensibles:
from cryptography.fernet import Fernet import configparser # Generate a random Fernet key fernet_key = Fernet.generate_key().decode() # Store the key config = configparser.ConfigParser() config.read(‘/root/airflow/airflow.cfg’) config[‘core’][‘fernet_key’] = fernet_key with open(‘/root/airflow/airflow.cfg’, ‘w’) as f: config.write(f)
La pregunta ahora es, ¿cómo ejecutamos estos scripts? ¡Bash al rescate! Escribiremos un script en Bash que se ejecutará cada vez que se lance el contenedor de Airflow. Se llamará airflow.sh y se guardará en la carpeta airflow-engine. Este script Bash comprobará si es la primera vez que se ejecuta el contenedor; si es así, hará la configuración inicial de Airflow y llamará a los dos scripts Python anteriores. A continuación, ejecutará el scheduler y el servidor web de Airflow.
INIT_FILE=.airflowinitialized if [ ! -f «$INIT_FILE» ]; then # Create all Airflow configuration files airflow initdb rm /root/airflow/airflow.db # Secure the storage of connections’ passwords python fernet.py # Wait until the DB is ready apt update && apt install -y netcat while ! nc -z airflow-backend 3306; do sleep 1 done apt remove -y netcat # Setup the DB python mysqlconnect.py airflow initdb # This configuration is done only the first time touch «$INIT_FILE» fi # Run the Airflow webserver and scheduler airflow scheduler & airflow webserver & wait
¡Casi listo! Todavía hay algo que podríamos mejorar… Por defecto, XCom, el sistema de colas de mensajes de Airflow, tiene un límite de 65KB para los objetos, lo cual puede ser insuficiente cuando se trabaja con grandes cantidades de datos. Esto sucede porque XCom funciona sobre el backend MySQL, donde el valor de cada objeto es modelado por una columna de tipo BLOB. Cambiando el tipo de columna a LONGBLOB, el límite se incrementará hasta los 4GB. Esta modificación puede ser automatizada dentro del script Bash anterior, introduciendo estas líneas antes del “touch “$INIT_FILE””:
# Allow XComs to store objects bigger than 65KB apt update && apt install -y default-mysql-client mysql –host airflow-backend –user=root –password=MYSQL_DATABASE –execute=»ALTER TABLE xcom MODIFY value LONGBLOB;» apt remove -y default-mysql-client
Si has seguido todos los pasos, deberías tener los siguientes ficheros:
airflow-engine: - airflow.sh - Dockerfile - fernet.py - mysqlconnect.py docker-compose.yml mysql.env
Solo queda un último paso, que es modificar el archivo airflow-engine/Dockerfile para que ejecute los scripts de Bash y Python que preparamos en los pasos anteriores:
# Airflow seems to crash with Python 3.8, it's important to use version 3.7 instead FROM python:3.7 # Install and setup Airflow RUN pip3 install ‘apache-airflow[mysql,crypto]’ mysql-connector-python # Configure Airflow: connect to backend WORKDIR /root/airflow/ COPY airflow.sh airflow.sh RUN chmod +x airflow.sh COPY fernet.py fernet.py COPY mysqlconnect.py mysqlconnect.py CMD ./airflow.sh
¡Ya está todo listo para desplegar tu Apache Airflow revitalizado! Abre una terminal dentro de la carpeta raíz en la que has ido creando todos los archivos del tutorial y escribe:
docker-compose build docker-compose up
Ahora ya estás listo para empezar a usar esta potente herramienta y orquestar workflows. Si no has sido capaz de seguir todos los pasos en el tutorial, no te preocupes, tienes todo en este repositorio de GitHub.
Autor: Rafael P. Martínez Álvarez, Responsable Técnico de Big Data Analytics y Guillermo Barreiro Fernández, Becario del Área de Sistemas Inteligentes