ETL · Python + SQL · Demo sintética

ETL con observabilidad y trazabilidad

Ingesta de archivos operativos/contables (CSV) → validaciones de esquema y control de totales → deduplicación → carga a SQL → reporte automático y observabilidad. Reduce el tiempo de procesamiento.

Tiempo de proceso (10.5K)

3.11s obtenido

Esperado: < 5s

Validación exitosa

88.78% obtenido

Esperado: ≥ 85%

Idempotencia (UPSERT)

0 duplicados insertados

Esperado: 0

¿Por qué este proyecto?

Problema real que resuelve

En operaciones reales (sector público y privado) se reciben miles de transacciones diarias desde fuentes distintas (operativas y contables). Conciliar manualmente implica duplicados, errores de formato y descuadres, además de poca trazabilidad. Este proyecto nace para automatizar esa conciliación con reglas claras, controles de calidad y auditoría end‑to‑end.

Calidad de datos

Validaciones de esquema, fechas y montos; checksums y control de totales para detectar descuadres.

Idempotencia

Deduplicación por hash/clave de negocio y UPSERT transaccional a SQL.

Trazabilidad

Logs estructurados, métricas y reportes ejecutivos (Excel/PDF) para auditorías.

Motivación personal

Impacto esperado (KPIs)

  • -40% del tiempo de conciliación vs. proceso manual.
  • 100% de duplicados detectados en pruebas sintéticas.
  • <0.5% de errores por formato gracias a validaciones tempranas.

Resumen ejecutivo

Pipeline ETL de Conciliación Operativa es un sistema completo en Python que automatiza la ingesta, validación, deduplicación y carga de datos operativos/contables a SQL Server/SQLite, con observabilidad completa y reportes automáticos.

El proyecto demuestra arquitectura ETL de nivel producción con manejo robusto de errores, validaciones de negocio con Pandera (100+ reglas), deduplicación hash-based (SHA-256 de campos clave: numero_operacion, monto, fecha, cuenta), operaciones UPSERT idempotentes y trazabilidad completa mediante logging estructurado y métricas automatizadas.

Stack tecnológico: Python 3.10, Pandas, Pandera, SQLAlchemy, SQL Server, SQLite, Structlog, OpenPyXL.

Arquitectura

[CSV Input] 
    ↓
[Extracción] - Lee CSV con pandas
    ↓
[Validación] - Pandera schemas, control de totales
    ↓
[Deduplicación] - Hash-based, idempotente
    ↓
[Carga SQL] - UPSERT a SQL Server/SQLite
    ↓
[Métricas/Logs] - JSON estructurado
    ↓
[Reporte Excel] - 3 hojas con gráficos

Estructura del proyecto

prj-peru-dataops-etl/
├── data/
│   ├── input/              # CSVs de entrada
│   ├── processed/          # Intermedios
│   ├── output/
│   │   ├── metrics/        # Métricas JSON
│   │   └── reports/        # Reportes Excel
│   └── synthetic/
│       └── generator.py    # Generador datos sintéticos
├── src/
│   ├── validation/
│   │   ├── schemas.py      # Schemas Pandera
│   │   └── validators.py   # Motor validación
│   ├── deduplication/
│   │   └── dedup_engine.py # Motor deduplicación
│   ├── loading/
│   │   └── sql_loader.py   # Carga SQL
│   ├── reporting/
│   │   └── excel_report.py # Reportes Excel
│   └── utils/
│       ├── logging_config.py
│       └── metrics.py
├── scripts/
│   ├── run_etl_pipeline.py # Pipeline completo
│   ├── generate_report.py  # Generador reportes
│   └── reset_database.py
├── tests/
│   ├── test_validation.py
│   ├── test_deduplication.py
│   ├── test_sql_loading.py
│   └── test_logging_metrics.py
└── requirements.txt

Módulos principales

1. Generador Datos Sintéticos

Genera 10K+ transacciones bancarias con Faker. Inyecta duplicados (5%) y errores (2%) intencionalmente para testing.

2. Validación con Pandera

Schemas con 100+ reglas de negocio. Control de totales, checksums. Reportes detallados de errores. Tasa: ~88%.

3. Deduplicación

Hash-based y key-based. Idempotente. Priorización con ordenamiento temporal. SHA-256 de campos clave.

4. Carga SQL

SQL Server (Windows Auth) y SQLite. UPSERT con MERGE. Tablas temporales. Truncamiento automático de texto.

5. Logging y Métricas

Logs JSON estructurados con Structlog. Métricas por ejecución en JSON. Agregación histórica de estadísticas.

6. Reportes Excel

3 hojas: Resumen Ejecutivo, Datos, Métricas. Formato profesional. Gráficos (pie chart distribución).

Comandos de ejecución

# Generar datos sintéticos
python data/synthetic/generator.py

# Pipeline completo (SQL Server)
python scripts/run_etl_pipeline.py \
  --input data/input/operaciones_demo_2025.csv \
  --db-type sqlserver

# Pipeline completo (SQLite)
python scripts/run_etl_pipeline.py \
  --input data/input/operaciones_demo_2025.csv \
  --db-type sqlite

# Generar reporte Excel
python scripts/generate_report.py \
  --pipeline-id etl_20251005_121829 \
  --db-type sqlserver

# Ejecutar tests
python tests/test_validation.py
python tests/test_deduplication.py
python tests/test_sql_loading.py
python tests/test_logging_metrics.py

