import logging
import sys
from datetime import date, timedelta, datetime, timezone
from typing import Optional, Tuple

import psycopg2
from psycopg2.extras import DictCursor

from app.core.config import (
    RUIJIE_ACCESS_TOKEN, LOG_LEVEL,
    WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET,
    DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD,
)
from app.extractor.ruijie import refresh_token, extract_all_users
from app.extractor.wifeed import get_access_token as wf_token, extract_all_access, WiFeedIPBlockedError
from app.transform.merge_mac import transform_ruijie, transform_wifeed
from app.load.load_database import load


def _resolve_log_level(name) -> int:
    """Map a configured log level (e.g. "INFO", "debug", 10) to a logging level int.

    Unknown, empty or non-level values fall back to ``logging.INFO``. Without this,
    ``getattr(logging, "info")`` would return the ``logging.info`` *function* and make
    ``basicConfig`` raise.
    """
    if isinstance(name, int):
        return name
    level = getattr(logging, str(name or "INFO").upper(), None)
    return level if isinstance(level, int) else logging.INFO


logging.basicConfig(
    level=_resolve_log_level(LOG_LEVEL),
    format="%(asctime)s %(levelname)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
log = logging.getLogger(__name__)


# ---------- Watermark helpers ----------

def get_watermark(conn, source: str) -> Tuple[Optional[str], Optional[datetime]]:
    """Return ``(last_value, last_run_at)`` for *source* from the ``watermarks`` table.

    Returns ``(None, None)`` when there is no row for the source.
    """
    with conn.cursor(cursor_factory=DictCursor) as cur:
        cur.execute("SELECT last_value, last_run_at FROM watermarks WHERE source = %s", (source,))
        row = cur.fetchone()
        if row:
            return row['last_value'], row['last_run_at']
    return None, None


def set_watermark(conn, source: str, value: str):
    """Insert or update the watermark row for *source*, stamping ``last_run_at`` with NOW().

    The write is not committed here; it becomes durable when the caller commits.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO watermarks (source, last_value, last_run_at)
            VALUES (%s, %s, NOW())
            ON CONFLICT (source) DO UPDATE
              SET last_value = EXCLUDED.last_value,
                  last_run_at = EXCLUDED.last_run_at;
            """,
            (source, value),
        )
    log.info(f"Watermark [{source}] = {value}")


def should_use_watermark(last_run_at: Optional[datetime], max_age_hours: int = 1) -> bool:
    """Decide whether the stored watermark should be honoured.

    Safe-harbour rule: if the last run happened less than *max_age_hours* ago, the
    watermark IS used (avoids re-processing the same data on a retry / recurring run).
    If it is older than that, or there is no previous run, the watermark is IGNORED,
    forcing a fresh extraction.

    *last_run_at* must be timezone-aware: it is subtracted from an aware UTC "now",
    and a naive datetime would raise ``TypeError``.
    """
    if not last_run_at:
        return False
    age = datetime.now(timezone.utc) - last_run_at
    return age < timedelta(hours=max_age_hours)


# ---------- Extraction steps ----------

def _extract_ruijie(conn) -> Tuple[list, Optional[int]]:
    """Authenticate with Ruijie, extract users/sessions and transform them.

    Returns ``(sessions, new_wm_ms)`` where ``new_wm_ms`` is the next watermark
    reported by ``extract_all_users``. The caller treats None as "no new records".
    """
    log.info("--- Ruijie ---")
    if RUIJIE_ACCESS_TOKEN:
        token_r = refresh_token(RUIJIE_ACCESS_TOKEN)
    else:
        # Imported lazily so this name is only resolved on the path that needs it.
        from app.extractor.ruijie import get_access_token as ruijie_access_token
        token_r = ruijie_access_token()

    # The Ruijie watermark is epoch milliseconds, stored as a string.
    wm_val_str, last_run = get_watermark(conn, 'ruijie')
    use_wm = should_use_watermark(last_run, max_age_hours=1)
    wm_ms = int(wm_val_str) if use_wm and wm_val_str else None

    raw_r, new_wm_ms = extract_all_users(token_r, watermark_ms=wm_ms)
    log.info(f"Ruijie: raw_r tem {len(raw_r)} registros extraídos")
    sessions = [transform_ruijie(r) for r in raw_r]  # no filtering applied
    log.info(f"Ruijie: {len(sessions)} sessions transformadas")
    log.info(f"Ruijie: {len(sessions)} sessions (wm={'used' if use_wm else 'ignored'})")
    return sessions, new_wm_ms


