Some checks failed
Deploy WiFi-ETL Prod / deploy (push) Failing after 0s
- Adicionado suporte para extração de dados do Ruijie e WiFeed, incluindo autenticação e tratamento de erros. - Adicionado suporte para watermarking em ambas as fontes para extração incremental. - Criado script de transformação para mesclagem de MAC addresses. - Implementado Backfill para WiFeed, permitindo extração histórica com controle de taxa. - Adicionado script de depuração para testes de transformação do WiFeed. - Desenvolvido scripts de implantação e configurações do Docker para setup de produção. - Criado script de inicialização do schema do PostgreSQL em infra/init.sql. - Adicionado teste automatizado para lógica de transformação e carregamento em test_transform_load.py. - Atualizado documentation para implantação e setup de produção.
145 lines
5.4 KiB
Python
145 lines
5.4 KiB
Python
import logging, sys
|
|
from datetime import date, timedelta, datetime, timezone
|
|
from typing import Optional, Tuple
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import DictCursor
|
|
|
|
from app.core.config import (
|
|
RUIJIE_ACCESS_TOKEN, LOG_LEVEL,
|
|
WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET,
|
|
DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
|
|
)
|
|
from app.extractor.ruijie import refresh_token, extract_all_users
|
|
from app.extractor.wifeed import get_access_token as wf_token, extract_all_access, WiFeedIPBlockedError
|
|
from app.transform.merge_mac import transform_ruijie, transform_wifeed
|
|
from app.load.load_database import load
|
|
|
|
logging.basicConfig(
|
|
level=getattr(logging, LOG_LEVEL, "INFO"),
|
|
format="%(asctime)s %(levelname)s: %(message)s",
|
|
handlers=[logging.StreamHandler(sys.stdout)]
|
|
)
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------- Watermark helpers ----------
|
|
def get_watermark(conn, source: str) -> Tuple[Optional[str], Optional[datetime]]:
|
|
"""
|
|
Retorna (last_value, last_run_at) da tabela watermarks.
|
|
Se não existir, retorna (None, None).
|
|
"""
|
|
with conn.cursor(cursor_factory=DictCursor) as cur:
|
|
cur.execute("SELECT last_value, last_run_at FROM watermarks WHERE source = %s", (source,))
|
|
row = cur.fetchone()
|
|
if row:
|
|
return row['last_value'], row['last_run_at']
|
|
return None, None
|
|
|
|
|
|
def set_watermark(conn, source: str, value: str):
|
|
"""
|
|
Insere ou atualiza watermark para a fonte.
|
|
"""
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
INSERT INTO watermarks (source, last_value, last_run_at)
|
|
VALUES (%s, %s, NOW())
|
|
ON CONFLICT (source) DO UPDATE SET
|
|
last_value = EXCLUDED.last_value,
|
|
last_run_at = EXCLUDED.last_run_at;
|
|
""", (source, value))
|
|
log.info(f"Watermark [{source}] = {value}")
|
|
|
|
|
|
def should_use_watermark(last_run_at: Optional[datetime], max_age_hours: int = 1) -> bool:
|
|
"""
|
|
Porto seguro: se última execução foi há menos de max_age_hours,
|
|
USAMOS o watermark (evita re-processar mesmos dados em retry/recorrência).
|
|
Se há mais de max_age_hours, IGNORA watermark (força nova extração).
|
|
"""
|
|
if not last_run_at:
|
|
return False
|
|
age = datetime.now(timezone.utc) - last_run_at
|
|
return age < timedelta(hours=max_age_hours)
|
|
|
|
|
|
# ---------- Main ----------
|
|
def run():
|
|
log.info("=== ETL START ===")
|
|
|
|
conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
|
|
conn.autocommit = False
|
|
|
|
try:
|
|
# === Ruijie ===
|
|
log.info("--- Ruijie ---")
|
|
token_r = refresh_token(RUIJIE_ACCESS_TOKEN) if RUIJIE_ACCESS_TOKEN else __import__('app.extractor.ruijie', fromlist=['get_access_token']).get_access_token()
|
|
|
|
# Watermark Ruijie (epoch ms como string)
|
|
wm_val_str, last_run = get_watermark(conn, 'ruijie')
|
|
use_wm = should_use_watermark(last_run, max_age_hours=1)
|
|
wm_ms = int(wm_val_str) if use_wm and wm_val_str else None
|
|
|
|
raw_r, new_wm_ms = extract_all_users(token_r, watermark_ms=wm_ms)
|
|
log.info(f"Ruijie: raw_r tem {len(raw_r)} registros extraídos")
|
|
sessions = [transform_ruijie(r) for r in raw_r] # Sem filtro if
|
|
log.info(f"Ruijie: {len(sessions)} sessions transformadas")
|
|
log.info(f"Ruijie: {len(sessions)} sessions (wm={'used' if use_wm else 'ignored'})")
|
|
|
|
# === WiFeed ===
|
|
log.info("--- WiFeed ---")
|
|
token_w = None
|
|
try:
|
|
token_w = wf_token(WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET)
|
|
except WiFeedIPBlockedError as e:
|
|
log.error(f"⛔ WiFeed desabilitado: {e}")
|
|
log.error("⏳ Aguarde desbloqueio do IP e tente novamente amanhã")
|
|
token_w = None
|
|
users = []
|
|
new_wm_date = date.today().strftime("%Y-%m-%d")
|
|
|
|
if token_w:
|
|
wm_date_str, last_run_w = get_watermark(conn, 'wifeed')
|
|
use_wm_w = should_use_watermark(last_run_w, max_age_hours=1)
|
|
wm_date = date.fromisoformat(wm_date_str) if use_wm_w and wm_date_str else None
|
|
|
|
raw_w, new_wm_date = extract_all_access(token_w, watermark_date=wm_date)
|
|
log.info(f"WiFeed: raw_w tem {len(raw_w)} registros extraídos")
|
|
users = [transform_wifeed(r) for r in raw_w] # Sem filtro if
|
|
log.info(f"WiFeed: {len(users)} users transformados")
|
|
log.info(f"WiFeed: {len(users)} users (wm={'used' if use_wm_w else 'ignored'})")
|
|
else:
|
|
log.info("WiFeed: pulando (IP bloqueado ou não autenticado)")
|
|
|
|
# === Load ===
|
|
load(conn, users, sessions)
|
|
log.info("Load OK")
|
|
|
|
# === Update watermarks (apenas se avançou) ===
|
|
# Ruijie: new_wm_ms pode ser None se não teve registros novos
|
|
if new_wm_ms is not None:
|
|
set_watermark(conn, 'ruijie', str(new_wm_ms))
|
|
else:
|
|
log.info("Ruijie: sem registros novos, watermark mantido")
|
|
|
|
# WiFeed: atualiza apenas se conseguiu autenticar
|
|
if token_w:
|
|
set_watermark(conn, 'wifeed', new_wm_date)
|
|
else:
|
|
log.info("WiFeed: watermark não atualizado (autenticação falhou)")
|
|
|
|
conn.commit()
|
|
log.info("=== ETL DONE ===")
|
|
|
|
except Exception as e:
|
|
conn.rollback()
|
|
log.error(f"ETL failed: {e}", exc_info=True)
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run()
|