email de contacto: [email protected]
Bievenido! Este es un proyecto individual para confeccionar en 72hs un proyecto desde cero de ETL con carga incremental. Los datos corresponden a una lista de precios del año 2020, si bien no se especifica, es probable que sean del programa Precios Cuidados.
- ETL automatizado de archivos locales de múltiples formatos
- Limpieza de datos y depuración automatizada
- Gestión del flujo de trabajo vía Apache Airflow / DAGs
- Carga de datos a MySQL (servidor local)
- Incorporación de Multi-Cloud Object Storage compatible con AWS S3 (uso de sensors y hooks)
- Visualización básica en HTML de querys via Flask
Video explicativo de su funcionalidad: LINK
- MinIO RootFull, ver documentación oficial segun OS --> LINK
- Instalación:
mkdir -p ~/minio/data
docker run \
-p 9000:9000 \
-p 9090:9090 \
--name minio \
-v ~/minio/data:/data \
-e "MINIO_ROOT_USER=ROOTNAME" \
-e "MINIO_ROOT_PASSWORD=CHANGEME123" \
quay.io/minio/minio:RELEASE.2022-10-24T18-35-07Z server /data --console-address ":9090"
Luego conectar desde Airflow entrando en Config --> Connections --> Generic.
- En conn_ID colocar: "minio_conn" sin comillas
- En el campo Extra colocar:
{
"aws_access_key_id": "ROOTNAME",
"aws_secret_access_key": "CHANGEME123",
"host": "http://host.docker.internal:9000"
}
- Airflow: pasos secuenciales, ver documentación oficial --> LINK
0) mkdir ./dags ./logs ./plugins
1) docker-compose up airflow-init
2) docker-compose up
3) docker-compose down
4) docker-compose build (instala las librerias de Python en el container de Airflow)
5) docker-compose up
- FileImporter: carga individual de archivos base (sucursales y productos) - FolderImporterPrecios: detección automática de archivos en directorio, soporte de múltiples formatos (XLSX, TXT, CSV, Parquet, JSON) y limpieza posterior. Devuelve un archivo único con valores estandarizados. - CleanSucursal, CleanProducto, CleanPrecios: Normalización de los datos. - DownloadAndRenameFile: Gestiona la descarga de archivos desde el Bucket de Minio. - Dag_Initial_Loading: Gestiona la carga/cleaning y upload a MySQL incremental de archivos locales en Airflow - Dag_S3Bucket_Loading: Gestiona el S3Sensor para vigilar nuevos archivos que se carguen a Minio Bucket y luego activa el S3Hook para realizar descarga, renombre, limpieza y upload a MySQL de los archivos de forma incremental en Airflow. Al hacer un trigger de prueba tiene una ventana de 30 segundos para hacer la carga (se puede modificar).
Gestiona la carga incremental de los archivos iniciales, a posterior se hace una limpieza y se almacena temporalmente en CSV (Airflow no permite returns entre tasks de más de 48kb), y posteriormente se realiza la carga en la base de datos de MySQL, la cuál corre de forma local en la computadora.
Este DAG consta de dos partes: Para su correcta funcionalidad se realizó la conexión pertinente entre Airflow y Minio Storage Service (el cuál corre a traves de un contenedor de Docker). Minio permite la utilización de la API de AWS para la gestión de archivos vía Airflow, lo cuál permitiría en una instancia posterior facilitar en deploy en la nube. El primer paso es utilizar un S3.Sensor para vigilar cada X intervalo de tiempo si se cargan nuevos archivos en el Bucket llamado 'Data'. Una vez detectado un archivo nuevo se procede a su identificación, descarga local en Airflow, transformación y carga en SQL.
Tanto este DAG como el previo finalizan con un Query de prueba a MySQL con el QUERY solicitado en la consigna: Precio promedio de la sucursal 9-1-688
No hubo tiempo para hacer mejoras estéticas, pero si se corre el archivo 'main.py' se puede acceder de forma local en el navegador a la API que devuelve el resultado de la QUERY de la consigna.
-Script con programación orientada a objetos -Nuevas funcionalidades con DAG -El entorno de trabajo es Unix/ARM64, hay que verificar su puesta en marcha en sistemas x86 u otros ya que las imágenes de Docker cambian. -Deploy en AWS -Utilizar Flask y HTML para generar un web-deploy con API REST que permita comandar distintas QUERYs básicas
El archivo Excel tenía una de sus columnas con formato múltiple (datetime y float) lo cuál hacia imposible la importación correcta con Pandas con el engine OpenPyXL. Según lo investigado en StackOverFlow es una limitación del engine que genera un comportamiento azaroso y fuerza en algunas ocasiones los datos a datetime a pesar de especificar lo opuesto, y por lo pronto no hay otro engine que soporte Pandas y XLSX. Corregido esto no hubo inconvenientes en la carga y transformación.