wifi-etl/app/main.py
Rafael Lopes 331a021d9a
Some checks failed
Deploy WiFi-ETL Prod / deploy (push) Failing after 0s
FEAT: Implementado ETL completo para Ruijie e Wifeed
- Adicionado suporte para extração de dados do Ruijie e WiFeed, incluindo autenticação e tratamento de erros.
- Adicionado suporte para watermarking em ambas as fontes para extração incremental.
- Criado script de transformação para mesclagem de MAC addresses.
- Implementado Backfill para WiFeed, permitindo extração histórica com controle de taxa.
- Adicionado script de depuração para testes de transformação do WiFeed.
- Desenvolvido scripts de implantação e configurações do Docker para setup de produção.
- Criado script de inicialização do schema do PostgreSQL em infra/init.sql.
- Adicionado teste automatizado para lógica de transformação e carregamento em test_transform_load.py.
- Atualizado documentation para implantação e setup de produção.
2026-04-22 16:55:44 -03:00

145 lines
5.4 KiB
Python

import logging, sys
from datetime import date, timedelta, datetime, timezone
from typing import Optional, Tuple
import psycopg2
from psycopg2.extras import DictCursor
from app.core.config import (
RUIJIE_ACCESS_TOKEN, LOG_LEVEL,
WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET,
DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
)
from app.extractor.ruijie import refresh_token, extract_all_users
from app.extractor.wifeed import get_access_token as wf_token, extract_all_access, WiFeedIPBlockedError
from app.transform.merge_mac import transform_ruijie, transform_wifeed
from app.load.load_database import load
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, "INFO"),
format="%(asctime)s %(levelname)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)]
)
log = logging.getLogger(__name__)
# ---------- Watermark helpers ----------
def get_watermark(conn, source: str) -> Tuple[Optional[str], Optional[datetime]]:
"""
Retorna (last_value, last_run_at) da tabela watermarks.
Se não existir, retorna (None, None).
"""
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute("SELECT last_value, last_run_at FROM watermarks WHERE source = %s", (source,))
row = cur.fetchone()
if row:
return row['last_value'], row['last_run_at']
return None, None
def set_watermark(conn, source: str, value: str):
"""
Insere ou atualiza watermark para a fonte.
"""
with conn.cursor() as cur:
cur.execute("""
INSERT INTO watermarks (source, last_value, last_run_at)
VALUES (%s, %s, NOW())
ON CONFLICT (source) DO UPDATE SET
last_value = EXCLUDED.last_value,
last_run_at = EXCLUDED.last_run_at;
""", (source, value))
log.info(f"Watermark [{source}] = {value}")
def should_use_watermark(last_run_at: Optional[datetime], max_age_hours: int = 1) -> bool:
"""
Porto seguro: se última execução foi há menos de max_age_hours,
USAMOS o watermark (evita re-processar mesmos dados em retry/recorrência).
Se há mais de max_age_hours, IGNORA watermark (força nova extração).
"""
if not last_run_at:
return False
age = datetime.now(timezone.utc) - last_run_at
return age < timedelta(hours=max_age_hours)
# ---------- Main ----------
def run():
log.info("=== ETL START ===")
conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
conn.autocommit = False
try:
# === Ruijie ===
log.info("--- Ruijie ---")
token_r = refresh_token(RUIJIE_ACCESS_TOKEN) if RUIJIE_ACCESS_TOKEN else __import__('app.extractor.ruijie', fromlist=['get_access_token']).get_access_token()
# Watermark Ruijie (epoch ms como string)
wm_val_str, last_run = get_watermark(conn, 'ruijie')
use_wm = should_use_watermark(last_run, max_age_hours=1)
wm_ms = int(wm_val_str) if use_wm and wm_val_str else None
raw_r, new_wm_ms = extract_all_users(token_r, watermark_ms=wm_ms)
log.info(f"Ruijie: raw_r tem {len(raw_r)} registros extraídos")
sessions = [transform_ruijie(r) for r in raw_r] # Sem filtro if
log.info(f"Ruijie: {len(sessions)} sessions transformadas")
log.info(f"Ruijie: {len(sessions)} sessions (wm={'used' if use_wm else 'ignored'})")
# === WiFeed ===
log.info("--- WiFeed ---")
token_w = None
try:
token_w = wf_token(WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET)
except WiFeedIPBlockedError as e:
log.error(f"⛔ WiFeed desabilitado: {e}")
log.error("⏳ Aguarde desbloqueio do IP e tente novamente amanhã")
token_w = None
users = []
new_wm_date = date.today().strftime("%Y-%m-%d")
if token_w:
wm_date_str, last_run_w = get_watermark(conn, 'wifeed')
use_wm_w = should_use_watermark(last_run_w, max_age_hours=1)
wm_date = date.fromisoformat(wm_date_str) if use_wm_w and wm_date_str else None
raw_w, new_wm_date = extract_all_access(token_w, watermark_date=wm_date)
log.info(f"WiFeed: raw_w tem {len(raw_w)} registros extraídos")
users = [transform_wifeed(r) for r in raw_w] # Sem filtro if
log.info(f"WiFeed: {len(users)} users transformados")
log.info(f"WiFeed: {len(users)} users (wm={'used' if use_wm_w else 'ignored'})")
else:
log.info("WiFeed: pulando (IP bloqueado ou não autenticado)")
# === Load ===
load(conn, users, sessions)
log.info("Load OK")
# === Update watermarks (apenas se avançou) ===
# Ruijie: new_wm_ms pode ser None se não teve registros novos
if new_wm_ms is not None:
set_watermark(conn, 'ruijie', str(new_wm_ms))
else:
log.info("Ruijie: sem registros novos, watermark mantido")
# WiFeed: atualiza apenas se conseguiu autenticar
if token_w:
set_watermark(conn, 'wifeed', new_wm_date)
else:
log.info("WiFeed: watermark não atualizado (autenticação falhou)")
conn.commit()
log.info("=== ETL DONE ===")
except Exception as e:
conn.rollback()
log.error(f"ETL failed: {e}", exc_info=True)
raise
finally:
conn.close()
if __name__ == "__main__":
run()