# wifi-etl/app/main.py
#
# Repository-viewer metadata: 145 lines, 5.4 KiB, Python

import logging, sys
from datetime import date, timedelta, datetime, timezone
from typing import Optional, Tuple
import psycopg2
from psycopg2.extras import DictCursor
from app.core.config import (
RUIJIE_ACCESS_TOKEN, LOG_LEVEL,
WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET,
DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
)
from app.extractor.ruijie import refresh_token, extract_all_users
from app.extractor.wifeed import get_access_token as wf_token, extract_all_access, WiFeedIPBlockedError
from app.transform.merge_mac import transform_ruijie, transform_wifeed
from app.load.load_database import load
# Resolve the configured level by name. The name is normalised (case and
# surrounding whitespace) and anything the logging module does not recognise
# as a level falls back to INFO, instead of passing an arbitrary attribute of
# the logging module (e.g. the function ``logging.debug``) to basicConfig().
_configured_level = logging.getLevelName(str(LOG_LEVEL).strip().upper())
logging.basicConfig(
    level=_configured_level if isinstance(_configured_level, int) else logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
log = logging.getLogger(__name__)
# ---------- Watermark helpers ----------
def get_watermark(conn, source: str) -> Tuple[Optional[str], Optional[datetime]]:
    """Look up the stored watermark for ``source``.

    Queries the ``watermarks`` table and returns ``(last_value, last_run_at)``
    for the matching row. When no row exists, ``(None, None)`` is returned.
    """
    query = "SELECT last_value, last_run_at FROM watermarks WHERE source = %s"
    with conn.cursor(cursor_factory=DictCursor) as cursor:
        cursor.execute(query, (source,))
        record = cursor.fetchone()
        if not record:
            return None, None
        return record['last_value'], record['last_run_at']
def set_watermark(conn, source: str, value: str):
    """Insert or update the watermark row for ``source``.

    Performs an upsert on the ``watermarks`` table keyed by ``source``, storing
    ``value`` as ``last_value`` and the database's ``NOW()`` as ``last_run_at``.
    The statement is only executed here; this function does not commit.
    """
    # Adjacent literals build exactly the same statement text as before.
    upsert_sql = (
        "\n"
        "INSERT INTO watermarks (source, last_value, last_run_at)\n"
        "VALUES (%s, %s, NOW())\n"
        "ON CONFLICT (source) DO UPDATE SET\n"
        "last_value = EXCLUDED.last_value,\n"
        "last_run_at = EXCLUDED.last_run_at;\n"
    )
    params = (source, value)
    with conn.cursor() as cursor:
        cursor.execute(upsert_sql, params)
    log.info(f"Watermark [{source}] = {value}")
def should_use_watermark(last_run_at: Optional[datetime], max_age_hours: int = 1) -> bool:
    """Decide whether the stored watermark is recent enough to be reused.

    The watermark is used only when the previous run happened less than
    ``max_age_hours`` ago, which avoids re-processing the same data on a retry
    or a closely spaced run. An older or missing timestamp returns ``False``
    so the caller ignores the watermark and forces a fresh extraction.

    Args:
        last_run_at: Timestamp of the previous run, or ``None`` if there is none.
        max_age_hours: Age limit, in hours, below which the watermark is reused.

    Returns:
        ``True`` when ``last_run_at`` is set and younger than ``max_age_hours``.
    """
    if not last_run_at:
        return False
    if last_run_at.utcoffset() is None:
        # A naive datetime cannot be subtracted from the aware "now" below
        # (TypeError), so give it an explicit zone first.
        # NOTE(review): assumes naive timestamps represent UTC — confirm against
        # the column type / database time zone.
        last_run_at = last_run_at.replace(tzinfo=timezone.utc)
    age = datetime.now(timezone.utc) - last_run_at
    return age < timedelta(hours=max_age_hours)
# ---------- Main ----------
def _extract_ruijie_sessions(conn):
    """Extract and transform Ruijie records; return ``(sessions, new_wm_ms)``."""
    log.info("--- Ruijie ---")
    if RUIJIE_ACCESS_TOKEN:
        token_r = refresh_token(RUIJIE_ACCESS_TOKEN)
    else:
        # No token configured: request one. Imported here, only when needed,
        # with a regular import statement rather than an inline __import__().
        from app.extractor.ruijie import get_access_token as ruijie_token
        token_r = ruijie_token()

    # The Ruijie watermark is stored as a string holding epoch milliseconds.
    wm_val_str, last_run = get_watermark(conn, 'ruijie')
    use_wm = should_use_watermark(last_run, max_age_hours=1)
    wm_ms = int(wm_val_str) if use_wm and wm_val_str else None

    raw_r, new_wm_ms = extract_all_users(token_r, watermark_ms=wm_ms)
    log.info(f"Ruijie: raw_r tem {len(raw_r)} registros extraídos")
    # Every extracted record is transformed; nothing is filtered out here.
    sessions = [transform_ruijie(r) for r in raw_r]
    log.info(f"Ruijie: {len(sessions)} sessions transformadas")
    log.info(f"Ruijie: {len(sessions)} sessions (wm={'used' if use_wm else 'ignored'})")
    return sessions, new_wm_ms


def _extract_wifeed_users(conn):
    """Extract and transform WiFeed records; return ``(users, new_wm_date, authenticated)``."""
    log.info("--- WiFeed ---")
    try:
        token_w = wf_token(WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET)
    except WiFeedIPBlockedError as e:
        # A blocked IP disables only the WiFeed half of the run.
        log.error(f"⛔ WiFeed desabilitado: {e}")
        log.error("⏳ Aguarde desbloqueio do IP e tente novamente amanhã")
        token_w = None

    if not token_w:
        log.info("WiFeed: pulando (IP bloqueado ou não autenticado)")
        return [], None, False

    # The WiFeed watermark is stored as an ISO date string.
    wm_date_str, last_run_w = get_watermark(conn, 'wifeed')
    use_wm_w = should_use_watermark(last_run_w, max_age_hours=1)
    wm_date = date.fromisoformat(wm_date_str) if use_wm_w and wm_date_str else None

    raw_w, new_wm_date = extract_all_access(token_w, watermark_date=wm_date)
    log.info(f"WiFeed: raw_w tem {len(raw_w)} registros extraídos")
    # Every extracted record is transformed; nothing is filtered out here.
    users = [transform_wifeed(r) for r in raw_w]
    log.info(f"WiFeed: {len(users)} users transformados")
    log.info(f"WiFeed: {len(users)} users (wm={'used' if use_wm_w else 'ignored'})")
    return users, new_wm_date, True


def run():
    """Run one ETL pass: extract Ruijie and WiFeed data, load it, advance watermarks.

    Everything happens on a single non-autocommit connection, so the load and
    the watermark updates are committed together or rolled back together.

    Raises:
        Exception: any failure inside the pass is logged, the transaction is
            rolled back, and the original exception is re-raised.
    """
    log.info("=== ETL START ===")
    conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
    conn.autocommit = False
    try:
        sessions, new_wm_ms = _extract_ruijie_sessions(conn)
        users, new_wm_date, wifeed_authenticated = _extract_wifeed_users(conn)

        # === Load ===
        load(conn, users, sessions)
        log.info("Load OK")

        # === Update watermarks (only when they advanced) ===
        # Ruijie: new_wm_ms is None when the extractor reported no new value
        # (per the original author's note: no new records).
        if new_wm_ms is not None:
            set_watermark(conn, 'ruijie', str(new_wm_ms))
        else:
            log.info("Ruijie: sem registros novos, watermark mantido")

        # WiFeed: only updated when authentication succeeded.
        if wifeed_authenticated:
            set_watermark(conn, 'wifeed', new_wm_date)
        else:
            log.info("WiFeed: watermark não atualizado (autenticação falhou)")

        conn.commit()
        log.info("=== ETL DONE ===")
    except Exception as e:
        # Log first so the failure is recorded even if the rollback itself fails.
        log.error(f"ETL failed: {e}", exc_info=True)
        conn.rollback()
        raise
    finally:
        conn.close()
# Script entry point: run a single ETL pass when executed directly.
if __name__ == "__main__":
    run()