Resultados de ejecución real

Última ejecución exitosa:

Validación exitosa

88.78%
  • Válidos: 9,322
  • Inválidos: 1,178
  • Total: 10,500

Cálculo: 9,322 / 10,500 ≈ 88.78%

Distribución de registros

9.3k
  • Cargados: 9,322
  • No cargados: 1,178

Tiempo total: 3.11s · Duplicados removidos: 0

Datos cargados (preview)

id fecha_operacion numero_operacion tipo_operacion monto moneda cuenta_origen cuenta_destino banco_origen banco_destino estado canal content_hash fecha_carga is_duplicate
1 2025-09-11 06:29:16.043 OP-00000001 DEPOSITO 17312.85 PEN 516-1402086-1-97 RIPLEY COMPLETADA SUCURSAL 5b11e64f620ad6ac 2025-10-05 12:18:32.133 NULL
2 2025-09-18 21:08:16.043 OP-00000002 DEPOSITO 3951.54 PEN 564-2687580-7-87 993-4018282-4-18 RIPLEY RIPLEY COMPLETADA MOBILE 8c12a0984b4f1041 2025-10-05 12:18:32.133 NULL

Tabla: dbo.operaciones (preview de 2 filas ilustrativas).

Configuración requerida

Variables de entorno (.env)

DB_TYPE=sqlserver
DB_HOST=PC-MARCO\SQLEXPRESS
DB_PORT=1433
DB_NAME=ETL_Conciliacion
DB_TRUSTED_CONNECTION=true
DB_DRIVER=ODBC Driver 17 for SQL Server
DB_SCHEMA=dbo

SQL Server Setup

  • Instancia: PC-MARCO\SQLEXPRESS
  • Base de datos: ETL_Conciliacion
  • Autenticación: Windows (Trusted Connection)
  • Tabla principal: dbo.operaciones

Archivos de salida

Logs

Logs JSON estructurados por día en logs/etl_pipeline_YYYYMMDD.log

Métricas

Una por ejecución en data/output/metrics/metrics_etl_*.json. Contiene tiempos, contadores, errores.

Reportes Excel

3 hojas con resumen ejecutivo, datos y gráficos en data/output/reports/*.xlsx

Procesados

Datos limpios por ejecución en data/processed/deduped_*.csv

Ejecución real del pipeline

Salida directa de consola (última exitosa).

Ver logs
======================================================================
ETL PIPELINE EXECUTION
======================================================================
Input file: data/input/operaciones_demo_2025.csv
Database: sqlserver
======================================================================
2025-10-05T17:18:29.351669Z [info     ] pipeline_started               [__main__] input_file=data/input/operaciones_demo_2025.csv pipeline_id=etl_20251005_121829
2025-10-05T17:18:29.352669Z [info     ] extraction_started             [__main__] file=data/input/operaciones_demo_2025.csv
2025-10-05T17:18:29.383177Z [info     ] extraction_completed           [__main__] rows=10500
2025-10-05T17:18:29.383177Z [info     ] validation_started             [__main__] input_rows=10500
2025-10-05T17:18:31.986742Z [info     ] validation_completed           [__main__] invalid_rows=1178 success_rate=0.8878095238095238 valid_rows=9322
2025-10-05T17:18:31.986742Z [info     ] deduplication_started          [__main__] input_rows=9322
2025-10-05T17:18:31.989706Z [info     ] deduplication_completed        [__main__] duplicates_found=0 duplicates_removed=0 final_rows=9322
2025-10-05T17:18:32.028083Z [info     ] loading_started                [__main__] database=sqlserver rows=9322
Connected to SQL Server
Database: ETL_Conciliacion
Table 'dbo.operaciones' created/verified
2025-10-05T17:18:32.458621Z [info     ] loading_completed              [__main__] rows_failed=0 rows_inserted=9322 rows_updated=0
Database connection closed
2025-10-05T17:18:32.458621Z [info     ] pipeline_completed             [__main__] pipeline_id=etl_20251005_121829 processing_time=3.106952 status=success
2025-10-05T17:18:32.473669Z [info     ] metrics_saved                  [__main__] filepath=data\output\metrics\metrics_etl_20251005_121829.json

======================================================================
PIPELINE COMPLETED SUCCESSFULLY
======================================================================
pipeline_id: etl_20251005_121829
status: success
total_input: 10500
total_loaded: 9322
success_rate: 88.78%
duplicates_removed: 0
validation_errors: 100
processing_time: 3.11s
======================================================================

Artefactos del pipeline

Reporte Excel - Resumen Ejecutivo
Reporte Excel: Resumen Ejecutivo
Reporte Excel - Métricas
Reporte Excel: Métricas

Highlights técnicos

Tests

Todos los tests ejecutados exitosamente:

Métricas:

  • 3.11 segundos para 10,500 registros
  • 88.78% tasa de validación exitosa
  • UPSERT idempotente (0 duplicados)
  • Reportes Excel automáticos con 3 hojas
  • 100% observabilidad (logs + métricas)

Próximos pasos sugeridos

  • Orquestación con Prefect (scheduling automático)
  • Dashboard web con Streamlit (visualización en tiempo real)
  • Alertas por email/Slack en caso de fallos
  • CI/CD con GitHub Actions
  • Dockerización del pipeline

¿Preguntas sobre este proyecto?

Envíame un mensaje.