Si trabajas con datos, sabes que la forma en la que los almacenamos y procesamos evoluciona a un...
Deep dive técnico: construyendo nuestro serverless Data Warehouse
En nuestro post anterior, conversamos sobre los dolores que nos llevaron a migrar nuestro Data Warehouse interno (ix_dw) desde una base de datos tradicional en Cloud SQL a una arquitectura serverless utilizando Cloud Storage, BigQuery y archivos en formato parquet.
Hoy, vamos a sumergirnos en la parte técnica: cómo logramos implementar la nueva lógica de extracción, transformación y carga (ETL por sus siglas en inglés), y las herramientas que utilizamos para hacerlo posible y con buen desempeño.
Nuestro nuevo stack: R, polars y delta lake
Para esta nueva etapa del ix_dw, decidimos mantener R como nuestro lenguaje principal para orquestar la lógica de datos, pero incorporando herramientas de alto rendimiento:
- polars a través de r-polars: Una librería de manipulación de datos ultrarrápida escrita en Rust, ideal para procesar y consultar archivos Parquet de manera eficiente y r-polars para acceder los R bindings de polars.
- deltalakeR: Un paquete de R que nos permite interactuar de forma nativa con el formato Delta Lake, el cual le da superpoderes a nuestros archivos Parquet (como transacciones ACID y manejo de versiones) sobre Google Cloud Storage.
- nanoarrow: Para definir de manera estricta y segura los esquemas de nuestros datos.
Implementando la nueva lógica de ETLs
El proceso general de nuestros nuevos ETLs se divide en tres pasos principales:
1. Extracción y limpieza
Nos conectamos a nuestras fuentes de origen (como sheets, otros sistemas, APIs, entre otros) para extraer los datos crudos. Un cambio clave en la nueva arquitectura es que ahora almacenamos esta data tal cual viene en una capa cruda dentro de Cloud Storage, antes de aplicar transformaciones pesadas.
2. Definición de esquemas
Uno de los mayores retos al trabajar con data lakes es mantener la consistencia de los tipos de datos. Para evitar que un cambio inesperado en una fuente de origen rompa el análisis, utilizamos nanoarrow para declarar explícitamente el esquema de nuestras tablas.
Por ejemplo, definimos qué columnas son de texto, cuáles son numéricas y cuáles corresponden a fechas. Si un dato nuevo no calza con la estructura predefinida, el proceso falla de manera controlada antes de introducir inconsistencias en el data lake.
3. Uso de merge (upserts)
Al momento de guardar y actualizar grandes volúmenes de datos históricos en nuestro Data Lake, aprendimos que sobreescribir tablas enteras o reemplazar particiones completas consume recursos innecesarios.
Para resolver esto de forma elegante y eficiente, incorporamos operaciones Merge (Upserts) a través de deltalakeR.
En lugar de reemplazar todo un bloque de datos, una operación Merge nos permite comparar los registros entrantes con los existentes en el Delta Lake, actualizando únicamente las filas que sufrieron cambios e insertando aquellas que son completamente nuevas.
Aquí un ejemplo simplificado de cómo aplicamos esta lógica a través del paquete deltalakeR:
# Definimos los datos de origen con las actualizaciones y nuevos registros
source_data <- data.frame(id = c(2, 4), horas = c(25, 40))
# Upsert: actualiza el registro existente e inserta el nuevo
resultado <- delta_merge("ruta/a/nuestro/datalake", source_data, "target.id = source.id") |>
when_matched_update(c(horas = "source.horas")) |>
when_not_matched_insert(c(id = "source.id", horas = "source.horas")) |>
merge_execute()
Esta función nos da un control granular sobre nuestros datos. Permite actualizaciones y eliminaciones condicionadas, así logramos reducir drásticamente el cómputo necesario y el movimiento de datos.
Consultas veloces
Finalmente, una vez que los datos están seguros y actualizados en formato Delta Lake, la lectura para análisis, reportes o auditorías de los ETLs la hacemos mediante polars:
delta_datos <- delta_table("ruta/a/nuestro/datalake")
# Declaramos nuestras operaciones de manera perezosa (lazy)
consulta_optimizada <- pl$scan_parquet(get_files(delta_datos)) |>
filter(anio == 2024) |>
group_by("cliente") |>
summarize(total_horas = sum("horas"))
# Ejecutamos todo de un solo golpe al final
datos <- consulta_optimizada |> collect()
Al utilizar la función de escaneo, polars aplica técnicas de lazy evaluation (evaluación perezosa) y predicate pushdown. El verdadero poder aquí radica en que podemos encadenar múltiples operaciones de manipulación , como filtros, agrupaciones, uniones y cálculos, antes de llamar a collect() y traerse los datos a memoria.
Al hacerlo, polars no ejecuta paso por paso, sino que analiza todos los pasos a tomar juntos, optimiza el plan de consulta y luego lee directamente de los archivos parquet únicamente lo estrictamente necesario. Esto significa que nunca cargamos la tabla completa en memoria, alcanzando tiempos de lectura y procesamiento difíciles de igualar con otros enfoques tradicionales.
Tablas en BigQuery
Una de las realidades de adoptar una arquitectura serverless puramente basada en archivos es que algunas herramientas de analítica aún no pueden conectarse directamente a este tipo de arquitectura. Por ejemplo, algunas herramientas de visualización e inteligencia de negocios como Looker no pueden leer directamente nuestros archivos parquet almacenados en Cloud Storage y es una de las herramientas que utilizamos a lo interno en ixpantia.
Para solucionar este puente sin perder los beneficios de nuestra nueva arquitectura ni duplicar los datos, aprovechamos las tablas externas de BigQuery. Básicamente, configuramos BigQuery para que lea de forma directa nuestros archivos parquet en el bucket de almacenamiento. De esta manera, Looker se conecta a BigQuery utilizando la interfaz de SQL tradicional, pero el almacenamiento real sigue viviendo en Cloud Storage, no hay costo de BigQuery asociado al almacenamiento.
Conclusión
Migrar una arquitectura serverless de data warehouse requirió refactoreo de nuestros dataductos y procesos de datos, pero el esfuerzo rindió frutos rápidamente. Al usar formatos flexibles (parquet / delta lake) y herramientas modernas de alto rendimiento en R (polars/deltalakeR), y acoplarlos inteligentemente con BigQuery para visualización en casos necesarios, no solo resolvimos nuestros problemas de escalabilidad y altos costos, sino que dotamos a nuestras operaciones de datos de la robustez que requerimos.
En ixpantia, creemos en construir sistemas future-proofed hoy, y con esta renovación, estamos listos para sacar el máximo valor a nuestros datos y acompañar a nuestros clientes en su propio camino de modernización.