import requests import logging import json from typing import Dict, Any, Optional, Tuple, List from datetime import date # Suprimir warning SSL para requests com verify=False import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) from app.core.config import WIFEED_BASE_URL logger = logging.getLogger(__name__) BASE_URL = WIFEED_BASE_URL.rstrip('/') if WIFEED_BASE_URL else "" class WiFeedIPBlockedError(Exception): """Exceção quando WiFeed bloqueia o IP (CrowdSec).""" pass def get_access_token(client_id: str, client_secret: str) -> str: """ Autentica na API WiFeed e retorna o token de acesso. Token válido por 24 horas. Baseado em: https://api.wifeed.com.br/auth/api/login """ if not client_id or not client_secret: raise ValueError("WiFeed: clientId e clientSecret são obrigatórios") if not BASE_URL: raise ValueError("WiFeed: WIFEED_BASE_URL não está configurado") url = f"{BASE_URL}/auth/api/login" payload = {"clientId": client_id, "clientSecret": client_secret} headers = {"Content-Type": "application/json"} logger.info(f"WiFeed: autenticando em {url}") try: resp = requests.post(url, json=payload, headers=headers, timeout=15, verify=False) # Detecta CrowdSec Ban (bloqueio de IP pela WAF) if resp.status_code == 403 and "CrowdSec" in resp.text: logger.error(f"WiFeed: ⛔ IP BLOQUEADO por CrowdSec (WAF da WiFeed)") logger.error(f"WiFeed: Seu IP foi marcado como suspeito/bloqueado.") logger.error(f"WiFeed: ➜ Entre em contato com suporte WiFeed (support@wifeed.com.br) para desbloquear") raise WiFeedIPBlockedError("IP bloqueado pela WiFeed (CrowdSec). Contacte suporte: support@wifeed.com.br") resp.raise_for_status() except WiFeedIPBlockedError: raise except requests.exceptions.RequestException as e: logger.error(f"WiFeed: Erro na requisição de login: {e}") if hasattr(e, 'response') and e.response is not None: logger.error(f"WiFeed: Status code: {e.response.status_code}") logger.error(f"WiFeed: Response body (primeiros 300 chars): {e.response.text[:300]}") raise try: data = resp.json() except json.JSONDecodeError: logger.error(f"WiFeed: Resposta não-JSON. Status: {resp.status_code}") logger.error(f"WiFeed: Body (primeiros 300 chars): {resp.text[:300]}") raise ValueError(f"WiFeed: API retornou resposta inválida (não-JSON)") # Tenta encontrar o token em diferentes campos possíveis # WiFeed retorna em data.response.token (estrutura aninhada) token = ( data.get("response", {}).get("token") or # Estrutura atual WiFeed data.get("token") or # Fallback direto data.get("access_token") or # Fallback alternativo data.get("Authorization") # Fallback último ) if not token: logger.error(f"WiFeed: Token não encontrado. Resposta completa: {json.dumps(data, indent=2, default=str)}") raise ValueError(f"WiFeed: Token não retornado pela API. Chaves: {list(data.keys())}") # Remove prefixo "Bearer" se existir if isinstance(token, str) and token.startswith("Bearer "): token = token[7:] logger.info(f"WiFeed: autenticação bem-sucedida, token vigente por 24h") return token def extract_all_access( access_token: str, watermark_date: Optional[date] = None ) -> Tuple[List[Dict], str]: """ Extrai registros de acessos (clientes conectados) do WiFeed. Endpoint: GET /core/openapi/v2/report/access?date=YYYY-MM-DD Retorna lista completa de acessos para a data especificada. Args: access_token: Token Bearer de autenticação watermark_date: Data a extrair (padrão: hoje) Returns: (lista_de_registros, watermark_date_str) """ target_date = watermark_date or date.today() url = f"{BASE_URL}/core/openapi/v2/report/access" logger.info(f"WiFeed: extração de acessos para {target_date.strftime('%Y-%m-%d')}") try: params = {"date": target_date.strftime("%Y-%m-%d")} headers = {"Authorization": f"Bearer {access_token}"} resp = requests.get(url, headers=headers, params=params, timeout=30, verify=False) resp.raise_for_status() records = resp.json() # Valida que é uma lista if not isinstance(records, list): logger.warning(f"WiFeed: resposta não é lista, tentando extrair 'data' field") if isinstance(records, dict) and "data" in records: records = records["data"] else: logger.error(f"WiFeed: estrutura inesperada: {type(records)}") records = [] logger.info(f"WiFeed: {len(records)} acessos extraídos para {target_date.strftime('%Y-%m-%d')}") return records, target_date.strftime("%Y-%m-%d") except Exception as e: logger.error(f"WiFeed: erro durante extração: {e}", exc_info=True) raise