def _extract_wifeed(conn) -> Tuple[list, Optional[str], bool]:
    """Authenticate with WiFeed, extract access records and transform them.

    Returns ``(users, new_wm_date, authenticated)``. When authentication is blocked
    (``WiFeedIPBlockedError``) or yields no token, returns ``([], None, False)`` so
    the run continues with Ruijie data only.
    """
    log.info("--- WiFeed ---")
    try:
        token_w = wf_token(WIFEED_CLIENT_ID, WIFEED_CLIENT_SECRET)
    except WiFeedIPBlockedError as e:
        log.error(f"⛔ WiFeed desabilitado: {e}")
        log.error("⏳ Aguarde desbloqueio do IP e tente novamente amanhã")
        token_w = None

    if not token_w:
        log.info("WiFeed: pulando (IP bloqueado ou não autenticado)")
        return [], None, False

    # The WiFeed watermark is an ISO date string (YYYY-MM-DD).
    wm_date_str, last_run_w = get_watermark(conn, 'wifeed')
    use_wm_w = should_use_watermark(last_run_w, max_age_hours=1)
    wm_date = date.fromisoformat(wm_date_str) if use_wm_w and wm_date_str else None

    raw_w, new_wm_date = extract_all_access(token_w, watermark_date=wm_date)
    log.info(f"WiFeed: raw_w tem {len(raw_w)} registros extraídos")
    users = [transform_wifeed(r) for r in raw_w]  # no filtering applied
    log.info(f"WiFeed: {len(users)} users transformados")
    log.info(f"WiFeed: {len(users)} users (wm={'used' if use_wm_w else 'ignored'})")
    return users, new_wm_date, True


# ---------- Main ----------

def run():
    """Run one ETL cycle: extract Ruijie and WiFeed, load, then advance the watermarks.

    Loading and watermark writes use a single non-autocommit connection that is
    committed once at the end. On any exception, the error is logged, the pending
    transaction is rolled back and the exception is re-raised. The connection is
    always closed.
    """
    log.info("=== ETL START ===")
    conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
    conn.autocommit = False
    try:
        sessions, new_wm_ms = _extract_ruijie(conn)
        users, new_wm_date, wifeed_authenticated = _extract_wifeed(conn)

        # === Load ===
        load(conn, users, sessions)
        log.info("Load OK")

        # === Update watermarks (only when they advanced) ===
        # Ruijie: new_wm_ms is None when there were no new records.
        if new_wm_ms is not None:
            set_watermark(conn, 'ruijie', str(new_wm_ms))
        else:
            log.info("Ruijie: sem registros novos, watermark mantido")

        # WiFeed: only when authentication succeeded and the extractor returned a
        # watermark. A None watermark keeps the old one (same rule as Ruijie) rather
        # than overwriting it with NULL.
        if not wifeed_authenticated:
            log.info("WiFeed: watermark não atualizado (autenticação falhou)")
        elif new_wm_date is None:
            log.info("WiFeed: sem watermark novo, watermark mantido")
        else:
            set_watermark(conn, 'wifeed', new_wm_date)

        conn.commit()
        log.info("=== ETL DONE ===")
    except Exception as e:
        # Log the real failure first so it is recorded even if rollback also fails.
        log.error(f"ETL failed: {e}", exc_info=True)
        try:
            conn.rollback()
        except psycopg2.Error:
            # e.g. the connection died. Don't let this replace the original error.
            log.exception("Rollback failed")
        raise
    finally:
        conn.close()


if __name__ == "__main__":
    run()